diff --git a/stream/memory_consumer.go b/stream/memory_consumer.go index 9e5532eb0..8b6a7f581 100644 --- a/stream/memory_consumer.go +++ b/stream/memory_consumer.go @@ -187,44 +187,77 @@ func (c *MemoryConsumer) consume(ctx context.Context) { continue } - err := func() (err error) { - // Ensure that handlers don't cause panic. - defer func() { - if r := recover(); r != nil { - c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s':\n%s", - m.id, m.streamID, debug.Stack())) - } - }() - - return handler.handle(ctx, m.id, m.values) - }() - - if err != nil { - c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w", - m.id, m.streamID, m.retries, err)) - - if m.retries >= int64(handler.config.maxRetries) { - c.pushError(fmt.Errorf( - "discard message with id '%s' from stream '%s' - failed %d retries", - m.id, m.streamID, m.retries)) - continue - } - - // increase retry count - m.retries++ - - // requeue message for a retry (needs to be in a separate go func to avoid deadlock) - // IMPORTANT: this won't requeue to broker, only in this consumer's queue! - go func() { - // TODO: linear/exponential backoff relative to retry count might be good - time.Sleep(handler.config.idleTimeout) - c.messageQueue <- m - }() - } + c.processMessage(ctx, handler, m) } } } +func (c *MemoryConsumer) processMessage(ctx context.Context, handler handler, m memoryMessage) { + var handlingErr error + + ctxWithCancel, cancelFn := context.WithCancel(ctx) + defer func(err error) { + // If the original execution errors out, we rely on the timeout to retry. This is to keep the behaviour same + // as the redis consumer. + if err == nil { + cancelFn() + } + }(handlingErr) + + // Start a retry goroutine with `idleTimeout` delay + go c.retryPostTimeout(ctxWithCancel, handler, m) + + handlingErr = func() (err error) { + // Ensure that handlers don't cause panic. + defer func() { + if r := recover(); r != nil { + c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s':\n%s", + m.id, m.streamID, debug.Stack())) + } + }() + + return handler.handle(ctx, m.id, m.values) + }() + + if handlingErr != nil { + c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w", + m.id, m.streamID, m.retries, handlingErr)) + } +} + +func (c *MemoryConsumer) retryPostTimeout(ctxWithCancel context.Context, handler handler, m memoryMessage) { + timer := time.NewTimer(handler.config.idleTimeout) + defer timer.Stop() + select { + case <-timer.C: + c.retryMessage(m, handler.config.maxRetries) + case <-ctxWithCancel.Done(): + // Retry canceled if message is processed + // Drain the timer channel if it is already stopped + if !timer.Stop() { + <-timer.C + } + } +} + +func (c *MemoryConsumer) retryMessage(m memoryMessage, maxRetries int) { + if m.retries >= int64(maxRetries) { + c.pushError(fmt.Errorf("discard message with id '%s' from stream '%s' - failed %d retries", + m.id, m.streamID, m.retries)) + return + } + + // increase retry count + m.retries++ + + // requeue message for a retry (needs to be in a separate go func to avoid deadlock) + // IMPORTANT: this won't requeue to broker, only in this consumer's queue! + go func() { + // TODO: linear/exponential backoff relative to retry count might be good + c.messageQueue <- m + }() +} + func (c *MemoryConsumer) Errors() <-chan error { return c.errorCh } func (c *MemoryConsumer) Infos() <-chan string { return c.infoCh }