diff --git a/cmd/drone-server/inject_external.go b/cmd/drone-server/inject_external.go new file mode 100644 index 000000000..515f1f904 --- /dev/null +++ b/cmd/drone-server/inject_external.go @@ -0,0 +1,51 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + + "github.com/drone/drone/cmd/drone-server/config" + + "github.com/go-redis/redis/v8" + "github.com/google/wire" +) + +// wire set for loading the external services. +var externalSet = wire.NewSet( + provideRedisClient, +) + +func provideRedisClient(config config.Config) (rdb *redis.Client, err error) { + if config.Redis.ConnectionString == "" { + return + } + + options, err := redis.ParseURL(config.Redis.ConnectionString) + if err != nil { + return + } + + rdb = redis.NewClient(options) + + _, err = rdb.Ping(context.Background()).Result() + if err != nil { + err = fmt.Errorf("redis not accessibe: %w", err) + return + } + + return +} diff --git a/cmd/drone-server/inject_store.go b/cmd/drone-server/inject_store.go index d91168568..0e06e9b0b 100644 --- a/cmd/drone-server/inject_store.go +++ b/cmd/drone-server/inject_store.go @@ -34,7 +34,6 @@ import ( "github.com/drone/drone/store/template" "github.com/drone/drone/store/user" - "github.com/go-redis/redis/v8" "github.com/google/wire" "github.com/sirupsen/logrus" ) @@ -56,7 +55,6 @@ var storeSet = wire.NewSet( global.New, step.New, template.New, - provideRedisClient, ) // provideDatabase is a Wire provider function that provides a @@ -179,18 +177,3 @@ func provideUserStore(db *db.DB, enc encrypt.Encrypter, config config.Config) co metric.UserCount(users) return users } - -func provideRedisClient(config config.Config) (rdb *redis.Client, err error) { - if config.Redis.ConnectionString == "" { - return - } - - options, err := redis.ParseURL(config.Redis.ConnectionString) - if err != nil { - return - } - - rdb = redis.NewClient(options) - - return -} diff --git a/cmd/drone-server/wire.go b/cmd/drone-server/wire.go index 84aab8c6f..bc9376511 100644 --- a/cmd/drone-server/wire.go +++ b/cmd/drone-server/wire.go @@ -32,6 +32,7 @@ func InitializeApplication(config config.Config) (application, error) { serverSet, serviceSet, storeSet, + externalSet, newApplication, ) return application{}, nil diff --git a/cmd/drone-server/wire_gen.go b/cmd/drone-server/wire_gen.go index d65d65b74..ebc2f8835 100644 --- a/cmd/drone-server/wire_gen.go +++ b/cmd/drone-server/wire_gen.go @@ -59,10 +59,7 @@ func InitializeApplication(config2 config.Config) (application, error) { if err != nil { return application{}, err } - corePubsub, err := pubsub.New(redisClient) - if err != nil { - return application{}, err - } + corePubsub := pubsub.New(redisClient) stageStore := provideStageStore(db) scheduler := provideScheduler(stageStore, config2) statusService := provideStatusService(client, renewer, config2) @@ -81,7 +78,7 @@ func InitializeApplication(config2 config.Config) (application, error) { coreLicense := provideLicense(client, config2) datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2) logStore := provideLogStore(db, config2) - logStream := livelog.New() + logStream := livelog.New(redisClient) netrcService := provideNetrcService(client, renewer, config2) secretStore := secret.New(db, encrypter) globalSecretStore := global.New(db, encrypter) diff --git a/livelog/livelog.go b/livelog/livelog.go index 9ab27d0b1..6843732e6 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -1,4 +1,4 @@ -// Copyright 2019 Drone IO, Inc. +// Copyright 2021 Drone IO, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,80 +15,17 @@ package livelog import ( - "context" - "errors" - "sync" - "github.com/drone/drone/core" + + "github.com/go-redis/redis/v8" ) -// error returned when a stream is not registered with -// the streamer. -var errStreamNotFound = errors.New("stream: not found") - -type streamer struct { - sync.Mutex - - streams map[int64]*stream -} - -// New returns a new in-memory log streamer. -func New() core.LogStream { - return &streamer{ - streams: make(map[int64]*stream), +// New creates a new log streamer. If Redis client passed as parameter is not nil it uses +// a Redis implementation, otherwise it uses an in-memory implementation. +func New(rdb *redis.Client) core.LogStream { + if rdb != nil { + return newRedis(rdb) } -} -func (s *streamer) Create(ctx context.Context, id int64) error { - s.Lock() - s.streams[id] = newStream() - s.Unlock() - return nil -} - -func (s *streamer) Delete(ctx context.Context, id int64) error { - s.Lock() - stream, ok := s.streams[id] - if ok { - delete(s.streams, id) - } - s.Unlock() - if !ok { - return errStreamNotFound - } - return stream.close() -} - -func (s *streamer) Write(ctx context.Context, id int64, line *core.Line) error { - s.Lock() - stream, ok := s.streams[id] - s.Unlock() - if !ok { - return errStreamNotFound - } - return stream.write(line) -} - -func (s *streamer) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { - s.Lock() - stream, ok := s.streams[id] - s.Unlock() - if !ok { - return nil, nil - } - return stream.subscribe(ctx) -} - -func (s *streamer) Info(ctx context.Context) *core.LogStreamInfo { - s.Lock() - defer s.Unlock() - info := &core.LogStreamInfo{ - Streams: map[int64]int{}, - } - for id, stream := range s.streams { - stream.Lock() - info.Streams[id] = len(stream.list) - stream.Unlock() - } - return info + return newStreamer() } diff --git a/livelog/redis.go b/livelog/redis.go new file mode 100644 index 000000000..5e7c670e2 --- /dev/null +++ b/livelog/redis.go @@ -0,0 +1,211 @@ +// Copyright 2019 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livelog + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/drone/drone/core" + + "github.com/go-redis/redis/v8" +) + +func newRedis(rdb *redis.Client) core.LogStream { + return &redisStream{ + client: rdb, + } +} + +const ( + redisKeyExpiryTime = 5 * time.Hour // How long each key exists in redis + redisPollTime = 100 * time.Millisecond // should not be too large to avoid redis clients getting occupied for long + redisTailMaxTime = 1 * time.Hour // maximum duration a tail can last + redisEntryKey = "line" + redisStreamPrefix = "drone-log-" +) + +type redisStream struct { + client redis.Cmdable +} + +// Create creates a redis stream and sets an expiry on it. +func (r *redisStream) Create(ctx context.Context, id int64) error { + // Delete if a stream already exists with the same key + _ = r.Delete(ctx, id) + + key := redisStreamPrefix + strconv.FormatInt(id, 10) + + addResp := r.client.XAdd(ctx, &redis.XAddArgs{ + Stream: key, + ID: "*", // auto-generate a unique incremental ID + MaxLen: bufferSize, + Approx: true, + Values: map[string]interface{}{redisEntryKey: []byte{}}, + }) + if err := addResp.Err(); err != nil { + return fmt.Errorf("livelog/redis: could not create stream with key %s", key) + } + + res := r.client.Expire(ctx, key, redisKeyExpiryTime) + if err := res.Err(); err != nil { + return fmt.Errorf("livelog/redis: could not set expiry for key %s", key) + } + + return nil +} + +// Delete deletes a stream +func (r *redisStream) Delete(ctx context.Context, id int64) error { + key := redisStreamPrefix + strconv.FormatInt(id, 10) + + if err := r._exists(ctx, key); err != nil { + return err + } + + deleteResp := r.client.Del(ctx, key) + if err := deleteResp.Err(); err != nil { + return fmt.Errorf("livelog/redis: could not delete stream for step %d", id) + } + + return nil +} + +// Write writes information into the Redis stream +func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) error { + key := redisStreamPrefix + strconv.FormatInt(id, 10) + + if err := r._exists(ctx, key); err != nil { + return err + } + + lineJsonData, _ := json.Marshal(line) + addResp := r.client.XAdd(ctx, &redis.XAddArgs{ + Stream: key, + ID: "*", // auto-generate a unique incremental ID + MaxLen: bufferSize, + Approx: true, + Values: map[string]interface{}{redisEntryKey: lineJsonData}, + }) + if err := addResp.Err(); err != nil { + return err + } + + return nil +} + +// Tail returns back all the lines in the stream. +func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { + key := redisStreamPrefix + strconv.FormatInt(id, 10) + + if err := r._exists(ctx, key); err != nil { + return nil, nil + } + + chLines := make(chan *core.Line, bufferSize) + chErr := make(chan error, 1) + + go func() { + defer close(chErr) + defer close(chLines) + timeout := time.After(redisTailMaxTime) // polling should not last for longer than tailMaxTime + + // Keep reading from the stream and writing to the channel + lastID := "0" + + for { + select { + case <-ctx.Done(): + return + case <-timeout: + return + default: + readResp := r.client.XRead(ctx, &redis.XReadArgs{ + Streams: append([]string{key}, lastID), + Block: redisPollTime, // periodically check for ctx.Done + }) + if readResp.Err() != nil && readResp.Err() != redis.Nil { // readResp.Err() is sometimes set to "redis: nil" instead of nil + chErr <- readResp.Err() + return + } + + for _, msg := range readResp.Val() { + messages := msg.Messages + if len(messages) > 0 { + lastID = messages[len(messages)-1].ID + } else { // should not happen + return + } + + for _, message := range messages { + values := message.Values + if val, ok := values[redisEntryKey]; ok { + var line *core.Line + if err := json.Unmarshal([]byte(val.(string)), &line); err != nil { + continue // ignore errors in the stream + } + chLines <- line + } + } + } + } + } + }() + + return chLines, chErr +} + +// Info returns info about log streams present in redis +func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) { + info = &core.LogStreamInfo{ + Streams: make(map[int64]int), + } + + keysResp := r.client.Keys(ctx, redisStreamPrefix+"*") + if err := keysResp.Err(); err != nil { + return + } + + for _, key := range keysResp.Val() { + ids := key[len(redisStreamPrefix):] + id, err := strconv.ParseInt(ids, 10, 64) + if err != nil { + continue + } + + lenResp := r.client.XLen(ctx, key) + if err := lenResp.Err(); err != nil { + continue + } + + size := int(lenResp.Val()) + + info.Streams[id] = size + } + + return +} + +func (r *redisStream) _exists(ctx context.Context, key string) error { + exists := r.client.Exists(ctx, key) + if exists.Err() != nil || exists.Val() == 0 { + return fmt.Errorf("livelog/redis: log stream %s not found", key) + } + + return nil +} diff --git a/livelog/streamer.go b/livelog/streamer.go new file mode 100644 index 000000000..09f0b166a --- /dev/null +++ b/livelog/streamer.go @@ -0,0 +1,94 @@ +// Copyright 2019 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livelog + +import ( + "context" + "errors" + "sync" + + "github.com/drone/drone/core" +) + +// error returned when a stream is not registered with +// the streamer. +var errStreamNotFound = errors.New("stream: not found") + +type streamer struct { + sync.Mutex + + streams map[int64]*stream +} + +// New returns a new in-memory log streamer. +func newStreamer() core.LogStream { + return &streamer{ + streams: make(map[int64]*stream), + } +} + +func (s *streamer) Create(ctx context.Context, id int64) error { + s.Lock() + s.streams[id] = newStream() + s.Unlock() + return nil +} + +func (s *streamer) Delete(ctx context.Context, id int64) error { + s.Lock() + stream, ok := s.streams[id] + if ok { + delete(s.streams, id) + } + s.Unlock() + if !ok { + return errStreamNotFound + } + return stream.close() +} + +func (s *streamer) Write(ctx context.Context, id int64, line *core.Line) error { + s.Lock() + stream, ok := s.streams[id] + s.Unlock() + if !ok { + return errStreamNotFound + } + return stream.write(line) +} + +func (s *streamer) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { + s.Lock() + stream, ok := s.streams[id] + s.Unlock() + if !ok { + return nil, nil + } + return stream.subscribe(ctx) +} + +func (s *streamer) Info(ctx context.Context) *core.LogStreamInfo { + s.Lock() + defer s.Unlock() + info := &core.LogStreamInfo{ + Streams: map[int64]int{}, + } + for id, stream := range s.streams { + stream.Lock() + info.Streams[id] = len(stream.list) + stream.Unlock() + } + return info +} diff --git a/livelog/livelog_test.go b/livelog/streamer_test.go similarity index 93% rename from livelog/livelog_test.go rename to livelog/streamer_test.go index 4df79e51e..df7f5f84e 100644 --- a/livelog/livelog_test.go +++ b/livelog/streamer_test.go @@ -17,7 +17,7 @@ import ( ) func TestStreamer(t *testing.T) { - s := New().(*streamer) + s := newStreamer().(*streamer) err := s.Create(context.Background(), 1) if err != nil { t.Error(err) @@ -57,7 +57,7 @@ func TestStreamer(t *testing.T) { } func TestStreamerDelete(t *testing.T) { - s := New().(*streamer) + s := newStreamer().(*streamer) err := s.Create(context.Background(), 1) if err != nil { t.Error(err) @@ -75,7 +75,7 @@ func TestStreamerDelete(t *testing.T) { } func TestStreamerDeleteErr(t *testing.T) { - s := New() + s := newStreamer() err := s.Delete(context.Background(), 1) if err != errStreamNotFound { t.Errorf("Want errStreamNotFound") @@ -83,7 +83,7 @@ func TestStreamerDeleteErr(t *testing.T) { } func TestStreamerWriteErr(t *testing.T) { - s := New() + s := newStreamer() err := s.Write(context.Background(), 1, &core.Line{}) if err != errStreamNotFound { t.Errorf("Want errStreamNotFound") @@ -91,7 +91,7 @@ func TestStreamerWriteErr(t *testing.T) { } func TestStreamTailNotFound(t *testing.T) { - s := New() + s := newStreamer() outc, errc := s.Tail(context.Background(), 0) if outc != nil && errc != nil { t.Errorf("Expect nil channel when stream not found") @@ -99,7 +99,7 @@ func TestStreamTailNotFound(t *testing.T) { } func TestStreamerInfo(t *testing.T) { - s := New().(*streamer) + s := newStreamer().(*streamer) s.streams[1] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}, {}: struct{}{}}} s.streams[2] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}}} s.streams[3] = &stream{list: map[*subscriber]struct{}{}} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 201f42e01..4c22b580c 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -22,10 +22,10 @@ import ( // New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses // a Redis implementation, otherwise it uses an in-memory implementation. -func New(rdb *redis.Client) (core.Pubsub, error) { +func New(rdb *redis.Client) core.Pubsub { if rdb != nil { return newRedis(rdb) } - return newHub(), nil + return newHub() } diff --git a/pubsub/redis.go b/pubsub/redis.go index 70d9af907..8a2b2b36e 100644 --- a/pubsub/redis.go +++ b/pubsub/redis.go @@ -26,6 +26,12 @@ import ( "github.com/go-redis/redis/v8" ) +func newRedis(rdb *redis.Client) core.Pubsub { + return &hubRedis{ + rdb: rdb, + } +} + const channelPubSub = "drone-events" type hubRedis struct { @@ -70,7 +76,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan select { case m, ok := <-ch: if !ok { - errCh <- fmt.Errorf("redis pubsub channel=%s closed", channelPubSub) + errCh <- fmt.Errorf("pubsub/redis: channel=%s closed", channelPubSub) return } @@ -79,7 +85,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan if err != nil { // This is a "should not happen" situation, // because messages are encoded as json above in Publish(). - _, _ = fmt.Fprintf(os.Stderr, "error@pubsub: failed to unmarshal a message. %s\n", err) + _, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err) continue } @@ -100,13 +106,13 @@ func (h *hubRedis) Subscribers() (int, error) { v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() if err != nil { - err = fmt.Errorf("error@pubsub: failed to get number of subscribers. %w", err) + err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err) return 0, err } values, ok := v.([]interface{}) // the result should be: [, ] if !ok || len(values) != 2 { - err = fmt.Errorf("error@pubsub: failed to extarct number of subscribers from: %v", values) + err = fmt.Errorf("pubsub/redis: failed to extarct number of subscribers from: %v", values) return 0, err } @@ -124,21 +130,7 @@ func (h *hubRedis) Subscribers() (int, error) { case uint64: return int(n), nil default: - err = fmt.Errorf("error@pubsub: unsupported type for number of subscribers: %T", values[1]) + err = fmt.Errorf("pubsub/redis: unsupported type for number of subscribers: %T", values[1]) return 0, err } } - -func newRedis(rdb *redis.Client) (ps core.Pubsub, err error) { - _, err = rdb.Ping(context.Background()).Result() - if err != nil { - err = fmt.Errorf("redis not accessibe: %w", err) - return - } - - ps = &hubRedis{ - rdb: rdb, - } - - return -}