drone/stream/memory_consumer.go

237 lines
5.8 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stream
import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
)
// memoryMessage extends the message object to allow tracking retries.
type memoryMessage struct {
message
retries int64
}
// MemoryConsumer consumes streams from a MemoryBroker.
type MemoryConsumer struct {
broker *MemoryBroker
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
namespace string
// groupName specifies the name of the consumer group.
groupName string
// Config is the generic consumer configuration.
Config ConsumerConfig
// streams is a map of all registered streams and their handlers.
streams map[string]handler
isStarted bool
messageQueue chan memoryMessage
errorCh chan error
infoCh chan string
}
func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) (*MemoryConsumer, error) {
if groupName == "" {
return nil, errors.New("groupName can't be empty")
}
const queueCapacity = 500
const errorChCapacity = 64
const infoChCapacity = 64
return &MemoryConsumer{
broker: broker,
namespace: namespace,
groupName: groupName,
streams: map[string]handler{},
Config: defaultConfig,
isStarted: false,
messageQueue: make(chan memoryMessage, queueCapacity),
errorCh: make(chan error, errorChCapacity),
infoCh: make(chan string, infoChCapacity),
}, nil
}
func (c *MemoryConsumer) Configure(opts ...ConsumerOption) {
if c.isStarted {
return
}
for _, opt := range opts {
opt.apply(&c.Config)
}
}
func (c *MemoryConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
if c.isStarted {
return ErrAlreadyStarted
}
if streamID == "" {
return errors.New("streamID can't be empty")
}
if fn == nil {
return errors.New("fn can't be empty")
}
// transpose streamID to key namespace - no need to keep inner streamID
transposedStreamID := transposeStreamID(c.namespace, streamID)
if _, ok := c.streams[transposedStreamID]; ok {
return fmt.Errorf("consumer is already registered for '%s' (full stream '%s')", streamID, transposedStreamID)
}
config := c.Config.DefaultHandlerConfig
for _, opt := range opts {
opt.apply(&config)
}
c.streams[transposedStreamID] = handler{
handle: fn,
config: config,
}
return nil
}
func (c *MemoryConsumer) Start(ctx context.Context) error {
if c.isStarted {
return ErrAlreadyStarted
}
if len(c.streams) == 0 {
return errors.New("no streams registered")
}
// mark as started before starting go routines (can't error out from here)
c.isStarted = true
wg := &sync.WaitGroup{}
// start routines to read messages from broker
for streamID := range c.streams {
wg.Add(1)
go func(stream string) {
defer wg.Done()
c.reader(ctx, stream)
}(streamID)
}
// start workers
for i := 0; i < c.Config.Concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.consume(ctx)
}()
}
// start cleanup routing
go func() {
// wait for all go routines to complete
wg.Wait()
close(c.messageQueue)
close(c.infoCh)
close(c.errorCh)
}()
return nil
}
// reader reads the messages of a specific stream from the broker and puts it
// into the single message queue monitored by the consumers.
func (c *MemoryConsumer) reader(ctx context.Context, streamID string) {
streamQueue := c.broker.messages(streamID, c.groupName)
for {
select {
case <-ctx.Done():
return
case m := <-streamQueue:
c.messageQueue <- memoryMessage{
message: m,
retries: 0,
}
}
}
}
func (c *MemoryConsumer) consume(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case m := <-c.messageQueue:
handler, ok := c.streams[m.streamID]
if !ok {
// we only take messages from registered streams, this should never happen.
// WARNING this will discard the message
c.pushError(fmt.Errorf("discard message with id '%s' from stream '%s' - doesn't belong to us",
m.id, m.streamID))
continue
}
err := func() (err error) {
// Ensure that handlers don't cause panic.
defer func() {
if r := recover(); r != nil {
c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s':\n%s",
m.id, m.streamID, debug.Stack()))
}
}()
return handler.handle(ctx, m.id, m.values)
}()
if err != nil {
c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w",
m.id, m.streamID, m.retries, err))
if m.retries >= int64(handler.config.maxRetries) {
c.pushError(fmt.Errorf(
"discard message with id '%s' from stream '%s' - failed %d retries",
m.id, m.streamID, m.retries))
continue
}
// increase retry count
m.retries++
// requeue message for a retry (needs to be in a separate go func to avoid deadlock)
// IMPORTANT: this won't requeue to broker, only in this consumer's queue!
go func() {
// TODO: linear/exponential backoff relative to retry count might be good
time.Sleep(handler.config.idleTimeout)
c.messageQueue <- m
}()
}
}
}
}
func (c *MemoryConsumer) Errors() <-chan error { return c.errorCh }
func (c *MemoryConsumer) Infos() <-chan string { return c.infoCh }
func (c *MemoryConsumer) pushError(err error) {
select {
case c.errorCh <- err:
default:
}
}