From ff7013334168752e7c559137ffc3e1b1e2361730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Thu, 22 Jul 2021 18:56:41 +0200 Subject: [PATCH] added error as return value for pubsub.Subscribers() --- core/pubsub.go | 2 +- handler/api/system/stats.go | 8 +++++- mock/mock_gen.go | 5 ++-- pubsub/hub.go | 4 +-- pubsub/hub_test.go | 8 +++++- pubsub/redis.go | 54 ++++++++++++++++++++++++------------- 6 files changed, 56 insertions(+), 25 deletions(-) diff --git a/core/pubsub.go b/core/pubsub.go index c1e2d961e..59e4d97a7 100644 --- a/core/pubsub.go +++ b/core/pubsub.go @@ -33,5 +33,5 @@ type Pubsub interface { Subscribe(context.Context) (<-chan *Message, <-chan error) // Subscribers returns a count of subscribers. - Subscribers() int + Subscribers() (int, error) } diff --git a/handler/api/system/stats.go b/handler/api/system/stats.go index 4466a8118..db5a429dc 100644 --- a/handler/api/system/stats.go +++ b/handler/api/system/stats.go @@ -145,7 +145,13 @@ func HandleStats( // Event Stats // - stats.Events.Subscribers = bus.Subscribers() + stats.Events.Subscribers, err = bus.Subscribers() + if err != nil { + render.InternalError(w, err) + logger.FromRequest(r).WithError(err). + Warnln("stats: cannot get number of subscribers") + return + } // // Stream Stats diff --git a/mock/mock_gen.go b/mock/mock_gen.go index 73d3c2972..b630a3ddf 100644 --- a/mock/mock_gen.go +++ b/mock/mock_gen.go @@ -67,11 +67,12 @@ func (mr *MockPubsubMockRecorder) Subscribe(arg0 interface{}) *gomock.Call { } // Subscribers mocks base method. -func (m *MockPubsub) Subscribers() int { +func (m *MockPubsub) Subscribers() (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Subscribers") ret0, _ := ret[0].(int) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // Subscribers indicates an expected call of Subscribers. diff --git a/pubsub/hub.go b/pubsub/hub.go index 3445e7bc1..42143974d 100644 --- a/pubsub/hub.go +++ b/pubsub/hub.go @@ -65,9 +65,9 @@ func (h *hub) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error return s.handler, errc } -func (h *hub) Subscribers() int { +func (h *hub) Subscribers() (int, error) { h.Lock() c := len(h.subs) h.Unlock() - return c + return c, nil } diff --git a/pubsub/hub_test.go b/pubsub/hub_test.go index 2c4c4e145..866eb888d 100644 --- a/pubsub/hub_test.go +++ b/pubsub/hub_test.go @@ -21,7 +21,13 @@ func TestBus(t *testing.T) { p := newHub() events, errc := p.Subscribe(ctx) - if got, want := p.Subscribers(), 1; got != want { + got, err := p.Subscribers() + if err != nil { + t.Errorf("Test failed with an error: %s", err.Error()) + return + } + + if want := 1; got != want { t.Errorf("Want %d subscribers, got %d", want, got) } diff --git a/pubsub/redis.go b/pubsub/redis.go index c322b1181..70d9af907 100644 --- a/pubsub/redis.go +++ b/pubsub/redis.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "os" "time" "github.com/drone/drone/core" @@ -68,18 +69,22 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan for { select { case m, ok := <-ch: - if ok { - message := &core.Message{} - err = json.Unmarshal([]byte(m.Payload), message) - if err != nil { - fmt.Printf("error@pubsub: failed to unmarshal a message. %s\n", err) - continue - } - messageCh <- message - } else { + if !ok { errCh <- fmt.Errorf("redis pubsub channel=%s closed", channelPubSub) return } + + message := &core.Message{} + err = json.Unmarshal([]byte(m.Payload), message) + if err != nil { + // This is a "should not happen" situation, + // because messages are encoded as json above in Publish(). + _, _ = fmt.Fprintf(os.Stderr, "error@pubsub: failed to unmarshal a message. %s\n", err) + continue + } + + messageCh <- message + case <-ctx.Done(): return } @@ -89,26 +94,39 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan return messageCh, errCh } -func (h *hubRedis) Subscribers() int { +func (h *hubRedis) Subscribers() (int, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFunc() v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() if err != nil { - fmt.Printf("error@pubsub: failed to get number of subscribers. %s\n", err) - return -1 + err = fmt.Errorf("error@pubsub: failed to get number of subscribers. %w", err) + return 0, err } - values, ok := v.([]interface{}) // the result should be: [, ] + values, ok := v.([]interface{}) // the result should be: [, ] if !ok || len(values) != 2 { - return 0 + err = fmt.Errorf("error@pubsub: failed to extarct number of subscribers from: %v", values) + return 0, err } - if subscriberCount, ok := values[1].(int64); ok { - return int(subscriberCount) + switch n := values[1].(type) { + case int: + return n, nil + case uint: + return int(n), nil + case int32: + return int(n), nil + case uint32: + return int(n), nil + case int64: + return int(n), nil + case uint64: + return int(n), nil + default: + err = fmt.Errorf("error@pubsub: unsupported type for number of subscribers: %T", values[1]) + return 0, err } - - return 0 } func newRedis(rdb *redis.Client) (ps core.Pubsub, err error) {