mirror of https://github.com/harness/drone.git
226 lines
7.7 KiB
Go
226 lines
7.7 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package events
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// ReaderFactoryFunc is an abstraction of a factory method that creates customized Reader implementations (type [R]).
|
|
// It is triggered by the ReaderFactory to create a new instance of the Reader to launch.
|
|
// The provided GenericReader object is available exclusively to the factory method (every call has a fresh instance)
|
|
// and should be used as base of any custom Reader implementation (use ReaderRegisterEvent to register custom handler).
|
|
type ReaderFactoryFunc[R Reader] func(reader *GenericReader) (R, error)
|
|
|
|
// ReaderFactory allows to launch event readers of type [R] (can be GenericReader or customized readers).
|
|
type ReaderFactory[R Reader] struct {
|
|
category string
|
|
streamConsumerFactoryFn StreamConsumerFactoryFunc
|
|
readerFactoryFn ReaderFactoryFunc[R]
|
|
}
|
|
|
|
// Launch launches a new reader for the provided group and client name.
|
|
// The setup method should be used to register the different events the reader will act on.
|
|
// To stop the reader and cleanup its resources the returned ReaderCanceler can be used.
|
|
// The reader also cancels automatically when the provided context is canceled.
|
|
// NOTE: Do not setup the reader outside of the setup method!
|
|
func (f *ReaderFactory[R]) Launch(ctx context.Context,
|
|
groupName string, readerName string, setup func(R) error) (*ReaderCanceler, error) {
|
|
if groupName == "" {
|
|
return nil, errors.New("groupName can't be empty")
|
|
}
|
|
if setup == nil {
|
|
return nil, errors.New("setup function can't be nil")
|
|
}
|
|
|
|
// setup ctx with copied logger that has extra fields set
|
|
log := log.Ctx(ctx).With().
|
|
Str("events.category", f.category).
|
|
Str("events.group_name", groupName).
|
|
Str("events.reader_name", readerName).
|
|
Logger()
|
|
|
|
// create new stream consumer using factory method
|
|
streamConsumer, err := f.streamConsumerFactoryFn(groupName, readerName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create new stream consumer: %w", err)
|
|
}
|
|
|
|
// create generic reader object
|
|
innerReader := &GenericReader{
|
|
streamConsumer: streamConsumer,
|
|
category: f.category,
|
|
}
|
|
|
|
// create new reader (could return the innerReader itself, but also allows to launch customized readers)
|
|
reader, err := f.readerFactoryFn(innerReader)
|
|
if err != nil {
|
|
//nolint:gocritic // only way to achieve this AFAIK - lint proposal is not building
|
|
return nil, fmt.Errorf("failed creation of event reader of type %T: %w", *new(R), err)
|
|
}
|
|
|
|
// execute setup function on reader (will configure concurrency, processingTimeout, ..., and register handlers)
|
|
err = setup(reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed custom setup of event reader: %w", err)
|
|
}
|
|
|
|
// hook into all available logs
|
|
go func(errorCh <-chan error) {
|
|
for err := range errorCh {
|
|
log.Err(err).Msg("received an error from stream consumer")
|
|
}
|
|
}(streamConsumer.Errors())
|
|
|
|
go func(infoCh <-chan string) {
|
|
for s := range infoCh {
|
|
log.Info().Msgf("stream consumer: %s", s)
|
|
}
|
|
}(streamConsumer.Infos())
|
|
|
|
// prepare context (inject logger and make canceable)
|
|
ctx = log.WithContext(ctx)
|
|
ctx, cancelFn := context.WithCancel(ctx)
|
|
|
|
// start consumer
|
|
err = innerReader.streamConsumer.Start(ctx)
|
|
if err != nil {
|
|
cancelFn()
|
|
return nil, fmt.Errorf("failed to start consumer: %w", err)
|
|
}
|
|
|
|
return &ReaderCanceler{
|
|
cancelFn: func() error {
|
|
cancelFn()
|
|
return nil
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// ReaderCanceler exposes the functionality to cancel a reader explicitly.
|
|
type ReaderCanceler struct {
|
|
canceled bool
|
|
cancelFn func() error
|
|
}
|
|
|
|
func (d *ReaderCanceler) Cancel() error {
|
|
if d.canceled {
|
|
return errors.New("reader has already been canceled")
|
|
}
|
|
|
|
// call cancel (might be async)
|
|
err := d.cancelFn()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to cancel reader: %w", err)
|
|
}
|
|
|
|
d.canceled = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// Reader specifies the minimum functionality a reader should expose.
|
|
// NOTE: we don't want to enforce any event registration methods here, allowing full control for customized readers.
|
|
type Reader interface {
|
|
Configure(opts ...ReaderOption)
|
|
}
|
|
|
|
type HandlerFunc[T interface{}] func(context.Context, *Event[T]) error
|
|
|
|
// GenericReader represents an event reader that supports registering type safe handlers
|
|
// for an arbitrary set of custom events within a given event category using the ReaderRegisterEvent method.
|
|
// NOTE: Optimally this should be an interface with RegisterEvent[T] method, but that's currently not possible in go.
|
|
// IMPORTANT: This reader should not be instantiated from external packages.
|
|
type GenericReader struct {
|
|
streamConsumer StreamConsumer
|
|
category string
|
|
}
|
|
|
|
// ReaderRegisterEvent registers a type safe handler function on the reader for a specific event.
|
|
// This method allows to register type safe handlers without the need of handling the raw stream payload.
|
|
// NOTE: Generic arguments are not allowed for struct methods, hence pass the reader as input parameter.
|
|
func ReaderRegisterEvent[T interface{}](reader *GenericReader,
|
|
eventType EventType, fn HandlerFunc[T], opts ...HandlerOption) error {
|
|
streamID := getStreamID(reader.category, eventType)
|
|
|
|
// register handler for event specific stream.
|
|
return reader.streamConsumer.Register(streamID,
|
|
func(ctx context.Context, messageID string, streamPayload map[string]interface{}) error {
|
|
if streamPayload == nil {
|
|
return fmt.Errorf("stream payload is nil for message '%s'", messageID)
|
|
}
|
|
|
|
// retrieve event from stream payload
|
|
eventRaw, ok := streamPayload[streamPayloadKey]
|
|
if !ok {
|
|
return fmt.Errorf("stream payload doesn't contain event (key: '%s') for message '%s'", streamPayloadKey, messageID)
|
|
}
|
|
|
|
// retrieve bytes from raw event
|
|
// NOTE: Redis returns []byte as string - to avoid unnecessary conversion we handle both types here.
|
|
var eventBytes []byte
|
|
switch v := eventRaw.(type) {
|
|
case string:
|
|
eventBytes = []byte(v)
|
|
case []byte:
|
|
eventBytes = v
|
|
default:
|
|
return fmt.Errorf("stream payload is not of expected type string or []byte but of type %T (message '%s')",
|
|
eventRaw, messageID)
|
|
}
|
|
|
|
// decode event to correct type
|
|
var event Event[T]
|
|
decoder := gob.NewDecoder(bytes.NewReader(eventBytes))
|
|
err := decoder.Decode(&event)
|
|
if err != nil {
|
|
//nolint:gocritic // only way to achieve this AFAIK - lint proposal is not building
|
|
return fmt.Errorf("stream payload can't be decoded into type %T (message '%s')", *new(T), messageID)
|
|
}
|
|
|
|
// populate event ID using the message ID (has to be populated here, producer doesn't know the message ID yet)
|
|
event.ID = messageID
|
|
|
|
// update ctx with event type for proper logging
|
|
log := log.Ctx(ctx).With().
|
|
Str("events.type", string(eventType)).
|
|
Str("events.id", event.ID).
|
|
Logger()
|
|
ctx = log.WithContext(ctx)
|
|
|
|
// call provided handler with correctly typed payload
|
|
err = fn(ctx, &event)
|
|
|
|
// handle discardEventError
|
|
if errors.Is(err, errDiscardEvent) {
|
|
log.Warn().Err(err).Msgf("discarding event '%s'", event.ID)
|
|
return nil
|
|
}
|
|
|
|
// any other error we return as is
|
|
return err
|
|
}, toStreamHandlerOptions(opts)...)
|
|
}
|
|
|
|
func (r *GenericReader) Configure(opts ...ReaderOption) {
|
|
r.streamConsumer.Configure(toStreamConsumerOptions(opts)...)
|
|
}
|