drone/pubsub/inmem.go

231 lines
5.3 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"sync"
"time"
"github.com/rs/zerolog/log"
"golang.org/x/exp/slices"
)
// ErrClosed is the error a subscriber's Close returns when that subscriber
// has already been closed.
var ErrClosed = errors.New("pubsub: subscriber is closed")
// InMemory is a pubsub implementation that keeps its subscribers in an
// in-process registry.
type InMemory struct {
// config holds the defaults (app, namespace, send timeout, channel size)
// that Subscribe and Publish copy into their per-call configuration.
config Config
// mutex is locked by Subscribe for the duration of the call, which
// includes the append to registry.
mutex sync.Mutex
// registry holds every subscriber created by Subscribe.
registry []*inMemorySubscriber
}
// NewInMemory creates an in-memory pubsub instance. The configuration starts
// from the built-in defaults below and each option is then applied to it in
// the order given. The registry is pre-allocated with room for 16 subscribers.
//
// NOTE(review): SendTimeout defaults to the untyped constant 60. The value is
// later handed to time.NewTimer in Publish, which makes it 60 nanoseconds;
// next to HealthInterval's explicit `3 * time.Second` this looks unintended —
// confirm the unit before relying on the default.
func NewInMemory(options ...Option) *InMemory {
	cfg := Config{
		App:            "app",
		Namespace:      "default",
		HealthInterval: 3 * time.Second,
		SendTimeout:    60,
		ChannelSize:    100,
	}
	for i := range options {
		options[i].Apply(&cfg)
	}

	ps := new(InMemory)
	ps.config = cfg
	ps.registry = make([]*inMemorySubscriber, 0, 16)
	return ps
}
// Subscribe registers handler as a consumer of topic and returns the new
// subscriber.
//
// The subscriber's configuration starts from the InMemory defaults (app,
// namespace, send timeout, channel size) and each option is then applied to
// it. The subscriber's receive loop is launched on its own goroutine with ctx
// before the subscriber is appended to the registry. The whole call runs
// under the InMemory mutex.
//
// NOTE(review): the subscriber's channel is allocated inside start, i.e. on
// the goroutine launched here, so a Publish or Close that reaches the
// subscriber before that goroutine has run observes a nil channel.
func (r *InMemory) Subscribe(
	ctx context.Context,
	topic string,
	handler func(payload []byte) error,
	options ...SubscribeOption,
) Consumer {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	subCfg := SubscribeConfig{
		topics:      make([]string, 0, 8),
		app:         r.config.App,
		namespace:   r.config.Namespace,
		sendTimeout: r.config.SendTimeout,
		channelSize: r.config.ChannelSize,
	}
	for i := range options {
		options[i].Apply(&subCfg)
	}
	// The explicitly requested topic goes after whatever the options left
	// in the topic list.
	subCfg.topics = append(subCfg.topics, topic)

	// The subscriber keeps a pointer to subCfg and stores the topic names in
	// their fully formatted form.
	sub := &inMemorySubscriber{config: &subCfg, handler: handler}
	sub.topics = sub.formatTopics(subCfg.topics...)

	// Launch the receive loop, then make the subscriber visible to Publish.
	go sub.start(ctx)
	r.registry = append(r.registry, sub)

	return sub
}
// Publish delivers payload to every open subscriber of topic.
//
// The topic is qualified with an app and namespace (the InMemory defaults
// after opts have been applied) before it is matched against each
// subscriber's topics. Every matching subscriber is served on its own
// goroutine, which stops when the payload has been handed over, when ctx is
// done, or when the subscriber's send timeout elapses; in the timeout case the
// message is dropped for that subscriber and a warning is logged. Publish
// waits for all of these goroutines and always returns nil.
//
// NOTE(review): a subscriber that is closed between the isClosed check and
// the send would cause a send on a closed channel; that window is not
// addressed here.
func (r *InMemory) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error {
	// Snapshot the registry under the mutex Subscribe holds while appending
	// to it. The lock is released before any send so that a slow subscriber
	// cannot stall Subscribe.
	r.mutex.Lock()
	subscribers := make([]*inMemorySubscriber, len(r.registry))
	copy(subscribers, r.registry)
	r.mutex.Unlock()

	if len(subscribers) == 0 {
		log.Ctx(ctx).Warn().Msg("in pubsub Publish: no subscribers registered")
		return nil
	}

	pubConfig := PublishConfig{
		app:       r.config.App,
		namespace: r.config.Namespace,
	}
	for _, f := range opts {
		f.Apply(&pubConfig)
	}
	topic = formatTopic(pubConfig.app, pubConfig.namespace, topic)

	var wg sync.WaitGroup
	for _, sub := range subscribers {
		// The subscriber's own Subscribe/Unsubscribe change topics while
		// holding sub.mutex, so read it under that mutex as well.
		sub.mutex.RLock()
		subscribed := slices.Contains(sub.topics, topic)
		sub.mutex.RUnlock()

		if !subscribed || sub.isClosed() {
			continue
		}

		wg.Add(1)
		go func(subscriber *inMemorySubscriber) {
			defer wg.Done()

			// timer is based on subscriber data
			t := time.NewTimer(subscriber.config.sendTimeout)
			defer t.Stop()

			select {
			case <-ctx.Done():
				return
			case subscriber.channel <- payload:
				log.Ctx(ctx).Trace().Msgf("in pubsub Publish: message %v sent to topic %s", string(payload), topic)
			case <-t.C:
				// channel is full for topic (message is dropped)
				log.Ctx(ctx).Warn().Msgf("in pubsub Publish: %s topic is full for %s (message is dropped)",
					topic, subscriber.config.sendTimeout)
			}
		}(sub)
	}

	// Wait for all subscribers to complete.
	// Otherwise, we might fail notifying some subscribers due to context completion.
	wg.Wait()

	return nil
}
// Close closes every registered subscriber and returns the first error any
// of them reported, or nil when all closed cleanly.
//
// A failing subscriber (for example one that was already closed and reports
// ErrClosed) does not stop the loop: the remaining subscribers are still
// closed. The registry is walked under the mutex Subscribe holds while
// appending to it.
func (r *InMemory) Close(_ context.Context) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	var firstErr error
	for _, subscriber := range r.registry {
		if err := subscriber.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
// inMemorySubscriber is the consumer created by InMemory.Subscribe. It
// receives payloads on channel and passes each one to handler.
type inMemorySubscriber struct {
// config is the per-subscriber configuration built in InMemory.Subscribe.
config *SubscribeConfig
// handler is called by start for every payload received on channel.
handler func([]byte) error
// channel carries payloads from Publish to start. It is allocated in
// start and closed in Close.
channel chan []byte
// once wraps the close of channel performed in Close.
once sync.Once
// mutex is held by Close and isClosed around closed, and by Subscribe
// and Unsubscribe around topics.
mutex sync.RWMutex
// topics holds the formatted names of the topics this subscriber
// listens to (see formatTopics).
topics []string
// closed is set to true by Close.
closed bool
}
// start allocates the subscriber's channel (buffered to the configured
// channel size) and then runs the receive loop: every payload taken from the
// channel is passed to the handler, and handler errors are logged rather than
// returned. The loop ends when ctx is done or when the channel is closed.
//
// NOTE(review): the channel is assigned here, on the goroutine launched by
// InMemory.Subscribe, while Publish and Close read s.channel from other
// goroutines; a caller that gets there first sees a nil channel.
func (s *inMemorySubscriber) start(ctx context.Context) {
	s.channel = make(chan []byte, s.config.channelSize)

	for {
		select {
		case payload, open := <-s.channel:
			if !open {
				// Close was called; nothing more will arrive.
				return
			}

			err := s.handler(payload)
			if err == nil {
				continue
			}
			// TODO: bump err to caller
			log.Ctx(ctx).Err(err).Msgf("in pubsub start: error while running handler for topic")

		case <-ctx.Done():
			return
		}
	}
}
// Subscribe adds topics to the set this subscriber listens to. Each name is
// first expanded with formatTopics; names that are already present are
// skipped. It always returns nil.
func (s *inMemorySubscriber) Subscribe(_ context.Context, topics ...string) error {
	// s.topics is appended to below, so the exclusive lock is required; a
	// read lock would let two writers run at the same time.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, ch := range s.formatTopics(topics...) {
		if !slices.Contains(s.topics, ch) {
			s.topics = append(s.topics, ch)
		}
	}

	return nil
}
// Unsubscribe removes topics from the set this subscriber listens to. Each
// name is first expanded with formatTopics; names the subscriber does not
// listen to are ignored. It always returns nil.
//
// Removal swaps the last element into the vacated slot, so the order of the
// remaining topics is not preserved.
func (s *inMemorySubscriber) Unsubscribe(_ context.Context, topics ...string) error {
	// s.topics is rewritten below, so the exclusive lock is required.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, ch := range s.formatTopics(topics...) {
		// Locate the topic inside s.topics itself; its position in the
		// argument list says nothing about where it is stored.
		i := slices.Index(s.topics, ch)
		if i < 0 {
			continue
		}

		last := len(s.topics) - 1
		s.topics[i] = s.topics[last]
		s.topics = s.topics[:last]
	}

	return nil
}
// Close marks the subscriber as closed and closes its channel, which ends the
// receive loop in start. A subscriber that is already closed is left
// untouched and ErrClosed is returned.
func (s *inMemorySubscriber) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if !s.closed {
		s.closed = true
		s.once.Do(func() { close(s.channel) })
		return nil
	}

	return ErrClosed
}
// isClosed reports whether Close has been called on the subscriber. The flag
// is read while holding the subscriber's mutex.
func (s *inMemorySubscriber) isClosed() bool {
	s.mutex.Lock()
	closed := s.closed
	s.mutex.Unlock()

	return closed
}
// formatTopics returns a new slice holding, for each given topic and in the
// same order, the result of formatTopic with the subscriber's configured app
// and namespace.
func (s *inMemorySubscriber) formatTopics(topics ...string) []string {
	formatted := make([]string, 0, len(topics))
	for _, name := range topics {
		formatted = append(formatted, formatTopic(s.config.app, s.config.namespace, name))
	}
	return formatted
}