drone/events/reader.go

226 lines
7.7 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package events
import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"github.com/rs/zerolog/log"
)
// ReaderFactoryFunc is an abstraction of a factory method that creates customized Reader implementations (type [R]).
// It is triggered by the ReaderFactory to create a new instance of the Reader to launch.
// The provided GenericReader object is available exclusively to the factory method (every call has a fresh instance)
// and should be used as base of any custom Reader implementation (use ReaderRegisterEvent to register custom handler).
type ReaderFactoryFunc[R Reader] func(reader *GenericReader) (R, error)
// ReaderFactory allows to launch event readers of type [R] (can be GenericReader or customized readers).
type ReaderFactory[R Reader] struct {
category string
streamConsumerFactoryFn StreamConsumerFactoryFunc
readerFactoryFn ReaderFactoryFunc[R]
}
// Launch launches a new reader for the provided group and client name.
// The setup method should be used to register the different events the reader will act on.
// To stop the reader and cleanup its resources the returned ReaderCanceler can be used.
// The reader also cancels automatically when the provided context is canceled.
// NOTE: Do not setup the reader outside of the setup method!
func (f *ReaderFactory[R]) Launch(ctx context.Context,
groupName string, readerName string, setup func(R) error) (*ReaderCanceler, error) {
if groupName == "" {
return nil, errors.New("groupName can't be empty")
}
if setup == nil {
return nil, errors.New("setup function can't be nil")
}
// setup ctx with copied logger that has extra fields set
log := log.Ctx(ctx).With().
Str("events.category", f.category).
Str("events.group_name", groupName).
Str("events.reader_name", readerName).
Logger()
// create new stream consumer using factory method
streamConsumer, err := f.streamConsumerFactoryFn(groupName, readerName)
if err != nil {
return nil, fmt.Errorf("failed to create new stream consumer: %w", err)
}
// create generic reader object
innerReader := &GenericReader{
streamConsumer: streamConsumer,
category: f.category,
}
// create new reader (could return the innerReader itself, but also allows to launch customized readers)
reader, err := f.readerFactoryFn(innerReader)
if err != nil {
//nolint:gocritic // only way to achieve this AFAIK - lint proposal is not building
return nil, fmt.Errorf("failed creation of event reader of type %T: %w", *new(R), err)
}
// execute setup function on reader (will configure concurrency, processingTimeout, ..., and register handlers)
err = setup(reader)
if err != nil {
return nil, fmt.Errorf("failed custom setup of event reader: %w", err)
}
// hook into all available logs
go func(errorCh <-chan error) {
for err := range errorCh {
log.Err(err).Msg("received an error from stream consumer")
}
}(streamConsumer.Errors())
go func(infoCh <-chan string) {
for s := range infoCh {
log.Info().Msgf("stream consumer: %s", s)
}
}(streamConsumer.Infos())
// prepare context (inject logger and make canceable)
ctx = log.WithContext(ctx)
ctx, cancelFn := context.WithCancel(ctx)
// start consumer
err = innerReader.streamConsumer.Start(ctx)
if err != nil {
cancelFn()
return nil, fmt.Errorf("failed to start consumer: %w", err)
}
return &ReaderCanceler{
cancelFn: func() error {
cancelFn()
return nil
},
}, nil
}
// ReaderCanceler exposes the functionality to cancel a reader explicitly.
type ReaderCanceler struct {
canceled bool
cancelFn func() error
}
func (d *ReaderCanceler) Cancel() error {
if d.canceled {
return errors.New("reader has already been canceled")
}
// call cancel (might be async)
err := d.cancelFn()
if err != nil {
return fmt.Errorf("failed to cancel reader: %w", err)
}
d.canceled = true
return nil
}
// Reader specifies the minimum functionality a reader should expose.
// NOTE: we don't want to enforce any event registration methods here, allowing full control for customized readers.
type Reader interface {
Configure(opts ...ReaderOption)
}
type HandlerFunc[T interface{}] func(context.Context, *Event[T]) error
// GenericReader represents an event reader that supports registering type safe handlers
// for an arbitrary set of custom events within a given event category using the ReaderRegisterEvent method.
// NOTE: Optimally this should be an interface with RegisterEvent[T] method, but that's currently not possible in go.
// IMPORTANT: This reader should not be instantiated from external packages.
type GenericReader struct {
streamConsumer StreamConsumer
category string
}
// ReaderRegisterEvent registers a type safe handler function on the reader for a specific event.
// This method allows to register type safe handlers without the need of handling the raw stream payload.
// NOTE: Generic arguments are not allowed for struct methods, hence pass the reader as input parameter.
func ReaderRegisterEvent[T interface{}](reader *GenericReader,
eventType EventType, fn HandlerFunc[T], opts ...HandlerOption) error {
streamID := getStreamID(reader.category, eventType)
// register handler for event specific stream.
return reader.streamConsumer.Register(streamID,
func(ctx context.Context, messageID string, streamPayload map[string]interface{}) error {
if streamPayload == nil {
return fmt.Errorf("stream payload is nil for message '%s'", messageID)
}
// retrieve event from stream payload
eventRaw, ok := streamPayload[streamPayloadKey]
if !ok {
return fmt.Errorf("stream payload doesn't contain event (key: '%s') for message '%s'", streamPayloadKey, messageID)
}
// retrieve bytes from raw event
// NOTE: Redis returns []byte as string - to avoid unnecessary conversion we handle both types here.
var eventBytes []byte
switch v := eventRaw.(type) {
case string:
eventBytes = []byte(v)
case []byte:
eventBytes = v
default:
return fmt.Errorf("stream payload is not of expected type string or []byte but of type %T (message '%s')",
eventRaw, messageID)
}
// decode event to correct type
var event Event[T]
decoder := gob.NewDecoder(bytes.NewReader(eventBytes))
err := decoder.Decode(&event)
if err != nil {
//nolint:gocritic // only way to achieve this AFAIK - lint proposal is not building
return fmt.Errorf("stream payload can't be decoded into type %T (message '%s')", *new(T), messageID)
}
// populate event ID using the message ID (has to be populated here, producer doesn't know the message ID yet)
event.ID = messageID
// update ctx with event type for proper logging
log := log.Ctx(ctx).With().
Str("events.type", string(eventType)).
Str("events.id", event.ID).
Logger()
ctx = log.WithContext(ctx)
// call provided handler with correctly typed payload
err = fn(ctx, &event)
// handle discardEventError
if errors.Is(err, errDiscardEvent) {
log.Warn().Err(err).Msgf("discarding event '%s'", event.ID)
return nil
}
// any other error we return as is
return err
}, toStreamHandlerOptions(opts)...)
}
func (r *GenericReader) Configure(opts ...ReaderOption) {
r.streamConsumer.Configure(toStreamConsumerOptions(opts)...)
}