// Copyright 2023 Harness, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub import ( "context" "fmt" "strings" "sync" "time" "github.com/go-redis/redis/v8" "github.com/rs/zerolog/log" ) type Redis struct { config Config client redis.UniversalClient mutex sync.RWMutex registry []Consumer } // NewRedis create an instance of redis PubSub implementation. func NewRedis(client redis.UniversalClient, options ...Option) *Redis { config := Config{ App: "app", Namespace: "default", HealthInterval: 3 * time.Second, SendTimeout: 60, ChannelSize: 100, } for _, f := range options { f.Apply(&config) } return &Redis{ config: config, client: client, registry: make([]Consumer, 0, 16), } } // Subscribe consumer to process the event with payload. func (r *Redis) Subscribe( ctx context.Context, topic string, handler func(payload []byte) error, options ...SubscribeOption, ) Consumer { r.mutex.Lock() defer r.mutex.Unlock() config := SubscribeConfig{ topics: make([]string, 0, 8), app: r.config.App, namespace: r.config.Namespace, healthInterval: r.config.HealthInterval, sendTimeout: r.config.SendTimeout, channelSize: r.config.ChannelSize, } for _, f := range options { f.Apply(&config) } // create subscriber and map it to the registry subscriber := &redisSubscriber{ config: &config, handler: handler, } config.topics = append(config.topics, topic) topics := subscriber.formatTopics(config.topics...) subscriber.rdb = r.client.Subscribe(ctx, topics...) // start subscriber go subscriber.start(ctx) // register subscriber r.registry = append(r.registry, subscriber) return subscriber } // Publish event topic to message broker with payload. func (r *Redis) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error { pubConfig := PublishConfig{ app: r.config.App, namespace: r.config.Namespace, } for _, f := range opts { f.Apply(&pubConfig) } topic = formatTopic(pubConfig.app, pubConfig.namespace, topic) err := r.client.Publish(ctx, topic, payload).Err() if err != nil { return fmt.Errorf("failed to write to pubsub topic '%s'. Error: %w", topic, err) } return nil } func (r *Redis) Close(_ context.Context) error { for _, subscriber := range r.registry { err := subscriber.Close() if err != nil { return err } } return nil } type redisSubscriber struct { config *SubscribeConfig rdb *redis.PubSub handler func([]byte) error } func (s *redisSubscriber) start(ctx context.Context) { // Go channel which receives messages. ch := s.rdb.Channel( redis.WithChannelHealthCheckInterval(s.config.healthInterval), redis.WithChannelSendTimeout(s.config.sendTimeout), redis.WithChannelSize(s.config.channelSize), ) for { select { case <-ctx.Done(): return case msg, ok := <-ch: if !ok { log.Ctx(ctx).Debug().Msg("redis channel was closed") return } if err := s.handler([]byte(msg.Payload)); err != nil { log.Ctx(ctx).Err(err).Msg("received an error from handler function") } } } } func (s *redisSubscriber) Subscribe(ctx context.Context, topics ...string) error { err := s.rdb.Subscribe(ctx, s.formatTopics(topics...)...) if err != nil { return fmt.Errorf("subscribe failed for chanels %v with error: %w", strings.Join(topics, ","), err) } return nil } func (s *redisSubscriber) Unsubscribe(ctx context.Context, topics ...string) error { err := s.rdb.Unsubscribe(ctx, s.formatTopics(topics...)...) if err != nil { return fmt.Errorf("unsubscribe failed for chanels %v with error: %w", strings.Join(topics, ","), err) } return nil } func (s *redisSubscriber) Close() error { err := s.rdb.Close() if err != nil { return fmt.Errorf("failed while closing subscriber with error: %w", err) } return nil } func (s *redisSubscriber) formatTopics(topics ...string) []string { result := make([]string, len(topics)) for i, topic := range topics { result[i] = formatTopic(s.config.app, s.config.namespace, topic) } return result }