drone/stream/redis_consumer.go

580 lines
17 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stream
import (
"context"
"errors"
"fmt"
"net"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// RedisConsumer provides functionality to process Redis streams as part of a consumer group.
//
// A consumer is created with NewRedisConsumer, set up via Configure and Register,
// and launched with Start. Once started, Configure and Register no longer change it.
type RedisConsumer struct {
// rdb is the Redis client used for every command issued by the consumer.
rdb redis.UniversalClient
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
namespace string
// groupName specifies the name of the consumer group.
groupName string
// consumerName specifies the name of the consumer.
consumerName string
// Config is the generic consumer configuration.
Config ConsumerConfig
// streams is a map of all registered streams and their handlers.
// It is keyed by the namespaced (transposed) stream ID.
streams map[string]handler
// isStarted is set by Start right before the worker go routines are launched.
isStarted bool
// messageQueue is the buffered channel that carries messages from the reader and
// the reclaimer to the consumer go routines.
messageQueue chan message
// errorCh is the buffered channel exposed by Errors; errors are dropped while it is full.
errorCh chan error
// infoCh is the buffered channel exposed by Infos; messages are dropped while it is full.
infoCh chan string
}
// NewRedisConsumer creates new Redis stream consumer. Streams are read with XREADGROUP.
//
// The consumer joins the consumer group groupName under the name consumerName; both are
// mandatory and an error is returned if either of them is empty. Stream keys are prefixed
// with namespace. The returned consumer uses the default configuration and has no streams
// registered yet.
//
// Info messages and errors produced while the consumer runs are available through
// Infos and Errors. The caller should not block on these channels for too long.
// These channels are provided mainly for logging.
func NewRedisConsumer(rdb redis.UniversalClient, namespace string,
	groupName string, consumerName string) (*RedisConsumer, error) {
	// Buffer sizes of the internal channels.
	const (
		queueCapacity   = 500
		errorChCapacity = 64
		infoChCapacity  = 64
	)

	switch {
	case groupName == "":
		return nil, errors.New("groupName can't be empty")
	case consumerName == "":
		return nil, errors.New("consumerName can't be empty")
	}

	consumer := &RedisConsumer{
		rdb:          rdb,
		namespace:    namespace,
		groupName:    groupName,
		consumerName: consumerName,
		Config:       defaultConfig,
		streams:      map[string]handler{},
		messageQueue: make(chan message, queueCapacity),
		errorCh:      make(chan error, errorChCapacity),
		infoCh:       make(chan string, infoChCapacity),
	}

	return consumer, nil
}
// Configure applies the provided options, in order, to the consumer configuration.
// Calling it after the consumer has been started has no effect.
func (c *RedisConsumer) Configure(opts ...ConsumerOption) {
	if !c.isStarted {
		for _, option := range opts {
			option.apply(&c.Config)
		}
	}
}
// Register attaches the handler fn to the stream streamID.
//
// The handler configuration starts from the consumer's default handler configuration
// and is then adjusted by opts. Register returns ErrAlreadyStarted once the consumer is
// running, and an error if streamID is empty, fn is nil, or the stream already has a handler.
func (c *RedisConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
	switch {
	case c.isStarted:
		return ErrAlreadyStarted
	case streamID == "":
		return errors.New("streamID can't be empty")
	case fn == nil:
		return errors.New("fn can't be empty")
	}

	// Streams are tracked by their namespaced redis key only, the plain streamID isn't kept.
	key := transposeStreamID(c.namespace, streamID)
	if _, exists := c.streams[key]; exists {
		return fmt.Errorf("consumer is already registered for '%s' (redis stream '%s')", streamID, key)
	}

	// Derive the handler config: consumer-wide defaults first, then the per-handler options.
	handlerConfig := c.Config.DefaultHandlerConfig
	for _, option := range opts {
		option.apply(&handlerConfig)
	}

	c.streams[key] = handler{handle: fn, config: handlerConfig}

	return nil
}
// Start launches the consumer.
//
// It fails with ErrAlreadyStarted if the consumer is already running, and with an error if
// no stream is registered, Redis can't be pinged, or the consumer group can't be created.
// On success it starts the reader, the reclaimer and Config.Concurrency consumer go routines,
// all of which run until ctx is done. After all of them have finished, the message,
// error and info channels are closed.
func (c *RedisConsumer) Start(ctx context.Context) error {
	if c.isStarted {
		return ErrAlreadyStarted
	}
	if len(c.streams) == 0 {
		return errors.New("no streams registered")
	}

	// Redis has to be reachable, otherwise there is no point in starting.
	if err := c.rdb.Ping(ctx).Err(); err != nil && !errors.Is(err, redis.Nil) {
		return fmt.Errorf("failed to ping redis server: %w", err)
	}

	// Make sure the consumer group exists on every stream (streams are created if missing).
	if err := c.createGroupForAllStreams(ctx); err != nil {
		return err
	}

	// Nothing below can fail anymore, so the consumer counts as started from here on.
	c.isStarted = true

	var workers sync.WaitGroup
	// spawn runs work in its own go routine that is tracked by the wait group.
	spawn := func(work func()) {
		workers.Add(1)
		go func() {
			defer workers.Done()
			work()
		}()
	}

	// Redis reader (preceded by a one-time cleanup of stale consumers), ends when ctx is done.
	spawn(func() {
		c.removeStaleConsumers(ctx, time.Hour)
		c.reader(ctx)
	})

	// Redis message reclaimer, ends when ctx is done.
	// IMPORTANT: Keep reclaim interval small for now to support faster retries => higher load on redis!
	// TODO: Make retries local by default with opt-in cross-instance retries.
	// https://harness.atlassian.net/browse/SCM-83
	spawn(func() {
		const reclaimInterval = 10 * time.Second
		c.reclaimer(ctx, reclaimInterval)
	})

	// Redis message consumers, each ends when ctx is done.
	for n := 0; n < c.Config.Concurrency; n++ {
		spawn(func() {
			c.consumer(ctx)
		})
	}

	// Once every worker is gone nobody writes anymore and the channels can be closed.
	go func() {
		workers.Wait()

		close(c.messageQueue)
		close(c.errorCh)
		close(c.infoCh)
	}()

	return nil
}
// reader method reads the registered Redis streams with XREADGROUP command to retrieve messages.
// The messages are then pushed onto the consumer's messageQueue for processing.
// If the stream already contains unassigned messages, those will be returned.
// Otherwise XREADGROUP blocks until either a new message arrives or block timeout happens.
//
// The first reads replay the history of this consumer (starting at ID "0"); once no stream
// returns history anymore the reader switches to reading new messages (">").
// Failed reads are retried after an increasing delay. If the consumer group got removed
// it is re-created.
//
// The method terminates when the provided context finishes. Messages that were read but
// couldn't be queued before that are not acknowledged.
//
//nolint:funlen,gocognit // refactor if needed
func (c *RedisConsumer) reader(ctx context.Context) {
	// delay before each read, indexed by the number of consecutive failures (capped at the last entry)
	delays := []time.Duration{1 * time.Millisecond, 5 * time.Second, 15 * time.Second, 30 * time.Second, time.Minute}
	consecutiveFailures := 0

	// pre-generate streams argument for XReadGroup
	// NOTE: for the first call ever we want to get the history of the consumer (to allow for seamless restarts)
	// ASSUMPTION: only one consumer with a given groupName+consumerName is running at a time
	scanHistory := true
	streamLen := len(c.streams)
	// layout: [stream_0 .. stream_n-1, id_0 .. id_n-1]
	streamsArg := make([]string, 2*streamLen)
	i := 0
	for streamID := range c.streams {
		streamsArg[i] = streamID
		streamsArg[streamLen+i] = "0"
		i++
	}

	for {
		var delay time.Duration
		if consecutiveFailures < len(delays) {
			delay = delays[consecutiveFailures]
		} else {
			delay = delays[len(delays)-1]
		}
		readTimer := time.NewTimer(delay)

		select {
		case <-ctx.Done():
			readTimer.Stop()
			return

		case <-readTimer.C:
			const count = 100

			resReadStream, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    c.groupName,
				Consumer: c.consumerName,
				Streams:  streamsArg,
				Count:    count,
				Block:    5 * time.Minute,
			}).Result()

			// if context is canceled, continue and next iteration will exit cleanly
			if errors.Is(err, context.Canceled) {
				continue
			}

			// network timeout - log it and retry
			var errNet net.Error
			if ok := errors.As(err, &errNet); ok && errNet.Timeout() {
				c.pushError(fmt.Errorf("encountered network failure: %w", errNet))
				consecutiveFailures++
				continue
			}

			// group doesn't exist anymore - recreate it
			if err != nil && strings.HasPrefix(err.Error(), "NOGROUP") {
				cErr := c.createGroupForAllStreams(ctx)
				if cErr != nil {
					// report why the re-creation failed (the NOGROUP error itself is already known here)
					c.pushError(fmt.Errorf("failed to re-create group for at least one stream: %w", cErr))
					consecutiveFailures++
				} else {
					c.pushInfo(fmt.Sprintf("re-created group for all streams where it got removed, original error: %s",
						err))
					consecutiveFailures = 0
				}
				continue
			}

			// any other error we handle generically
			if err != nil && !errors.Is(err, redis.Nil) {
				consecutiveFailures++
				c.pushError(fmt.Errorf("failed to read redis streams %v (consecutive fails: %d): %w",
					streamsArg, consecutiveFailures, err))
				continue
			}

			// check if we are done with scanning the history of all streams
			if scanHistory {
				scanHistory = false

				// Getting history always returns all streams in the same order as queried
				// (even a stream that doesn't have any history left, in that case redis returns an empty slice)
				// Thus, we can use a simple incrementing index to get the streamArg for a stream in the response
				x := 0
				for _, stream := range resReadStream {
					// If the stream had messages in the history, continue scanning after the latest read message.
					if len(stream.Messages) > 0 {
						scanHistory = true
						streamsArg[streamLen+x] = stream.Messages[len(stream.Messages)-1].ID
					}
					x++
				}

				if !scanHistory {
					c.pushInfo("completed scan of history")

					// Update stream args to read latest messages for all streams
					for j := 0; j < streamLen; j++ {
						streamsArg[streamLen+j] = ">"
					}

					continue
				}
			}

			// reset fail count
			consecutiveFailures = 0

			// if no messages were read we can skip iteration
			if len(resReadStream) == 0 {
				continue
			}

			// retrieve all messages across all streams and put them into the message queue
			for _, stream := range resReadStream {
				for _, m := range stream.Messages {
					// Don't block on a full queue forever: once ctx is done the consumers stop
					// draining it, so waiting for space would keep this go routine alive.
					select {
					case <-ctx.Done():
						return
					case c.messageQueue <- message{
						streamID: stream.Stream,
						id:       m.ID,
						values:   m.Values,
					}:
					}
				}
			}
		}
	}
}
// reclaimer periodically inspects pending messages with XPENDING command.
// If a message sits longer than the idle timeout of its stream's handler, we attempt to reclaim
// the message for this consumer and enqueue it for processing.
// Messages whose retry count exceeds the handler's maxRetries are acknowledged and thereby discarded.
//
// The queried ID range and the page size are tracked per stream, so the progress made on one
// stream never influences what is queried for another stream.
//
// The method runs every reclaimInterval and terminates when the provided context finishes.
//
//nolint:funlen,gocognit // refactor if needed
func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Duration) {
	reclaimTimer := time.NewTimer(reclaimInterval)
	defer func() {
		reclaimTimer.Stop()
	}()

	const (
		baseCount = 16
		maxCount  = 1024
	)

	// the maximum message ID which we are querying for.
	// redis treats "+" as bigger than any valid message ID
	const end = "+"

	// scanState is the XPENDING query state of a single stream.
	type scanState struct {
		// start is the minimum message ID which we are querying for.
		start string
		// count is the maximum number of pending messages requested per query.
		count int
	}

	// The set of streams can't change once the consumer is started, so the state is set up once.
	states := make(map[string]*scanState, len(c.streams))
	for streamID := range c.streams {
		// redis treats "-" as smaller than any valid message ID
		states[streamID] = &scanState{start: "-", count: baseCount}
	}

	for {
		select {
		case <-ctx.Done():
			return

		case <-reclaimTimer.C:
			for streamID, h := range c.streams {
				state := states[streamID]

				resPending, errPending := c.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
					Stream: streamID,
					Group:  c.groupName,
					Start:  state.start,
					End:    end,
					Idle:   h.config.idleTimeout,
					Count:  int64(state.count),
				}).Result()
				if errPending != nil && !errors.Is(errPending, redis.Nil) {
					c.pushError(fmt.Errorf("failed to fetch pending messages: %w", errPending))
					continue
				}

				if len(resPending) == 0 {
					continue
				}

				// Change start of the requested range for the next iteration to the oldest returned message.
				// NOTE(review): only idle messages are returned; a pending message with a smaller ID that
				// becomes idle later would presumably fall outside of the new range - confirm whether
				// that can happen (e.g. after a re-claim reset its idle time).
				state.start = resPending[0].ID

				for _, resMessage := range resPending {
					if resMessage.RetryCount > int64(h.config.maxRetries) {
						// Retry count gets increased after every XCLAIM.
						// Large retry count might mean there is something wrong with the message, so we'll XACK it.
						// WARNING this will discard the message!
						errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
						if errAck != nil {
							c.pushError(fmt.Errorf(
								"failed to force acknowledge (discard) message '%s' (Retries: %d) in stream '%s': %w",
								resMessage.ID, resMessage.RetryCount, streamID, errAck))
						} else {
							retryCount := resMessage.RetryCount - 1 // redis is counting this execution as retry
							c.pushError(fmt.Errorf(
								"force acknowledged (discarded) message '%s' (Retries: %d) in stream '%s'",
								resMessage.ID, retryCount, streamID))
						}
						continue
					}

					// Otherwise, claim the message so we can retry it.
					claimedMessages, errClaim := c.rdb.XClaim(ctx, &redis.XClaimArgs{
						Stream:   streamID,
						Group:    c.groupName,
						Consumer: c.consumerName,
						MinIdle:  h.config.idleTimeout,
						Messages: []string{resMessage.ID},
					}).Result()

					if errors.Is(errClaim, redis.Nil) {
						// Receiving redis.Nil here means the message is removed from the stream (because of MAXLEN).
						// The only option is to acknowledge it with XACK.
						errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
						if errAck != nil {
							c.pushError(fmt.Errorf("failed to acknowledge failed message '%s' in stream '%s': %w",
								resMessage.ID, streamID, errAck))
						} else {
							c.pushInfo(fmt.Sprintf("acknowledged failed message '%s' in stream '%s'",
								resMessage.ID, streamID))
						}
						continue
					}

					if errClaim != nil {
						// This can happen if two consumers try to claim the same message at once.
						// One would succeed and the other will get an error.
						c.pushError(fmt.Errorf("failed to claim message '%s' in stream '%s': %w",
							resMessage.ID, streamID, errClaim))
						continue
					}

					// This is not expected to happen (message will be retried or eventually discarded)
					if len(claimedMessages) == 0 {
						c.pushError(fmt.Errorf(
							"no error when claiming message '%s' in stream '%s', but redis returned no message",
							resMessage.ID, streamID))
						continue
					}

					// we claimed only one message id so there is only one message in the slice
					claimedMessage := claimedMessages[0]

					// Don't block on a full queue forever: once ctx is done the consumers stop
					// draining it. The claimed message stays pending and can be reclaimed later.
					select {
					case <-ctx.Done():
						return
					case c.messageQueue <- message{
						streamID: streamID,
						id:       claimedMessage.ID,
						values:   claimedMessage.Values,
					}:
					}
				}

				// If number of messages that we got is equal to the number that we requested
				// it means that there's a lot for processing, so we'll increase number of messages
				// that we'll pull in the next iteration.
				if len(resPending) == state.count {
					state.count *= 2
					if state.count > maxCount {
						state.count = maxCount
					}
				} else {
					state.count = baseCount
				}
			}

			// the timer has fired (its channel was drained above), so it's safe to re-arm it
			reclaimTimer.Reset(reclaimInterval)
		}
	}
}
// consumer method consumes messages coming from Redis. Every message taken from the
// messageQueue is passed to the handler registered for its stream and, if the handler
// succeeds, acknowledged with XACK.
//
// A message is NOT acknowledged if its handler returns an error or panics, so that it stays
// pending and can be retried. A panic in a handler is recovered and reported as an error.
//
// The method terminates when the provided context finishes or the messageQueue channel closes.
func (c *RedisConsumer) consumer(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return

		case m, ok := <-c.messageQueue:
			if !ok {
				// the channel is closed, nothing left to consume
				return
			}

			h, ok := c.streams[m.streamID]
			if !ok {
				// we don't want to ack the message
				// maybe someone else can claim and process it (worst case it expires)
				c.pushError(fmt.Errorf("received message '%s' in stream '%s' that doesn't belong to us, skip",
					m.id, m.streamID))
				continue
			}

			panicked := false
			err := func() error {
				// Ensure that handlers don't cause panic.
				defer func() {
					if r := recover(); r != nil {
						// remember the panic, otherwise the message would be treated as processed
						panicked = true
						c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s': %v\n%s",
							m.id, m.streamID, r, debug.Stack()))
					}
				}()

				return h.handle(ctx, m.id, m.values)
			}()
			if panicked {
				// already reported above - leave the message unacknowledged so it can be retried
				continue
			}
			if err != nil {
				c.pushError(fmt.Errorf("failed to process message '%s' in stream '%s': %w", m.id, m.streamID, err))
				continue
			}

			err = c.rdb.XAck(ctx, m.streamID, c.groupName, m.id).Err()
			if err != nil {
				c.pushError(fmt.Errorf("failed to acknowledge message '%s' in stream '%s': %w", m.id, m.streamID, err))
				continue
			}
		}
	}
}
// removeStaleConsumers deletes consumers of the group that have been idle for at least maxAge
// and have no pending messages. It does so for every registered stream.
// Failures are reported on the error channel, removals on the info channel.
// If the consumers of a stream can't be read, the cleanup is aborted.
func (c *RedisConsumer) removeStaleConsumers(ctx context.Context, maxAge time.Duration) {
	for streamID := range c.streams {
		// List every consumer of the group on this stream.
		consumers, err := c.rdb.XInfoConsumers(ctx, streamID, c.groupName).Result()
		if err != nil {
			c.pushError(fmt.Errorf("failed to read consumers for stream '%s': %w", streamID, err))
			return
		}

		for _, info := range consumers {
			// redis reports the idle time in milliseconds
			idleFor := time.Duration(info.Idle) * time.Millisecond

			// Keep consumers that still own pending messages or were active recently.
			hasPending := info.Pending > 0
			recentlyActive := idleFor < maxAge
			if hasPending || recentlyActive {
				continue
			}

			if delErr := c.rdb.XGroupDelConsumer(ctx, streamID, c.groupName, info.Name).Err(); delErr != nil {
				c.pushError(fmt.Errorf(
					"failed to remove stale consumer '%s' from group '%s' (age '%s') for stream '%s': %w",
					info.Name, c.groupName, idleFor, streamID, delErr))
			} else {
				c.pushInfo(fmt.Sprintf("removed stale consumer '%s' from group '%s' (age '%s') for stream '%s'",
					info.Name, c.groupName, idleFor, streamID))
			}
		}
	}
}
// pushError publishes err on the error channel without ever blocking:
// if the channel's buffer is full the error is dropped.
func (c *RedisConsumer) pushError(err error) {
	select {
	case c.errorCh <- err:
		// queued for whoever reads Errors()
	default:
		// buffer is full, drop the error instead of blocking the caller
	}
}
// pushInfo publishes s on the info channel without ever blocking:
// if the channel's buffer is full the message is dropped.
func (c *RedisConsumer) pushInfo(s string) {
	select {
	case c.infoCh <- s:
		// queued for whoever reads Infos()
	default:
		// buffer is full, drop the message instead of blocking the caller
	}
}
// Errors returns the receive-only channel on which the consumer reports errors.
// The channel is closed once all go routines launched by Start have finished.
func (c *RedisConsumer) Errors() <-chan error {
	return c.errorCh
}
// Infos returns the receive-only channel on which the consumer reports info messages.
// The channel is closed once all go routines launched by Start have finished.
func (c *RedisConsumer) Infos() <-chan string {
	return c.infoCh
}
// createGroupForAllStreams makes sure the consumer group exists on every registered stream.
// It stops at and returns the first error encountered.
func (c *RedisConsumer) createGroupForAllStreams(ctx context.Context) error {
	for key := range c.streams {
		if err := createGroup(ctx, c.rdb, key, c.groupName); err != nil {
			return err
		}
	}

	return nil
}
// createGroup creates the consumer group groupName on the stream streamID,
// creating the stream as well if it doesn't exist yet.
// A group that already exists (redis answers with BUSYGROUP) is not treated as an error.
func createGroup(ctx context.Context, rdb redis.UniversalClient, streamID string, groupName string) error {
	// Creates a new consumer group that starts receiving messages from now on.
	// Existing messages in the queue are ignored (we don't want to overload a group with old messages)
	// For more details of the XGROUPCREATE api see https://redis.io/commands/xgroup-create/
	err := rdb.XGroupCreateMkStream(ctx, streamID, groupName, "$").Err()
	if err == nil || strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return nil
	}

	return fmt.Errorf("failed to create consumer group '%s' for stream '%s': %w", groupName, streamID, err)
}