mirror of https://github.com/harness/drone.git
588 lines
17 KiB
Go
588 lines
17 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package stream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
// RedisConsumer provides functionality to process Redis streams as part of a consumer group.
|
|
type RedisConsumer struct {
|
|
rdb redis.UniversalClient
|
|
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
|
|
namespace string
|
|
// groupName specifies the name of the consumer group.
|
|
groupName string
|
|
// consumerName specifies the name of the consumer.
|
|
consumerName string
|
|
|
|
// Config is the generic consumer configuration.
|
|
Config ConsumerConfig
|
|
|
|
// streams is a map of all registered streams and their handlers.
|
|
streams map[string]handler
|
|
|
|
isStarted bool
|
|
messageQueue chan message
|
|
errorCh chan error
|
|
infoCh chan string
|
|
}
|
|
|
|
// NewRedisConsumer creates new Redis stream consumer. Streams are read with XREADGROUP.
|
|
// It returns channels of info messages and errors. The caller should not block on these channels for too long.
|
|
// These channels are provided mainly for logging.
|
|
func NewRedisConsumer(rdb redis.UniversalClient, namespace string,
|
|
groupName string, consumerName string) (*RedisConsumer, error) {
|
|
if groupName == "" {
|
|
return nil, errors.New("groupName can't be empty")
|
|
}
|
|
if consumerName == "" {
|
|
return nil, errors.New("consumerName can't be empty")
|
|
}
|
|
|
|
const queueCapacity = 500
|
|
const errorChCapacity = 64
|
|
const infoChCapacity = 64
|
|
|
|
return &RedisConsumer{
|
|
rdb: rdb,
|
|
namespace: namespace,
|
|
groupName: groupName,
|
|
consumerName: consumerName,
|
|
streams: map[string]handler{},
|
|
Config: defaultConfig,
|
|
isStarted: false,
|
|
messageQueue: make(chan message, queueCapacity),
|
|
errorCh: make(chan error, errorChCapacity),
|
|
infoCh: make(chan string, infoChCapacity),
|
|
}, nil
|
|
}
|
|
|
|
func (c *RedisConsumer) Configure(opts ...ConsumerOption) {
|
|
if c.isStarted {
|
|
return
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt.apply(&c.Config)
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
|
|
if c.isStarted {
|
|
return ErrAlreadyStarted
|
|
}
|
|
if streamID == "" {
|
|
return errors.New("streamID can't be empty")
|
|
}
|
|
if fn == nil {
|
|
return errors.New("fn can't be empty")
|
|
}
|
|
|
|
// transpose streamID to key namespace - no need to keep inner streamID
|
|
transposedStreamID := transposeStreamID(c.namespace, streamID)
|
|
if _, ok := c.streams[transposedStreamID]; ok {
|
|
return fmt.Errorf("consumer is already registered for '%s' (redis stream '%s')", streamID, transposedStreamID)
|
|
}
|
|
|
|
// create final config for handler
|
|
config := c.Config.DefaultHandlerConfig
|
|
for _, opt := range opts {
|
|
opt.apply(&config)
|
|
}
|
|
|
|
c.streams[transposedStreamID] = handler{
|
|
handle: fn,
|
|
config: config,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RedisConsumer) Start(ctx context.Context) error {
|
|
if c.isStarted {
|
|
return ErrAlreadyStarted
|
|
}
|
|
|
|
if len(c.streams) == 0 {
|
|
return errors.New("no streams registered")
|
|
}
|
|
|
|
var err error
|
|
|
|
// Check if Redis is accessible, fail if it's not.
|
|
err = c.rdb.Ping(ctx).Err()
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
return fmt.Errorf("failed to ping redis server: %w", err)
|
|
}
|
|
|
|
// Create consumer group for all streams, creates streams if they don't exist.
|
|
err = c.createGroupForAllStreams(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// mark as started before starting go routines (can't error out from here)
|
|
c.isStarted = true
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
c.removeStaleConsumers(ctx, time.Hour)
|
|
// launch redis reader, it will finish when the ctx is done
|
|
c.reader(ctx)
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// launch redis message reclaimer, it will finish when the ctx is done.
|
|
// IMPORTANT: Keep reclaim interval small for now to support faster retries => higher load on redis!
|
|
// TODO: Make retries local by default with opt-in cross-instance retries.
|
|
// https://harness.atlassian.net/browse/SCM-83
|
|
const reclaimInterval = 10 * time.Second
|
|
c.reclaimer(ctx, reclaimInterval)
|
|
}()
|
|
|
|
for i := 0; i < c.Config.Concurrency; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// launch redis message consumer, it will finish when the ctx is done
|
|
c.consumer(ctx)
|
|
}()
|
|
}
|
|
|
|
go func() {
|
|
// wait for all go routines to complete
|
|
wg.Wait()
|
|
|
|
// close all channels
|
|
close(c.messageQueue)
|
|
close(c.errorCh)
|
|
close(c.infoCh)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// reader method reads a Redis stream with XREADGROUP command to retrieve messages.
|
|
// The messages are then sent to a go channel passed as parameter for processing.
|
|
// If the stream already contains unassigned messages, those we'll be returned.
|
|
// Otherwise XREADGROUP blocks until either a new message arrives or block timeout happens.
|
|
// The method terminates when the provided context finishes.
|
|
//
|
|
//nolint:funlen,gocognit // refactor if needed
|
|
func (c *RedisConsumer) reader(ctx context.Context) {
|
|
delays := []time.Duration{1 * time.Millisecond, 5 * time.Second, 15 * time.Second, 30 * time.Second, time.Minute}
|
|
consecutiveFailures := 0
|
|
|
|
// pre-generate streams argument for XReadGroup
|
|
// NOTE: for the first call ever we want to get the history of the consumer (to allow for seamless restarts)
|
|
// ASSUMPTION: only one consumer with a given groupName+consumerName is running at a time
|
|
scanHistory := true
|
|
streamLen := len(c.streams)
|
|
streamsArg := make([]string, 2*streamLen)
|
|
i := 0
|
|
for streamID := range c.streams {
|
|
streamsArg[i] = streamID
|
|
streamsArg[streamLen+i] = "0"
|
|
i++
|
|
}
|
|
|
|
for {
|
|
var delay time.Duration
|
|
if consecutiveFailures < len(delays) {
|
|
delay = delays[consecutiveFailures]
|
|
} else {
|
|
delay = delays[len(delays)-1]
|
|
}
|
|
readTimer := time.NewTimer(delay)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
readTimer.Stop()
|
|
return
|
|
|
|
case <-readTimer.C:
|
|
const count = 100
|
|
|
|
resReadStream, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
|
Group: c.groupName,
|
|
Consumer: c.consumerName,
|
|
Streams: streamsArg,
|
|
Count: count,
|
|
Block: 5 * time.Minute,
|
|
}).Result()
|
|
|
|
// if context is canceled, continue and next iteration will exit cleanly
|
|
if errors.Is(err, context.Canceled) {
|
|
continue
|
|
}
|
|
|
|
// network timeout - log it and retry
|
|
var errNet net.Error
|
|
if ok := errors.As(err, &errNet); ok && errNet.Timeout() {
|
|
c.pushError(fmt.Errorf("encountered network failure: %w", errNet))
|
|
consecutiveFailures++
|
|
continue
|
|
}
|
|
|
|
// group doesn't exist anymore - recreate it
|
|
if err != nil && strings.HasPrefix(err.Error(), "NOGROUP") {
|
|
cErr := c.createGroupForAllStreams(ctx)
|
|
if cErr != nil {
|
|
c.pushError(fmt.Errorf("failed to re-create group for at least one stream: %w", err))
|
|
consecutiveFailures++
|
|
} else {
|
|
c.pushInfo(fmt.Sprintf("re-created group for all streams where it got removed, original error: %s",
|
|
err))
|
|
consecutiveFailures = 0
|
|
}
|
|
continue
|
|
}
|
|
|
|
// any other error we handle generically
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
consecutiveFailures++
|
|
c.pushError(fmt.Errorf("failed to read redis streams %v (consecutive fails: %d): %w",
|
|
streamsArg, consecutiveFailures, err))
|
|
continue
|
|
}
|
|
|
|
// check if we are done with scanning the history of all streams
|
|
if scanHistory {
|
|
scanHistory = false
|
|
|
|
// Getting history always returns all streams in the same order as queried
|
|
// (even a stream that doesn't have any history left, in that case redis returns an empty slice)
|
|
// Thus, we can use a simple incrementing index to get the streamArg for a stream in the response
|
|
x := 0
|
|
for _, stream := range resReadStream {
|
|
// If the stream had messages in the history, continue scanning after the latest read message.
|
|
if len(stream.Messages) > 0 {
|
|
scanHistory = true
|
|
streamsArg[streamLen+x] = stream.Messages[len(stream.Messages)-1].ID
|
|
|
|
c.pushInfo(fmt.Sprintf(
|
|
"stream %q had %d more messages in the history (delivered but no yet acked),"+
|
|
"continuing scanning after %q",
|
|
stream.Stream,
|
|
len(stream.Messages),
|
|
streamsArg[streamLen+x],
|
|
))
|
|
}
|
|
x++
|
|
}
|
|
|
|
if !scanHistory {
|
|
c.pushInfo("completed scan of history")
|
|
|
|
// Update stream args to read latest messages for all streams
|
|
for j := 0; j < streamLen; j++ {
|
|
streamsArg[streamLen+j] = ">"
|
|
}
|
|
|
|
continue
|
|
}
|
|
}
|
|
|
|
// reset fail count
|
|
consecutiveFailures = 0
|
|
|
|
// if no messages were read we can skip iteration
|
|
if len(resReadStream) == 0 {
|
|
continue
|
|
}
|
|
|
|
// retrieve all messages across all streams and put them into the message queue
|
|
for _, stream := range resReadStream {
|
|
for _, m := range stream.Messages {
|
|
c.messageQueue <- message{
|
|
streamID: stream.Stream,
|
|
id: m.ID,
|
|
values: m.Values,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reclaimer periodically inspects pending messages with XPENDING command.
|
|
// If a message sits longer than processingTimeout, we attempt to reclaim the message for this consumer
|
|
// and enqueue it for processing.
|
|
//
|
|
//nolint:funlen,gocognit // refactor if needed
|
|
func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Duration) {
|
|
reclaimTimer := time.NewTimer(reclaimInterval)
|
|
defer func() {
|
|
reclaimTimer.Stop()
|
|
}()
|
|
|
|
const (
|
|
baseCount = 16
|
|
maxCount = 1024
|
|
)
|
|
|
|
// the minimum message ID which we are querying for.
|
|
// redis treats "-" as smaller than any valid message ID
|
|
start := "-"
|
|
// the maximum message ID which we are querying for.
|
|
// redis treats "+" as bigger than any valid message ID
|
|
end := "+"
|
|
count := baseCount
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-reclaimTimer.C:
|
|
for streamID, handler := range c.streams {
|
|
resPending, errPending := c.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
|
|
Stream: streamID,
|
|
Group: c.groupName,
|
|
Start: start,
|
|
End: end,
|
|
Idle: handler.config.idleTimeout,
|
|
Count: int64(count),
|
|
}).Result()
|
|
if errPending != nil && !errors.Is(errPending, redis.Nil) {
|
|
c.pushError(fmt.Errorf("failed to fetch pending messages: %w", errPending))
|
|
continue
|
|
}
|
|
|
|
if len(resPending) == 0 {
|
|
continue
|
|
}
|
|
|
|
// It's safe to change start of the requested range for the next iteration to oldest message.
|
|
start = resPending[0].ID
|
|
|
|
for _, resMessage := range resPending {
|
|
if resMessage.RetryCount > int64(handler.config.maxRetries) {
|
|
// Retry count gets increased after every XCLAIM.
|
|
// Large retry count might mean there is something wrong with the message, so we'll XACK it.
|
|
// WARNING this will discard the message!
|
|
errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
|
|
if errAck != nil {
|
|
c.pushError(fmt.Errorf(
|
|
"failed to force acknowledge (discard) message '%s' (Retries: %d) in stream '%s': %w",
|
|
resMessage.ID, resMessage.RetryCount, streamID, errAck))
|
|
} else {
|
|
retryCount := resMessage.RetryCount - 1 // redis is counting this execution as retry
|
|
c.pushError(fmt.Errorf(
|
|
"force acknowledged (discarded) message '%s' (Retries: %d) in stream '%s'",
|
|
resMessage.ID, retryCount, streamID))
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Otherwise, claim the message so we can retry it.
|
|
claimedMessages, errClaim := c.rdb.XClaim(ctx, &redis.XClaimArgs{
|
|
Stream: streamID,
|
|
Group: c.groupName,
|
|
Consumer: c.consumerName,
|
|
MinIdle: handler.config.idleTimeout,
|
|
Messages: []string{resMessage.ID},
|
|
}).Result()
|
|
|
|
if errors.Is(errClaim, redis.Nil) {
|
|
// Receiving redis.Nil here means the message is removed from the stream (because of MAXLEN).
|
|
// The only option is to acknowledge it with XACK.
|
|
errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
|
|
if errAck != nil {
|
|
c.pushError(fmt.Errorf("failed to acknowledge failed message '%s' in stream '%s': %w",
|
|
resMessage.ID, streamID, errAck))
|
|
} else {
|
|
c.pushInfo(fmt.Sprintf("acknowledged failed message '%s' in stream '%s'",
|
|
resMessage.ID, streamID))
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if errClaim != nil {
|
|
// This can happen if two consumers try to claim the same message at once.
|
|
// One would succeed and the other will get an error.
|
|
c.pushError(fmt.Errorf("failed to claim message '%s' in stream '%s': %w",
|
|
resMessage.ID, streamID, errClaim))
|
|
|
|
continue
|
|
}
|
|
|
|
// This is not expected to happen (message will be retried or eventually discarded)
|
|
if len(claimedMessages) == 0 {
|
|
c.pushError(fmt.Errorf(
|
|
"no error when claiming message '%s' in stream '%s', but redis returned no message",
|
|
resMessage.ID, streamID))
|
|
|
|
continue
|
|
}
|
|
|
|
// we claimed only one message id so there is only one message in the slice
|
|
claimedMessage := claimedMessages[0]
|
|
c.messageQueue <- message{
|
|
streamID: streamID,
|
|
id: claimedMessage.ID,
|
|
values: claimedMessage.Values,
|
|
}
|
|
}
|
|
|
|
// If number of messages that we got is equal to the number that we requested
|
|
// it means that there's a lot for processing, so we'll increase number of messages
|
|
// that we'll pull in the next iteration.
|
|
if len(resPending) == count {
|
|
count *= 2
|
|
if count > maxCount {
|
|
count = maxCount
|
|
}
|
|
} else {
|
|
count = baseCount
|
|
}
|
|
}
|
|
|
|
reclaimTimer.Reset(reclaimInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
// consumer method consumes messages coming from Redis. The method terminates when messageQueue channel closes.
|
|
func (c *RedisConsumer) consumer(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case m := <-c.messageQueue:
|
|
if m.id == "" {
|
|
// id should never be empty, if it is then the channel is closed
|
|
return
|
|
}
|
|
|
|
handler, ok := c.streams[m.streamID]
|
|
if !ok {
|
|
// we don't want to ack the message
|
|
// maybe someone else can claim and process it (worst case it expires)
|
|
c.pushError(fmt.Errorf("received message '%s' in stream '%s' that doesn't belong to us, skip",
|
|
m.id, m.streamID))
|
|
continue
|
|
}
|
|
|
|
err := func() (err error) {
|
|
// Ensure that handlers don't cause panic.
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s':\n%s",
|
|
m.id, m.streamID, debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
return handler.handle(ctx, m.id, m.values)
|
|
}()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to process message '%s' in stream '%s': %w", m.id, m.streamID, err))
|
|
continue
|
|
}
|
|
|
|
err = c.rdb.XAck(ctx, m.streamID, c.groupName, m.id).Err()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to acknowledge message '%s' in stream '%s': %w", m.id, m.streamID, err))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) removeStaleConsumers(ctx context.Context, maxAge time.Duration) {
|
|
for streamID := range c.streams {
|
|
// Fetch all consumers for this stream and group.
|
|
resConsumers, err := c.rdb.XInfoConsumers(ctx, streamID, c.groupName).Result()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to read consumers for stream '%s': %w", streamID, err))
|
|
return
|
|
}
|
|
|
|
// Delete old consumers, but only if they don't have pending messages.
|
|
for _, resConsumer := range resConsumers {
|
|
age := time.Duration(resConsumer.Idle) * time.Millisecond
|
|
if resConsumer.Pending > 0 || age < maxAge {
|
|
continue
|
|
}
|
|
|
|
err = c.rdb.XGroupDelConsumer(ctx, streamID, c.groupName, resConsumer.Name).Err()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf(
|
|
"failed to remove stale consumer '%s' from group '%s' (age '%s') for stream '%s': %w",
|
|
resConsumer.Name, c.groupName, age, streamID, err))
|
|
continue
|
|
}
|
|
|
|
c.pushInfo(fmt.Sprintf("removed stale consumer '%s' from group '%s' (age '%s') for stream '%s'",
|
|
resConsumer.Name, c.groupName, age, streamID))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) pushError(err error) {
|
|
select {
|
|
case c.errorCh <- err:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) pushInfo(s string) {
|
|
select {
|
|
case c.infoCh <- s:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) Errors() <-chan error { return c.errorCh }
|
|
func (c *RedisConsumer) Infos() <-chan string { return c.infoCh }
|
|
|
|
func (c *RedisConsumer) createGroupForAllStreams(ctx context.Context) error {
|
|
for streamID := range c.streams {
|
|
err := createGroup(ctx, c.rdb, streamID, c.groupName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func createGroup(ctx context.Context, rdb redis.UniversalClient, streamID string, groupName string) error {
|
|
// Creates a new consumer group that starts receiving messages from now on.
|
|
// Existing messges in the queue are ignored (we don't want to overload a group with old messages)
|
|
// For more details of the XGROUPCREATE api see https://redis.io/commands/xgroup-create/
|
|
err := rdb.XGroupCreateMkStream(ctx, streamID, groupName, "$").Err()
|
|
if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
|
|
return fmt.Errorf("failed to create consumer group '%s' for stream '%s': %w", groupName, streamID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|