diff --git a/cmd/drone-server/inject_external.go b/cmd/drone-server/inject_external.go index 515f1f904..d5a29a1e3 100644 --- a/cmd/drone-server/inject_external.go +++ b/cmd/drone-server/inject_external.go @@ -15,12 +15,9 @@ package main import ( - "context" - "fmt" - "github.com/drone/drone/cmd/drone-server/config" + "github.com/drone/drone/service/redisdb" - "github.com/go-redis/redis/v8" "github.com/google/wire" ) @@ -29,23 +26,6 @@ var externalSet = wire.NewSet( provideRedisClient, ) -func provideRedisClient(config config.Config) (rdb *redis.Client, err error) { - if config.Redis.ConnectionString == "" { - return - } - - options, err := redis.ParseURL(config.Redis.ConnectionString) - if err != nil { - return - } - - rdb = redis.NewClient(options) - - _, err = rdb.Ping(context.Background()).Result() - if err != nil { - err = fmt.Errorf("redis not accessibe: %w", err) - return - } - - return +func provideRedisClient(config config.Config) (rdb redisdb.RedisDB, err error) { + return redisdb.New(config) } diff --git a/cmd/drone-server/inject_scheduler.go b/cmd/drone-server/inject_scheduler.go index fa3337a1a..42a6053c9 100644 --- a/cmd/drone-server/inject_scheduler.go +++ b/cmd/drone-server/inject_scheduler.go @@ -17,8 +17,8 @@ package main import ( "github.com/drone/drone/core" "github.com/drone/drone/scheduler/queue" + "github.com/drone/drone/service/redisdb" - "github.com/go-redis/redis/v8" "github.com/google/wire" ) @@ -29,6 +29,6 @@ var schedulerSet = wire.NewSet( // provideScheduler is a Wire provider function that returns a // scheduler based on the environment configuration. -func provideScheduler(store core.StageStore, rdb *redis.Client) core.Scheduler { - return queue.New(store, rdb) +func provideScheduler(store core.StageStore, r redisdb.RedisDB) core.Scheduler { + return queue.New(store, r) } diff --git a/cmd/drone-server/wire_gen.go b/cmd/drone-server/wire_gen.go index 295605f24..33783fe50 100644 --- a/cmd/drone-server/wire_gen.go +++ b/cmd/drone-server/wire_gen.go @@ -55,13 +55,13 @@ func InitializeApplication(config2 config.Config) (application, error) { cronStore := cron.New(db) repositoryStore := provideRepoStore(db) buildStore := provideBuildStore(db) - redisClient, err := provideRedisClient(config2) + redisDB, err := provideRedisClient(config2) if err != nil { return application{}, err } - corePubsub := pubsub.New(redisClient) + corePubsub := pubsub.New(redisDB) stageStore := provideStageStore(db) - scheduler := provideScheduler(stageStore, redisClient) + scheduler := provideScheduler(stageStore, redisDB) statusService := provideStatusService(client, renewer, config2) stepStore := step.New(db) system := provideSystem(config2) @@ -78,7 +78,7 @@ func InitializeApplication(config2 config.Config) (application, error) { coreLicense := provideLicense(client, config2) datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2) logStore := provideLogStore(db, config2) - logStream := livelog.New(redisClient) + logStream := livelog.New(redisDB) netrcService := provideNetrcService(client, renewer, config2) secretStore := secret.New(db, encrypter) globalSecretStore := global.New(db, encrypter) diff --git a/livelog/livelog.go b/livelog/livelog.go index 6843732e6..5e288189b 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -12,19 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !oss + package livelog import ( "github.com/drone/drone/core" - - "github.com/go-redis/redis/v8" + "github.com/drone/drone/service/redisdb" ) // New creates a new log streamer. If Redis client passed as parameter is not nil it uses // a Redis implementation, otherwise it uses an in-memory implementation. -func New(rdb *redis.Client) core.LogStream { +func New(rdb redisdb.RedisDB) core.LogStream { if rdb != nil { - return newRedis(rdb) + return newStreamRedis(rdb) } return newStreamer() diff --git a/livelog/livelogOSS.go b/livelog/livelogOSS.go new file mode 100644 index 000000000..c638b25e5 --- /dev/null +++ b/livelog/livelogOSS.go @@ -0,0 +1,27 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build oss + +package livelog + +import ( + "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" +) + +// New creates a new in-memory log streamer. +func New(r redisdb.RedisDB) core.LogStream { + return newStreamer() +} diff --git a/livelog/redis.go b/livelog/streamRedis.go similarity index 79% rename from livelog/redis.go rename to livelog/streamRedis.go index 5e7c670e2..9a72fab79 100644 --- a/livelog/redis.go +++ b/livelog/streamRedis.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !oss + package livelog import ( @@ -22,13 +24,14 @@ import ( "time" "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" "github.com/go-redis/redis/v8" ) -func newRedis(rdb *redis.Client) core.LogStream { - return &redisStream{ - client: rdb, +func newStreamRedis(r redisdb.RedisDB) core.LogStream { + return streamRedis{ + rdb: r, } } @@ -40,18 +43,20 @@ const ( redisStreamPrefix = "drone-log-" ) -type redisStream struct { - client redis.Cmdable +type streamRedis struct { + rdb redisdb.RedisDB } // Create creates a redis stream and sets an expiry on it. -func (r *redisStream) Create(ctx context.Context, id int64) error { +func (r streamRedis) Create(ctx context.Context, id int64) error { // Delete if a stream already exists with the same key _ = r.Delete(ctx, id) + client := r.rdb.Client() + key := redisStreamPrefix + strconv.FormatInt(id, 10) - addResp := r.client.XAdd(ctx, &redis.XAddArgs{ + addResp := client.XAdd(ctx, &redis.XAddArgs{ Stream: key, ID: "*", // auto-generate a unique incremental ID MaxLen: bufferSize, @@ -62,7 +67,7 @@ func (r *redisStream) Create(ctx context.Context, id int64) error { return fmt.Errorf("livelog/redis: could not create stream with key %s", key) } - res := r.client.Expire(ctx, key, redisKeyExpiryTime) + res := client.Expire(ctx, key, redisKeyExpiryTime) if err := res.Err(); err != nil { return fmt.Errorf("livelog/redis: could not set expiry for key %s", key) } @@ -71,14 +76,16 @@ func (r *redisStream) Create(ctx context.Context, id int64) error { } // Delete deletes a stream -func (r *redisStream) Delete(ctx context.Context, id int64) error { +func (r streamRedis) Delete(ctx context.Context, id int64) error { + client := r.rdb.Client() + key := redisStreamPrefix + strconv.FormatInt(id, 10) if err := r._exists(ctx, key); err != nil { return err } - deleteResp := r.client.Del(ctx, key) + deleteResp := client.Del(ctx, key) if err := deleteResp.Err(); err != nil { return fmt.Errorf("livelog/redis: could not delete stream for step %d", id) } @@ -87,7 +94,9 @@ func (r *redisStream) Delete(ctx context.Context, id int64) error { } // Write writes information into the Redis stream -func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) error { +func (r streamRedis) Write(ctx context.Context, id int64, line *core.Line) error { + client := r.rdb.Client() + key := redisStreamPrefix + strconv.FormatInt(id, 10) if err := r._exists(ctx, key); err != nil { @@ -95,7 +104,7 @@ func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) erro } lineJsonData, _ := json.Marshal(line) - addResp := r.client.XAdd(ctx, &redis.XAddArgs{ + addResp := client.XAdd(ctx, &redis.XAddArgs{ Stream: key, ID: "*", // auto-generate a unique incremental ID MaxLen: bufferSize, @@ -110,7 +119,9 @@ func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) erro } // Tail returns back all the lines in the stream. -func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { +func (r streamRedis) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { + client := r.rdb.Client() + key := redisStreamPrefix + strconv.FormatInt(id, 10) if err := r._exists(ctx, key); err != nil { @@ -135,7 +146,7 @@ func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <- case <-timeout: return default: - readResp := r.client.XRead(ctx, &redis.XReadArgs{ + readResp := client.XRead(ctx, &redis.XReadArgs{ Streams: append([]string{key}, lastID), Block: redisPollTime, // periodically check for ctx.Done }) @@ -171,12 +182,14 @@ func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <- } // Info returns info about log streams present in redis -func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) { +func (r streamRedis) Info(ctx context.Context) (info *core.LogStreamInfo) { + client := r.rdb.Client() + info = &core.LogStreamInfo{ Streams: make(map[int64]int), } - keysResp := r.client.Keys(ctx, redisStreamPrefix+"*") + keysResp := client.Keys(ctx, redisStreamPrefix+"*") if err := keysResp.Err(); err != nil { return } @@ -188,7 +201,7 @@ func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) { continue } - lenResp := r.client.XLen(ctx, key) + lenResp := client.XLen(ctx, key) if err := lenResp.Err(); err != nil { continue } @@ -201,8 +214,10 @@ func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) { return } -func (r *redisStream) _exists(ctx context.Context, key string) error { - exists := r.client.Exists(ctx, key) +func (r streamRedis) _exists(ctx context.Context, key string) error { + client := r.rdb.Client() + + exists := client.Exists(ctx, key) if exists.Err() != nil || exists.Val() == 0 { return fmt.Errorf("livelog/redis: log stream %s not found", key) } diff --git a/pubsub/hubRedis.go b/pubsub/hubRedis.go new file mode 100644 index 000000000..8b546d6e6 --- /dev/null +++ b/pubsub/hubRedis.go @@ -0,0 +1,124 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !oss + +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" +) + +const ( + redisPubSubEvents = "drone-events" + redisPubSubCapacity = 100 +) + +func newHubRedis(r redisdb.RedisDB) core.Pubsub { + h := &hubRedis{ + rdb: r, + subscribers: make(map[chan<- *core.Message]struct{}), + } + + go r.Subscribe(context.Background(), redisPubSubEvents, redisPubSubCapacity, h) + + return h +} + +type hubRedis struct { + sync.Mutex + rdb redisdb.RedisDB + subscribers map[chan<- *core.Message]struct{} +} + +// Publish publishes a new message. All subscribers will get it. +func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) { + client := h.rdb.Client() + + data, err := json.Marshal(e) + if err != nil { + return + } + + _, err = client.Publish(ctx, redisPubSubEvents, data).Result() + if err != nil { + return + } + + return +} + +// Subscribe add a new subscriber. The subscriber gets event until its context is not finished. +func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) { + chMessage := make(chan *core.Message, redisPubSubCapacity) + chErr := make(chan error) + + h.Lock() + h.subscribers[chMessage] = struct{}{} + h.Unlock() + + go func() { + <-ctx.Done() + + h.Lock() + delete(h.subscribers, chMessage) + h.Unlock() + + close(chMessage) + close(chErr) + }() + + return chMessage, chErr +} + +// Subscribers returns number of subscribers. +func (h *hubRedis) Subscribers() (int, error) { + h.Lock() + n := len(h.subscribers) + h.Unlock() + + return n, nil +} + +// ProcessMessage relays the message to all subscribers listening to drone events. +// It is a part of redisdb.PubSubProcessor implementation and it's called internally by redisdb.Subscribe. +func (h *hubRedis) ProcessMessage(s string) { + message := &core.Message{} + err := json.Unmarshal([]byte(s), message) + if err != nil { + // Ignore invalid messages. This is a "should not happen" situation, + // because messages are encoded as json in Publish(). + _, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err) + return + } + + h.Lock() + for ss := range h.subscribers { + select { + case ss <- message: + default: // messages are lost if a subscriber channel reaches its capacity + } + } + h.Unlock() +} + +// ProcessError is a part of redisdb.PubSubProcessor implementation. +func (h *hubRedis) ProcessError(error) {} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 4c22b580c..f4f2db160 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -12,19 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !oss + package pubsub import ( "github.com/drone/drone/core" - - "github.com/go-redis/redis/v8" + "github.com/drone/drone/service/redisdb" ) // New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses // a Redis implementation, otherwise it uses an in-memory implementation. -func New(rdb *redis.Client) core.Pubsub { - if rdb != nil { - return newRedis(rdb) +func New(r redisdb.RedisDB) core.Pubsub { + if r != nil { + return newHubRedis(r) } return newHub() diff --git a/pubsub/pubsubOSS.go b/pubsub/pubsubOSS.go new file mode 100644 index 000000000..85a01fd69 --- /dev/null +++ b/pubsub/pubsubOSS.go @@ -0,0 +1,27 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build oss + +package pubsub + +import ( + "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" +) + +// New creates a new in-memory publish subscriber. +func New(r redisdb.RedisDB) core.Pubsub { + return newHub() +} diff --git a/pubsub/redis.go b/pubsub/redis.go deleted file mode 100644 index 9fef153cc..000000000 --- a/pubsub/redis.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2021 Drone IO, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "context" - "encoding/json" - "fmt" - "os" - "time" - - "github.com/drone/drone/core" - - "github.com/go-redis/redis/v8" -) - -func newRedis(rdb *redis.Client) core.Pubsub { - return &hubRedis{rdb: rdb} -} - -const redisPubSubEvents = "drone-events" - -type hubRedis struct { - rdb *redis.Client -} - -func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) { - data, err := json.Marshal(e) - if err != nil { - return - } - - _, err = h.rdb.Publish(ctx, redisPubSubEvents, data).Result() - if err != nil { - return - } - - return -} - -func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) { - chMessage := make(chan *core.Message, 100) - chErr := make(chan error) - - go func() { - pubsub := h.rdb.Subscribe(ctx, redisPubSubEvents) - ch := pubsub.Channel(redis.WithChannelSize(100)) - - defer func() { - _ = pubsub.Close() - close(chMessage) - close(chErr) - }() - - err := pubsub.Ping(ctx) - if err != nil { - chErr <- err - return - } - - for { - select { - case m, ok := <-ch: - if !ok { - chErr <- fmt.Errorf("pubsub/redis: channel=%s closed", redisPubSubEvents) - return - } - - message := &core.Message{} - err = json.Unmarshal([]byte(m.Payload), message) - if err != nil { - // This is a "should not happen" situation, - // because messages are encoded as json above in Publish(). - _, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err) - continue - } - - chMessage <- message - - case <-ctx.Done(): - return - } - } - }() - - return chMessage, chErr -} - -func (h *hubRedis) Subscribers() (int, error) { - ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) - defer cancelFunc() - - v, err := h.rdb.Do(ctx, "pubsub", "numsub", redisPubSubEvents).Result() - if err != nil { - err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err) - return 0, err - } - - values, ok := v.([]interface{}) // the result should be: [, ] - if !ok || len(values) != 2 { - err = fmt.Errorf("pubsub/redis: failed to extarct number of subscribers from: %v", values) - return 0, err - } - - switch n := values[1].(type) { - case int: - return n, nil - case uint: - return int(n), nil - case int32: - return int(n), nil - case uint32: - return int(n), nil - case int64: - return int(n), nil - case uint64: - return int(n), nil - default: - err = fmt.Errorf("pubsub/redis: unsupported type for number of subscribers: %T", values[1]) - return 0, err - } -} diff --git a/scheduler/queue/cancellerRedis.go b/scheduler/queue/cancellerRedis.go new file mode 100644 index 000000000..d43a976f8 --- /dev/null +++ b/scheduler/queue/cancellerRedis.go @@ -0,0 +1,160 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !oss + +package queue + +import ( + "context" + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/drone/drone/service/redisdb" + + "github.com/go-redis/redis/v8" +) + +const ( + redisPubSubCancel = "drone-cancel" + redisCancelValuePrefix = "drone-cancel-" + redisCancelValueTimeout = 5 * time.Minute + redisCancelValue = "canceled" +) + +func newCancellerRedis(r redisdb.RedisDB) *cancellerRedis { + h := &cancellerRedis{ + rdb: r, + subscribers: make(map[*cancelSubscriber]struct{}), + } + + go r.Subscribe(context.Background(), redisPubSubCancel, 1, h) + + return h +} + +type cancellerRedis struct { + rdb redisdb.RedisDB + subscribers map[*cancelSubscriber]struct{} + sync.Mutex +} + +type cancelSubscriber struct { + id int64 + ch chan<- error +} + +// Cancel informs all subscribers that a build with the provided id is cancelled. +func (c *cancellerRedis) Cancel(ctx context.Context, id int64) (err error) { + client := c.rdb.Client() + + ids := strconv.FormatInt(id, 10) + + // publish a cancel event to all subscribers (runners) waiting to + _, err = client.Publish(ctx, redisPubSubCancel, ids).Result() + if err != nil { + return + } + + // put a limited duration value in case a runner isn't listening currently. + _, err = client.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result() + if err != nil { + return + } + + return +} + +// Cancelled waits until it gets info that a build with the provided id is cancelled. +// The waiting is aborted when the provided context is done. +func (c *cancellerRedis) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) { + client := c.rdb.Client() + + ids := strconv.FormatInt(id, 10) + + // first check if the build is already cancelled + + result, err := client.Get(ctx, redisCancelValuePrefix+ids).Result() + if err != nil && err != redis.Nil { + return + } + + isCancelled = err != redis.Nil && result == redisCancelValue + if isCancelled { + return + } + + // if it is not cancelled, subscribe and listen to cancel build events + // until the context is cancelled or until the build is cancelled. + + ch := make(chan error) + sub := &cancelSubscriber{id: id, ch: ch} + + c.Lock() + c.subscribers[sub] = struct{}{} + c.Unlock() + + select { + case err = <-ch: + // If the build is cancelled or an error happened, + // than the subscriber is removed from the set by other go routine + isCancelled = err != nil + case <-ctx.Done(): + // If the context is cancelled then the subscriber must be be removed here. + c.Lock() + delete(c.subscribers, sub) + c.Unlock() + } + + return +} + +// ProcessMessage informs all subscribers listening to cancellation that the build with this id is cancelled. +// It is a part of redisdb.PubSubProcessor implementation and it's called internally by Subscribe. +func (c *cancellerRedis) ProcessMessage(s string) { + id, err := strconv.ParseInt(s, 10, 64) + if err != nil { + // Ignore invalid messages. This is a "should not happen" situation, + // because all messages are integers as strings in method Cancel(). + _, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", s) + return + } + + c.Lock() + for ss := range c.subscribers { + if ss.id == id { + ss.ch <- nil + close(ss.ch) + delete(c.subscribers, ss) + } + } + c.Unlock() +} + +// ProcessError informs all subscribers that an error happened and clears the set of subscribers. +// The set of subscribers is cleared because each subscriber receives only one message, +// so an error could cause that the message is missed - it's safer to return an error. +// It is a part of redisdb.PubSubProcessor implementation and it's called internally by Subscribe. +func (c *cancellerRedis) ProcessError(err error) { + c.Lock() + for ss := range c.subscribers { + ss.ch <- err + close(ss.ch) + delete(c.subscribers, ss) + } + c.Unlock() +} diff --git a/scheduler/queue/redis.go b/scheduler/queue/redis.go deleted file mode 100644 index f5096a87a..000000000 --- a/scheduler/queue/redis.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2021 Drone IO, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package queue - -import ( - "context" - "fmt" - "os" - "strconv" - "time" - - "github.com/go-redis/redis/v8" -) - -const ( - redisPubSubCancel = "drone-cancel" - redisCancelValuePrefix = "drone-cancel-" - redisCancelValueTimeout = 5 * time.Minute - redisCancelValue = "canceled" -) - -func newRedisCanceller(rdb *redis.Client) *redisCanceller { - return &redisCanceller{rdb: rdb} -} - -type redisCanceller struct { - rdb *redis.Client -} - -func (c *redisCanceller) Cancel(ctx context.Context, id int64) (err error) { - ids := strconv.FormatInt(id, 10) - - // publish a cancel event to all subscribers (runners) waiting to - _, err = c.rdb.Publish(ctx, redisPubSubCancel, ids).Result() - if err != nil { - return - } - - // put a limited duration value in case a runner isn't listening currently. - _, err = c.rdb.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result() - if err != nil { - return - } - - return nil -} - -func (c *redisCanceller) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) { - ids := strconv.FormatInt(id, 10) - - // first check if the build is already cancelled - - result, err := c.rdb.Get(ctx, redisCancelValuePrefix+ids).Result() - if err != nil && err != redis.Nil { - return - } - - isCancelled = err != redis.Nil && result == redisCancelValue - if isCancelled { - return - } - - // if it is not cancelled, subscribe and listen to cancel build events - // until the context is cancelled or until the build is cancelled. - - chResult := make(chan interface{}) - - go func() { - pubsub := c.rdb.Subscribe(ctx, redisPubSubCancel) - ch := pubsub.Channel() - - defer func() { - _ = pubsub.Close() - close(chResult) - }() - - err := pubsub.Ping(ctx) - if err != nil { - chResult <- err - return - } - - for { - select { - case m, ok := <-ch: - if !ok { - chResult <- fmt.Errorf("canceller/redis: channel=%s closed", redisPubSubCancel) - return - } - - idMessage, err := strconv.ParseInt(m.Payload, 10, 64) - if err != nil { // should not happen - _, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", m.Payload) - continue // ignore data errors - } - - if id == idMessage { - chResult <- true - } - - case <-ctx.Done(): - return - } - } - }() - - value, ok := <-chResult - - if !ok { - return - } - - err, ok = value.(error) - if ok { - return - } - - isCancelled, ok = value.(bool) - if ok { - return - } - - return -} diff --git a/scheduler/queue/scheduler.go b/scheduler/queue/scheduler.go index 2ab5c5dec..44620870f 100644 --- a/scheduler/queue/scheduler.go +++ b/scheduler/queue/scheduler.go @@ -1,4 +1,4 @@ -// Copyright 2019 Drone IO, Inc. +// Copyright 2021 Drone IO, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ package queue import ( "context" "errors" - - "github.com/drone/drone/core" - - "github.com/go-redis/redis/v8" ) type scheduler struct { @@ -28,30 +24,6 @@ type scheduler struct { *canceller } -type redisScheduler struct { - *queue - *redisCanceller -} - -// New creates a new scheduler. -func New(store core.StageStore, rdb *redis.Client) core.Scheduler { - if rdb != nil { - return redisScheduler{ - queue: newQueue(store), - redisCanceller: newRedisCanceller(rdb), - } - } - - return scheduler{ - queue: newQueue(store), - canceller: newCanceller(), - } -} - func (d scheduler) Stats(context.Context) (interface{}, error) { return nil, errors.New("not implemented") } - -func (d redisScheduler) Stats(context.Context) (interface{}, error) { - return nil, errors.New("not implemented") -} diff --git a/scheduler/queue/schedulerNonOSS.go b/scheduler/queue/schedulerNonOSS.go new file mode 100644 index 000000000..d23fb6c1d --- /dev/null +++ b/scheduler/queue/schedulerNonOSS.go @@ -0,0 +1,37 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !oss + +package queue + +import ( + "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" +) + +// New creates a new scheduler. +func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler { + if r == nil { + return scheduler{ + queue: newQueue(store), + canceller: newCanceller(), + } + } + + return schedulerRedis{ + queue: newQueue(store), + cancellerRedis: newCancellerRedis(r), + } +} diff --git a/scheduler/queue/schedulerOSS.go b/scheduler/queue/schedulerOSS.go new file mode 100644 index 000000000..5cee23bb1 --- /dev/null +++ b/scheduler/queue/schedulerOSS.go @@ -0,0 +1,30 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build oss + +package queue + +import ( + "github.com/drone/drone/core" + "github.com/drone/drone/service/redisdb" +) + +// New creates a new scheduler. +func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler { + return scheduler{ + queue: newQueue(store), + canceller: newCanceller(), + } +} diff --git a/scheduler/queue/schedulerRedis.go b/scheduler/queue/schedulerRedis.go new file mode 100644 index 000000000..a14cf489f --- /dev/null +++ b/scheduler/queue/schedulerRedis.go @@ -0,0 +1,31 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !oss + +package queue + +import ( + "context" + "errors" +) + +type schedulerRedis struct { + *queue + *cancellerRedis +} + +func (d schedulerRedis) Stats(context.Context) (interface{}, error) { + return nil, errors.New("not implemented") +} diff --git a/service/redisdb/redisdb.go b/service/redisdb/redisdb.go new file mode 100644 index 000000000..f604229c0 --- /dev/null +++ b/service/redisdb/redisdb.go @@ -0,0 +1,161 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redisdb + +import ( + "context" + "fmt" + "time" + + "github.com/drone/drone/cmd/drone-server/config" + + "github.com/go-redis/redis/v8" + "github.com/sirupsen/logrus" +) + +func New(config config.Config) (srv RedisDB, err error) { + if config.Redis.ConnectionString == "" { + return + } + + options, err := redis.ParseURL(config.Redis.ConnectionString) + if err != nil { + return + } + + rdb := redis.NewClient(options) + + _, err = rdb.Ping(context.Background()).Result() + if err != nil { + err = fmt.Errorf("redis not accessibe: %w", err) + return + } + + srv = redisService{ + rdb: rdb, + } + + return +} + +type RedisDB interface { + Client() redis.Cmdable + Subscribe(ctx context.Context, channelName string, channelSize int, proc PubSubProcessor) +} + +type redisService struct { + rdb *redis.Client +} + +// Client exposes redis.Cmdable interface +func (r redisService) Client() redis.Cmdable { + return r.rdb +} + +type PubSubProcessor interface { + ProcessMessage(s string) + ProcessError(err error) +} + +var backoffDurations = []time.Duration{ + 0, time.Second, 3 * time.Second, 5 * time.Second, 10 * time.Second, 20 * time.Second, +} + +// Subscribe subscribes to a redis pub-sub channel. The messages are processed with the supplied PubSubProcessor. +// In case of en error the function will automatically reconnect with an increasing back of delay. +// The only way to exit this function is to terminate or expire the supplied context. +func (r redisService) Subscribe(ctx context.Context, channelName string, channelSize int, proc PubSubProcessor) { + var connectTry int + for { + err := func() (err error) { + defer func() { + // panic recovery because external PubSubProcessor methods might cause panics. + if p := recover(); p != nil { + err = fmt.Errorf("redis pubsub: panic: %v", p) + } + }() + + var options []redis.ChannelOption + + if channelSize > 1 { + options = append(options, redis.WithChannelSize(channelSize)) + } + + pubsub := r.rdb.Subscribe(ctx, channelName) + ch := pubsub.Channel(options...) + + defer func() { + _ = pubsub.Close() + }() + + // make sure the connection is successful + err = pubsub.Ping(ctx) + if err != nil { + return + } + + connectTry = 0 // successfully connected, reset the counter + + logrus. + WithField("try", connectTry+1). + WithField("channel", channelName). + Trace("redis pubsub: subscribed") + + for { + select { + case m, ok := <-ch: + if !ok { + err = fmt.Errorf("redis pubsub: channel=%s closed", channelName) + return + } + + proc.ProcessMessage(m.Payload) + + case <-ctx.Done(): + err = ctx.Err() + return + } + } + }() + if err == nil { + // should not happen, the function should always exit with an error + continue + } + + proc.ProcessError(err) + + if err == context.Canceled || err == context.DeadlineExceeded { + logrus. + WithField("channel", channelName). + Trace("redis pubsub: finished") + return + } + + dur := backoffDurations[connectTry] + + logrus. + WithError(err). + WithField("try", connectTry+1). + WithField("pause", dur.String()). + WithField("channel", channelName). + Error("redis pubsub: connection failed, reconnecting") + + time.Sleep(dur) + + if connectTry < len(backoffDurations)-1 { + connectTry++ + } + } +}