[Events] This PR introduces the Trigger Service for Event Consumption + Some Minor improvements (#419)

jobatzil/rename
Johannes Batzill 2023-09-08 23:52:19 +00:00 committed by Harness
parent 32888b07ce
commit 0d086a1a4d
16 changed files with 272 additions and 43 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/harness/gitness/events" "github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc" "github.com/harness/gitness/gitrpc"
"github.com/harness/gitness/gitrpc/server" "github.com/harness/gitness/gitrpc/server"
"github.com/harness/gitness/internal/services/trigger"
"github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/services/webhook"
"github.com/harness/gitness/lock" "github.com/harness/gitness/lock"
"github.com/harness/gitness/store/database" "github.com/harness/gitness/store/database"
@ -147,23 +148,26 @@ func ProvideEventsConfig() (events.Config, error) {
return config, nil return config, nil
} }
// ProvideWebhookConfig loads the webhook config from the environment. // ProvideWebhookConfig loads the webhook service config from the main config.
// It backfills certain config elements if required. func ProvideWebhookConfig(config *types.Config) webhook.Config {
func ProvideWebhookConfig() (webhook.Config, error) { return webhook.Config{
config := webhook.Config{} UserAgentIdentity: config.Webhook.UserAgentIdentity,
err := envconfig.Process("", &config) HeaderIdentity: config.Webhook.HeaderIdentity,
if err != nil { EventReaderName: config.InstanceID,
return webhook.Config{}, fmt.Errorf("failed to load events config: %w", err) Concurrency: config.Webhook.Concurrency,
MaxRetries: config.Webhook.MaxRetries,
AllowPrivateNetwork: config.Webhook.AllowPrivateNetwork,
AllowLoopback: config.Webhook.AllowLoopback,
} }
}
if config.EventReaderName == "" { // ProvideTriggerConfig loads the trigger service config from the main config.
config.EventReaderName, err = getSanitizedMachineName() func ProvideTriggerConfig(config *types.Config) trigger.Config {
if err != nil { return trigger.Config{
return webhook.Config{}, fmt.Errorf("failed to get sanitized machine name: %w", err) EventReaderName: config.InstanceID,
} Concurrency: config.Webhook.Concurrency,
MaxRetries: config.Webhook.MaxRetries,
} }
return config, nil
} }
// ProvideLockConfig generates the `lock` package config from the gitness config. // ProvideLockConfig generates the `lock` package config from the gitness config.

View File

@ -32,7 +32,7 @@ import (
"github.com/harness/gitness/internal/api/controller/space" "github.com/harness/gitness/internal/api/controller/space"
"github.com/harness/gitness/internal/api/controller/system" "github.com/harness/gitness/internal/api/controller/system"
"github.com/harness/gitness/internal/api/controller/template" "github.com/harness/gitness/internal/api/controller/template"
"github.com/harness/gitness/internal/api/controller/trigger" controllertrigger "github.com/harness/gitness/internal/api/controller/trigger"
"github.com/harness/gitness/internal/api/controller/user" "github.com/harness/gitness/internal/api/controller/user"
controllerwebhook "github.com/harness/gitness/internal/api/controller/webhook" controllerwebhook "github.com/harness/gitness/internal/api/controller/webhook"
"github.com/harness/gitness/internal/auth/authn" "github.com/harness/gitness/internal/auth/authn"
@ -54,6 +54,7 @@ import (
"github.com/harness/gitness/internal/services/importer" "github.com/harness/gitness/internal/services/importer"
"github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/internal/services/job"
pullreqservice "github.com/harness/gitness/internal/services/pullreq" pullreqservice "github.com/harness/gitness/internal/services/pullreq"
"github.com/harness/gitness/internal/services/trigger"
"github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/services/webhook"
"github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/store"
"github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/cache"
@ -106,6 +107,8 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
events.WireSet, events.WireSet,
cliserver.ProvideWebhookConfig, cliserver.ProvideWebhookConfig,
webhook.WireSet, webhook.WireSet,
cliserver.ProvideTriggerConfig,
trigger.WireSet,
githook.WireSet, githook.WireSet,
cliserver.ProvideLockConfig, cliserver.ProvideLockConfig,
lock.WireSet, lock.WireSet,
@ -129,7 +132,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
eventsstream.WireSet, eventsstream.WireSet,
scheduler.WireSet, scheduler.WireSet,
commit.WireSet, commit.WireSet,
trigger.WireSet, controllertrigger.WireSet,
plugin.WireSet, plugin.WireSet,
importer.WireSet, importer.WireSet,
) )

View File

@ -52,6 +52,7 @@ import (
"github.com/harness/gitness/internal/services/importer" "github.com/harness/gitness/internal/services/importer"
"github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/internal/services/job"
pullreq2 "github.com/harness/gitness/internal/services/pullreq" pullreq2 "github.com/harness/gitness/internal/services/pullreq"
trigger2 "github.com/harness/gitness/internal/services/trigger"
"github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/services/webhook"
"github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/store"
"github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/cache"
@ -174,10 +175,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
} }
migrator := codecomments.ProvideMigrator(gitrpcInterface) migrator := codecomments.ProvideMigrator(gitrpcInterface)
pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, codeCommentView, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter, mutexManager, migrator) pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, codeCommentView, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter, mutexManager, migrator)
webhookConfig, err := server.ProvideWebhookConfig() webhookConfig := server.ProvideWebhookConfig(config)
if err != nil {
return nil, err
}
webhookStore := database.ProvideWebhookStore(db) webhookStore := database.ProvideWebhookStore(db)
webhookExecutionStore := database.ProvideWebhookExecutionStore(db) webhookExecutionStore := database.ProvideWebhookExecutionStore(db)
readerFactory, err := events4.ProvideReaderFactory(eventsSystem) readerFactory, err := events4.ProvideReaderFactory(eventsSystem)
@ -236,7 +234,12 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
servicesServices := services.ProvideServices(webhookService, pullreqService, executor, jobScheduler) triggerConfig := server.ProvideTriggerConfig(config)
triggerService, err := trigger2.ProvideService(ctx, triggerConfig, readerFactory, eventsReaderFactory)
if err != nil {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler)
serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, poller, grpcServer, cronManager, servicesServices) serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, poller, grpcServer, cronManager, servicesServices)
return serverSystem, nil return serverSystem, nil
} }

View File

@ -43,9 +43,9 @@ func (f *ReaderFactory[R]) Launch(ctx context.Context,
// setup ctx with copied logger that has extra fields set // setup ctx with copied logger that has extra fields set
log := log.Ctx(ctx).With(). log := log.Ctx(ctx).With().
Str("events_category", f.category). Str("events.category", f.category).
Str("events_group_name", groupName). Str("events.group_name", groupName).
Str("events_reader_name", readerName). Str("events.reader_name", readerName).
Logger() Logger()
// create new stream consumer using factory method // create new stream consumer using factory method
@ -191,8 +191,8 @@ func ReaderRegisterEvent[T interface{}](reader *GenericReader,
// update ctx with event type for proper logging // update ctx with event type for proper logging
log := log.Ctx(ctx).With(). log := log.Ctx(ctx).With().
Str("events_type", string(eventType)). Str("events.type", string(eventType)).
Str("events_id", event.ID). Str("events.id", event.ID).
Logger() Logger()
ctx = log.WithContext(ctx) ctx = log.WithContext(ctx)

View File

@ -0,0 +1,22 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package trigger
import (
"context"
"github.com/harness/gitness/events"
gitevents "github.com/harness/gitness/internal/events/git"
)
func (s *Service) handleEventBranchCreated(ctx context.Context,
event *events.Event[*gitevents.BranchCreatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}
func (s *Service) handleEventBranchUpdated(ctx context.Context,
event *events.Event[*gitevents.BranchUpdatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}

View File

@ -0,0 +1,27 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package trigger
import (
"context"
"github.com/harness/gitness/events"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
)
func (s *Service) handleEventPullReqCreated(ctx context.Context,
event *events.Event[*pullreqevents.CreatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}
func (s *Service) handleEventPullReqReopened(ctx context.Context,
event *events.Event[*pullreqevents.ReopenedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}
func (s *Service) handleEventPullReqBranchUpdated(ctx context.Context,
event *events.Event[*pullreqevents.BranchUpdatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}

View File

@ -0,0 +1,22 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package trigger
import (
"context"
"github.com/harness/gitness/events"
gitevents "github.com/harness/gitness/internal/events/git"
)
func (s *Service) handleEventTagCreated(ctx context.Context,
event *events.Event[*gitevents.TagCreatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}
func (s *Service) handleEventTagUpdated(ctx context.Context,
event *events.Event[*gitevents.TagUpdatedPayload]) error {
return events.NewDiscardEventErrorf("not implemented")
}

View File

@ -0,0 +1,103 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package trigger
import (
"context"
"errors"
"fmt"
"time"
"github.com/harness/gitness/events"
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/stream"
)
const (
eventsReaderGroupName = "gitness:trigger"
)
type Config struct {
EventReaderName string
Concurrency int
MaxRetries int
}
func (c *Config) Prepare() error {
if c == nil {
return errors.New("config is required")
}
if c.EventReaderName == "" {
return errors.New("config.EventReaderName is required")
}
if c.Concurrency < 1 {
return errors.New("config.Concurrency has to be a positive number")
}
if c.MaxRetries < 0 {
return errors.New("config.MaxRetries can't be negative")
}
return nil
}
type Service struct{}
func New(
ctx context.Context,
config Config,
gitReaderFactory *events.ReaderFactory[*gitevents.Reader],
pullreqEvReaderFactory *events.ReaderFactory[*pullreqevents.Reader],
) (*Service, error) {
if err := config.Prepare(); err != nil {
return nil, fmt.Errorf("provided trigger service config is invalid: %w", err)
}
service := &Service{}
_, err := gitReaderFactory.Launch(ctx, eventsReaderGroupName, config.EventReaderName,
func(r *gitevents.Reader) error {
const idleTimeout = 1 * time.Minute
r.Configure(
stream.WithConcurrency(config.Concurrency),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(config.MaxRetries),
))
_ = r.RegisterBranchCreated(service.handleEventBranchCreated)
_ = r.RegisterBranchUpdated(service.handleEventBranchUpdated)
_ = r.RegisterTagCreated(service.handleEventTagCreated)
_ = r.RegisterTagUpdated(service.handleEventTagUpdated)
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to launch git events reader: %w", err)
}
_, err = pullreqEvReaderFactory.Launch(ctx, eventsReaderGroupName, config.EventReaderName,
func(r *pullreqevents.Reader) error {
const idleTimeout = 1 * time.Minute
r.Configure(
stream.WithConcurrency(config.Concurrency),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(config.MaxRetries),
))
_ = r.RegisterCreated(service.handleEventPullReqCreated)
_ = r.RegisterBranchUpdated(service.handleEventPullReqBranchUpdated)
_ = r.RegisterReopened(service.handleEventPullReqReopened)
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to launch pr events reader: %w", err)
}
return service, nil
}

View File

@ -0,0 +1,28 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package trigger
import (
"context"
"github.com/harness/gitness/events"
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/google/wire"
)
var WireSet = wire.NewSet(
ProvideService,
)
func ProvideService(
ctx context.Context,
config Config,
gitReaderFactory *events.ReaderFactory[*gitevents.Reader],
pullReqEvFactory *events.ReaderFactory[*pullreqevents.Reader],
) (*Service, error) {
return New(ctx, config, gitReaderFactory, pullReqEvFactory)
}

View File

@ -27,17 +27,15 @@ const (
type Config struct { type Config struct {
// UserAgentIdentity specifies the identity used for the user agent header // UserAgentIdentity specifies the identity used for the user agent header
// IMPORTANT: do not include version. // IMPORTANT: do not include version.
UserAgentIdentity string `envconfig:"GITNESS_WEBHOOK_USER_AGENT_IDENTITY" default:"Gitness"` UserAgentIdentity string
// HeaderIdentity specifies the identity used for headers in webhook calls (e.g. X-Gitness-Trigger, ...). // HeaderIdentity specifies the identity used for headers in webhook calls (e.g. X-Gitness-Trigger, ...).
// NOTE: If no value is provided, the UserAgentIdentity will be used. // NOTE: If no value is provided, the UserAgentIdentity will be used.
HeaderIdentity string `envconfig:"GITNESS_WEBHOOK_HEADER_IDENTITY"` HeaderIdentity string
// EventReaderName is the name used to read events from stream. EventReaderName string
// Note: this should be different for every running instance. Concurrency int
EventReaderName string `envconfig:"GITNESS_WEBHOOK_EVENT_READER_NAME"` MaxRetries int
Concurrency int `envconfig:"GITNESS_WEBHOOK_CONCURRENCY" default:"4"` AllowPrivateNetwork bool
MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"` AllowLoopback bool
AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"`
AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"`
} }
func (c *Config) Prepare() error { func (c *Config) Prepare() error {
@ -91,7 +89,7 @@ func NewService(ctx context.Context, config Config,
repoStore store.RepoStore, pullreqStore store.PullReqStore, urlProvider *url.Provider, repoStore store.RepoStore, pullreqStore store.PullReqStore, urlProvider *url.Provider,
principalStore store.PrincipalStore, gitRPCClient gitrpc.Interface) (*Service, error) { principalStore store.PrincipalStore, gitRPCClient gitrpc.Interface) (*Service, error) {
if err := config.Prepare(); err != nil { if err := config.Prepare(); err != nil {
return nil, fmt.Errorf("provided config is invalid: %w", err) return nil, fmt.Errorf("provided webhook service config is invalid: %w", err)
} }
service := &Service{ service := &Service{
webhookStore: webhookStore, webhookStore: webhookStore,

View File

@ -7,6 +7,7 @@ package services
import ( import (
"github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/internal/services/job"
"github.com/harness/gitness/internal/services/pullreq" "github.com/harness/gitness/internal/services/pullreq"
"github.com/harness/gitness/internal/services/trigger"
"github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/services/webhook"
"github.com/google/wire" "github.com/google/wire"
@ -19,20 +20,20 @@ var WireSet = wire.NewSet(
type Services struct { type Services struct {
Webhook *webhook.Service Webhook *webhook.Service
PullReq *pullreq.Service PullReq *pullreq.Service
JobExecutor *job.Executor Trigger *trigger.Service
JobScheduler *job.Scheduler JobScheduler *job.Scheduler
} }
func ProvideServices( func ProvideServices(
webhooksSrv *webhook.Service, webhooksSvc *webhook.Service,
pullReqSrv *pullreq.Service, pullReqSvc *pullreq.Service,
jobExecutor *job.Executor, triggerSvc *trigger.Service,
jobScheduler *job.Scheduler, jobScheduler *job.Scheduler,
) Services { ) Services {
return Services{ return Services{
Webhook: webhooksSrv, Webhook: webhooksSvc,
PullReq: pullReqSrv, PullReq: pullReqSvc,
JobExecutor: jobExecutor, Trigger: triggerSvc,
JobScheduler: jobScheduler, JobScheduler: jobScheduler,
} }
} }

View File

@ -200,4 +200,22 @@ type Config struct {
// finished and failed jobs will be purged from the DB. // finished and failed jobs will be purged from the DB.
PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"` PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"`
} }
Webhook struct {
// UserAgentIdentity specifies the identity used for the user agent header
// IMPORTANT: do not include version.
UserAgentIdentity string `envconfig:"GITNESS_WEBHOOK_USER_AGENT_IDENTITY" default:"Gitness"`
// HeaderIdentity specifies the identity used for headers in webhook calls (e.g. X-Gitness-Trigger, ...).
// NOTE: If no value is provided, the UserAgentIdentity will be used.
HeaderIdentity string `envconfig:"GITNESS_WEBHOOK_HEADER_IDENTITY"`
Concurrency int `envconfig:"GITNESS_WEBHOOK_CONCURRENCY" default:"4"`
MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"`
AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"`
AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"`
}
Trigger struct {
Concurrency int `envconfig:"GITNESS_TRIGGER_CONCURRENCY" default:"4"`
MaxRetries int `envconfig:"GITNESS_TRIGGER_MAX_RETRIES" default:"3"`
}
} }