From 414993a388c2b0b4fda671ff225cb8d37afb6666 Mon Sep 17 00:00:00 2001 From: Vistaar Juneja Date: Thu, 7 Sep 2023 23:58:04 +0100 Subject: [PATCH] add pubsub for events --- cmd/gitness/wire.go | 2 + cmd/gitness/wire_gen.go | 24 +++-- internal/api/controller/space/controller.go | 5 +- internal/api/controller/space/events.go | 33 ++++++ internal/api/controller/space/wire.go | 8 +- internal/api/handler/space/events_stream.go | 110 ++++++++++++++++++++ internal/api/openapi/account.go | 2 +- internal/pipeline/events/events.go | 72 +++++++++++++ internal/pipeline/events/wire.go | 23 ++++ internal/pipeline/manager/manager.go | 9 +- internal/pipeline/manager/setup.go | 29 +++++- internal/pipeline/manager/teardown.go | 11 +- internal/pipeline/manager/updater.go | 33 ++++++ internal/pipeline/manager/wire.go | 4 +- internal/router/api.go | 2 + types/enum/event_types.go | 15 +++ 16 files changed, 362 insertions(+), 20 deletions(-) create mode 100644 internal/api/controller/space/events.go create mode 100644 internal/api/handler/space/events_stream.go create mode 100644 internal/pipeline/events/events.go create mode 100644 internal/pipeline/events/wire.go create mode 100644 types/enum/event_types.go diff --git a/cmd/gitness/wire.go b/cmd/gitness/wire.go index cf8070c30..b94c730db 100644 --- a/cmd/gitness/wire.go +++ b/cmd/gitness/wire.go @@ -41,6 +41,7 @@ import ( gitevents "github.com/harness/gitness/internal/events/git" pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/pipeline/commit" + eventsstream "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/pipeline/file" "github.com/harness/gitness/internal/pipeline/manager" "github.com/harness/gitness/internal/pipeline/runner" @@ -125,6 +126,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e triggerer.WireSet, file.WireSet, runner.WireSet, + eventsstream.WireSet, scheduler.WireSet, commit.WireSet, trigger.WireSet, diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index a46fe2c07..d92d499c2 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -10,7 +10,7 @@ import ( "context" "github.com/harness/gitness/cli/server" "github.com/harness/gitness/encrypt" - "github.com/harness/gitness/events" + events2 "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" server3 "github.com/harness/gitness/gitrpc/server" "github.com/harness/gitness/gitrpc/server/cron" @@ -36,9 +36,10 @@ import ( "github.com/harness/gitness/internal/auth/authn" "github.com/harness/gitness/internal/auth/authz" "github.com/harness/gitness/internal/bootstrap" - events3 "github.com/harness/gitness/internal/events/git" - events2 "github.com/harness/gitness/internal/events/pullreq" + events4 "github.com/harness/gitness/internal/events/git" + events3 "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/pipeline/commit" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/pipeline/file" "github.com/harness/gitness/internal/pipeline/manager" "github.com/harness/gitness/internal/pipeline/runner" @@ -137,10 +138,13 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro logStore := logs.ProvideLogStore(db, config) logStream := livelog.ProvideLogStream(config) logsController := logs2.ProvideController(db, authorizer, executionStore, repoStore, pipelineStore, stageStore, stepStore, logStore, logStream) + pubsubConfig := pubsub.ProvideConfig(config) + pubSub := pubsub.ProvidePubSub(pubsubConfig, universalClient) + eventsEvents := events.ProvideEventsStreaming(pubSub) secretStore := database.ProvideSecretStore(db) connectorStore := database.ProvideConnectorStore(db) templateStore := database.ProvideTemplateStore(db) - spaceController := space.ProvideController(db, provider, pathUID, authorizer, pathStore, pipelineStore, secretStore, connectorStore, templateStore, spaceStore, repoStore, principalStore, repoController, membershipStore) + spaceController := space.ProvideController(db, provider, eventsEvents, pathUID, authorizer, pathStore, pipelineStore, secretStore, connectorStore, templateStore, spaceStore, repoStore, principalStore, repoController, membershipStore) pipelineController := pipeline.ProvideController(db, pathUID, pathStore, repoStore, authorizer, pipelineStore) encrypter, err := encrypt.ProvideEncrypter(config) if err != nil { @@ -162,11 +166,11 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient) + eventsSystem, err := events2.ProvideSystem(eventsConfig, universalClient) if err != nil { return nil, err } - reporter, err := events2.ProvideReporter(eventsSystem) + reporter, err := events3.ProvideReporter(eventsSystem) if err != nil { return nil, err } @@ -178,11 +182,11 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro } webhookStore := database.ProvideWebhookStore(db) webhookExecutionStore := database.ProvideWebhookExecutionStore(db) - readerFactory, err := events3.ProvideReaderFactory(eventsSystem) + readerFactory, err := events4.ProvideReaderFactory(eventsSystem) if err != nil { return nil, err } - eventsReaderFactory, err := events2.ProvideReaderFactory(eventsSystem) + eventsReaderFactory, err := events3.ProvideReaderFactory(eventsSystem) if err != nil { return nil, err } @@ -191,7 +195,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro return nil, err } webhookController := webhook2.ProvideController(webhookConfig, db, authorizer, webhookStore, webhookExecutionStore, repoStore, webhookService) - eventsReporter, err := events3.ProvideReporter(eventsSystem) + eventsReporter, err := events4.ProvideReporter(eventsSystem) if err != nil { return nil, err } @@ -206,7 +210,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro webHandler := router.ProvideWebHandler(config) routerRouter := router.ProvideRouter(config, apiHandler, gitHandler, webHandler) serverServer := server2.ProvideServer(config, routerRouter) - executionManager := manager.ProvideExecutionManager(config, executionStore, pipelineStore, provider, fileService, logStore, logStream, repoStore, schedulerScheduler, secretStore, stageStore, stepStore, principalStore) + executionManager := manager.ProvideExecutionManager(config, executionStore, pipelineStore, provider, eventsEvents, fileService, logStore, logStream, repoStore, schedulerScheduler, secretStore, stageStore, stepStore, principalStore) client := manager.ProvideExecutionClient(executionManager, config) runtimeRunner, err := runner.ProvideExecutionRunner(config, client, executionManager) if err != nil { diff --git a/internal/api/controller/space/controller.go b/internal/api/controller/space/controller.go index 28780f7dd..8d9dd449a 100644 --- a/internal/api/controller/space/controller.go +++ b/internal/api/controller/space/controller.go @@ -7,6 +7,7 @@ package space import ( "github.com/harness/gitness/internal/api/controller/repo" "github.com/harness/gitness/internal/auth/authz" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/url" "github.com/harness/gitness/types/check" @@ -17,6 +18,7 @@ import ( type Controller struct { db *sqlx.DB urlProvider *url.Provider + eventsStream events.Events uidCheck check.PathUID authorizer authz.Authorizer pathStore store.PathStore @@ -31,7 +33,7 @@ type Controller struct { membershipStore store.MembershipStore } -func NewController(db *sqlx.DB, urlProvider *url.Provider, +func NewController(db *sqlx.DB, urlProvider *url.Provider, eventsStream events.Events, uidCheck check.PathUID, authorizer authz.Authorizer, pathStore store.PathStore, pipelineStore store.PipelineStore, secretStore store.SecretStore, connectorStore store.ConnectorStore, templateStore store.TemplateStore, spaceStore store.SpaceStore, @@ -41,6 +43,7 @@ func NewController(db *sqlx.DB, urlProvider *url.Provider, return &Controller{ db: db, urlProvider: urlProvider, + eventsStream: eventsStream, uidCheck: uidCheck, authorizer: authorizer, pathStore: pathStore, diff --git a/internal/api/controller/space/events.go b/internal/api/controller/space/events.go new file mode 100644 index 000000000..23df138a4 --- /dev/null +++ b/internal/api/controller/space/events.go @@ -0,0 +1,33 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package space + +import ( + "context" + "fmt" + + apiauth "github.com/harness/gitness/internal/api/auth" + "github.com/harness/gitness/internal/auth" + "github.com/harness/gitness/internal/pipeline/events" + "github.com/harness/gitness/types/enum" +) + +func (c *Controller) Events( + ctx context.Context, + session *auth.Session, + spaceRef string, +) (<-chan *events.Event, <-chan error, error) { + space, err := c.spaceStore.FindByRef(ctx, spaceRef) + if err != nil { + return nil, nil, fmt.Errorf("failed to find space ref: %w", err) + } + + if err = apiauth.CheckSpace(ctx, c.authorizer, session, space, enum.PermissionSpaceView, true); err != nil { + return nil, nil, fmt.Errorf("failed to authorize stream: %w", err) + } + + events, errc := c.eventsStream.Subscribe(ctx, space.ID) + return events, errc, nil +} diff --git a/internal/api/controller/space/wire.go b/internal/api/controller/space/wire.go index 5dcbd7dec..6fa3022d5 100644 --- a/internal/api/controller/space/wire.go +++ b/internal/api/controller/space/wire.go @@ -7,6 +7,7 @@ package space import ( "github.com/harness/gitness/internal/api/controller/repo" "github.com/harness/gitness/internal/auth/authz" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/url" "github.com/harness/gitness/types/check" @@ -20,13 +21,14 @@ var WireSet = wire.NewSet( ProvideController, ) -func ProvideController(db *sqlx.DB, urlProvider *url.Provider, uidCheck check.PathUID, authorizer authz.Authorizer, - pathStore store.PathStore, pipelineStore store.PipelineStore, secretStore store.SecretStore, +func ProvideController(db *sqlx.DB, urlProvider *url.Provider, eventsStream events.Events, + uidCheck check.PathUID, authorizer authz.Authorizer, pathStore store.PathStore, + pipelineStore store.PipelineStore, secretStore store.SecretStore, connectorStore store.ConnectorStore, templateStore store.TemplateStore, spaceStore store.SpaceStore, repoStore store.RepoStore, principalStore store.PrincipalStore, repoCtrl *repo.Controller, membershipStore store.MembershipStore, ) *Controller { - return NewController(db, urlProvider, uidCheck, authorizer, + return NewController(db, urlProvider, eventsStream, uidCheck, authorizer, pathStore, pipelineStore, secretStore, connectorStore, templateStore, spaceStore, repoStore, principalStore, repoCtrl, membershipStore) } diff --git a/internal/api/handler/space/events_stream.go b/internal/api/handler/space/events_stream.go new file mode 100644 index 000000000..4c45a9945 --- /dev/null +++ b/internal/api/handler/space/events_stream.go @@ -0,0 +1,110 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package space + +import ( + "context" + "encoding/json" + "io" + "net/http" + "time" + + "github.com/harness/gitness/internal/api/controller/space" + "github.com/harness/gitness/internal/api/render" + "github.com/harness/gitness/internal/api/request" + + "github.com/rs/zerolog/log" +) + +var ( + pingInterval = 30 * time.Second + tailMaxTime = 2 * time.Hour +) + +// HandleEventsStream returns an http.HandlerFunc that watches for +// events on a space +func HandleEventsStream(spaceCtrl *space.Controller) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + session, _ := request.AuthSessionFrom(ctx) + + spaceRef, err := request.GetSpaceRefFromPath(r) + if err != nil { + render.TranslatedUserError(w, err) + return + } + + f, ok := w.(http.Flusher) + if !ok { + log.Error().Msg("http writer type assertion failed") + render.InternalError(w) + return + } + + io.WriteString(w, ": ping\n\n") + f.Flush() + + events, errc, err := spaceCtrl.Events(ctx, session, spaceRef) + if err != nil { + render.TranslatedUserError(w, err) + return + } + + // could not get error channel + if errc == nil { + io.WriteString(w, "event: error\ndata: eof\n\n") + return + } + + h := w.Header() + h.Set("Content-Type", "text/event-stream") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + h.Set("X-Accel-Buffering", "no") + h.Set("Access-Control-Allow-Origin", "*") + + ctx, cancel := context.WithTimeout(r.Context(), tailMaxTime) + defer cancel() + + enc := json.NewEncoder(w) + + pingTimer := time.NewTimer(pingInterval) + defer pingTimer.Stop() + L: + for { + // ensure timer is stopped before resetting (see documentation) + if !pingTimer.Stop() { + // in this specific case the timer's channel could be both, empty or full + select { + case <-pingTimer.C: + default: + } + } + pingTimer.Reset(pingInterval) + select { + case <-ctx.Done(): + log.Debug().Msg("events: stream cancelled") + break L + case err := <-errc: + log.Err(err).Msg("events: received error in the tail channel") + break L + case <-pingTimer.C: + // if time b/w messages takes longer, send a ping + io.WriteString(w, ": ping\n\n") + f.Flush() + case event := <-events: + io.WriteString(w, "data: ") + enc.Encode(event) + io.WriteString(w, "\n\n") + f.Flush() + } + } + + io.WriteString(w, "event: error\ndata: eof\n\n") + f.Flush() + + log.Debug().Msg("events: stream closed") + } +} diff --git a/internal/api/openapi/account.go b/internal/api/openapi/account.go index 39bf47930..c97c1688c 100644 --- a/internal/api/openapi/account.go +++ b/internal/api/openapi/account.go @@ -7,12 +7,12 @@ package openapi import ( "net/http" - "github.com/gotidy/ptr" "github.com/harness/gitness/internal/api/controller/user" "github.com/harness/gitness/internal/api/request" "github.com/harness/gitness/internal/api/usererror" "github.com/harness/gitness/types" + "github.com/gotidy/ptr" "github.com/swaggest/openapi-go/openapi3" ) diff --git a/internal/pipeline/events/events.go b/internal/pipeline/events/events.go new file mode 100644 index 000000000..dbc26e2f7 --- /dev/null +++ b/internal/pipeline/events/events.go @@ -0,0 +1,72 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package events + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/harness/gitness/pubsub" + "github.com/harness/gitness/types/enum" +) + +// Event is an event which is sent to the UI via server-sent events. +type Event struct { + Type enum.EventType `json:"type"` + Data json.RawMessage `json:"data"` +} + +type Events interface { + // Publish publishes an event to a given space ID. + Publish(ctx context.Context, spaceID int64, event *Event) error + + // Subscribe listens to events on a space ID. + Subscribe(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error) +} + +type event struct { + pubsub pubsub.PubSub + topic string +} + +func New(pubsub pubsub.PubSub, topic string) Events { + return &event{ + pubsub: pubsub, + topic: topic, + } +} + +func (e *event) Publish(ctx context.Context, spaceID int64, event *Event) error { + bytes, err := json.Marshal(event) + if err != nil { + return err + } + option := pubsub.WithPublishNamespace(format(spaceID)) + return e.pubsub.Publish(ctx, e.topic, bytes, option) +} + +// format creates the namespace name which will be spaces- +func format(id int64) string { + return "spaces-" + strconv.Itoa(int(id)) +} + +func (e *event) Subscribe(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error) { + chEvent := make(chan *Event, 100) // TODO: check best size here + chErr := make(chan error) + g := func(payload []byte) error { + event := &Event{} + err := json.Unmarshal(payload, event) + if err != nil { + // This should never happen + return err + } + chEvent <- event + return nil + } + option := pubsub.WithChannelNamespace(format(spaceID)) + e.pubsub.Subscribe(ctx, e.topic, g, option) + return chEvent, chErr +} diff --git a/internal/pipeline/events/wire.go b/internal/pipeline/events/wire.go new file mode 100644 index 000000000..6879e8782 --- /dev/null +++ b/internal/pipeline/events/wire.go @@ -0,0 +1,23 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package events + +import ( + "github.com/harness/gitness/pubsub" + + "github.com/google/wire" +) + +// WireSet provides a wire set for this package. +var WireSet = wire.NewSet( + ProvideEventsStreaming, +) + +func ProvideEventsStreaming(pubsub pubsub.PubSub) Events { + return &event{ + pubsub: pubsub, + topic: "events", + } +} diff --git a/internal/pipeline/manager/manager.go b/internal/pipeline/manager/manager.go index 9a9a82529..1500c0f5a 100644 --- a/internal/pipeline/manager/manager.go +++ b/internal/pipeline/manager/manager.go @@ -11,6 +11,7 @@ import ( "io" "time" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/pipeline/file" "github.com/harness/gitness/internal/pipeline/scheduler" "github.com/harness/gitness/internal/store" @@ -98,7 +99,7 @@ type Manager struct { Pipelines store.PipelineStore urlProvider *urlprovider.Provider // Converter store.ConvertService - // Events store.Pubsub + Events events.Events // Globals store.GlobalSecretStore Logs store.LogStore Logz livelog.LogStream @@ -119,6 +120,7 @@ func New( executionStore store.ExecutionStore, pipelineStore store.PipelineStore, urlProvider *urlprovider.Provider, + events events.Events, fileService file.FileService, logStore store.LogStore, logStream livelog.LogStream, @@ -134,6 +136,7 @@ func New( Executions: executionStore, Pipelines: pipelineStore, urlProvider: urlProvider, + Events: events, FileService: fileService, Logs: logStore, Logz: logStream, @@ -313,6 +316,7 @@ func (m *Manager) BeforeStep(ctx context.Context, step *types.Step) error { } updater := &updater{ Executions: m.Executions, + Events: m.Events, Repos: m.Repos, Steps: m.Steps, Stages: m.Stages, @@ -332,6 +336,7 @@ func (m *Manager) AfterStep(ctx context.Context, step *types.Step) error { var retErr error updater := &updater{ Executions: m.Executions, + Events: m.Events, Repos: m.Repos, Steps: m.Steps, Stages: m.Stages, @@ -352,6 +357,7 @@ func (m *Manager) AfterStep(ctx context.Context, step *types.Step) error { func (m *Manager) BeforeStage(ctx context.Context, stage *types.Stage) error { s := &setup{ Executions: m.Executions, + Events: m.Events, Repos: m.Repos, Steps: m.Steps, Stages: m.Stages, @@ -365,6 +371,7 @@ func (m *Manager) BeforeStage(ctx context.Context, stage *types.Stage) error { func (m *Manager) AfterStage(ctx context.Context, stage *types.Stage) error { t := &teardown{ Executions: m.Executions, + Events: m.Events, Logs: m.Logz, Repos: m.Repos, Scheduler: m.Scheduler, diff --git a/internal/pipeline/manager/setup.go b/internal/pipeline/manager/setup.go index c7bb9e40c..457878fc0 100644 --- a/internal/pipeline/manager/setup.go +++ b/internal/pipeline/manager/setup.go @@ -6,9 +6,11 @@ package manager import ( "context" + "encoding/json" "errors" "time" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/store" gitness_store "github.com/harness/gitness/store" "github.com/harness/gitness/types" @@ -19,6 +21,7 @@ import ( type setup struct { Executions store.ExecutionStore + Events events.Events Repos store.RepoStore Steps store.StepStore Stages store.StageStore @@ -39,7 +42,7 @@ func (s *setup) do(ctx context.Context, stage *types.Stage) error { Int64("repo.id", execution.RepoID). Logger() - _, err = s.Repos.Find(noContext, execution.RepoID) + repo, err := s.Repos.Find(noContext, execution.RepoID) if err != nil { log.Error().Err(err).Msg("manager: cannot find the repository") return err @@ -72,15 +75,37 @@ func (s *setup) do(ctx context.Context, stage *types.Stage) error { } } - _, err = s.updateExecution(ctx, execution) + _, err = s.updateExecution(noContext, execution) if err != nil { log.Error().Err(err).Msg("manager: cannot update the execution") return err } + stages, err := s.Stages.ListWithSteps(noContext, execution.ID) + if err != nil { + log.Error().Err(err).Msg("manager: could not list stages with steps") + return err + } + execution.Stages = stages + err = s.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionRunning, execution)) + if err != nil { + log.Warn().Err(err).Msg("manager: could not publish execution event") + } return nil } +func executionEvent( + eventType enum.EventType, + execution *types.Execution, +) *events.Event { + // json.Marshal will not return an error here, it can be absorbed + bytes, _ := json.Marshal(execution) + return &events.Event{ + Type: eventType, + Data: bytes, + } +} + // helper function that updates the execution status from pending to running. // This accounts for the fact that another agent may have already updated // the execution status, which may happen if two stages execute concurrently. diff --git a/internal/pipeline/manager/teardown.go b/internal/pipeline/manager/teardown.go index 9365bc787..25cbb180c 100644 --- a/internal/pipeline/manager/teardown.go +++ b/internal/pipeline/manager/teardown.go @@ -8,6 +8,7 @@ import ( "context" "time" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/pipeline/scheduler" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/livelog" @@ -20,6 +21,7 @@ import ( type teardown struct { Executions store.ExecutionStore + Events events.Events Logs livelog.LogStream Scheduler scheduler.Scheduler Repos store.RepoStore @@ -46,7 +48,7 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error { Str("stage.status", stage.Status). Logger() - _, err = t.Repos.Find(noContext, execution.RepoID) + repo, err := t.Repos.Find(noContext, execution.RepoID) if err != nil { log.Error().Err(err).Msg("manager: cannot find the repository") return err @@ -131,6 +133,13 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error { return err } + execution.Stages = stages + err = t.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionCompleted, execution)) + if err != nil { + log.Warn().Err(err). + Msg("manager: could not publish execution completed event") + } + return nil } diff --git a/internal/pipeline/manager/updater.go b/internal/pipeline/manager/updater.go index 4a7787063..13d068486 100644 --- a/internal/pipeline/manager/updater.go +++ b/internal/pipeline/manager/updater.go @@ -7,8 +7,10 @@ package manager import ( "context" + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" ) @@ -16,6 +18,7 @@ import ( type updater struct { Executions store.ExecutionStore Repos store.RepoStore + Events events.Events Steps store.StepStore Stages store.StageStore } @@ -36,5 +39,35 @@ func (u *updater) do(ctx context.Context, step *types.Step) error { return err } + stage, err := u.Stages.Find(noContext, step.StageID) + if err != nil { + log.Error().Err(err).Msg("manager: cannot find stage") + return nil + } + + execution, err := u.Executions.Find(noContext, stage.ExecutionID) + if err != nil { + log.Error().Err(err).Msg("manager: cannot find execution") + return nil + } + + repo, err := u.Repos.Find(noContext, execution.RepoID) + if err != nil { + log.Error().Err(err).Msg("manager: cannot find repo") + return nil + } + + stages, err := u.Stages.ListWithSteps(noContext, stage.ExecutionID) + if err != nil { + log.Error().Err(err).Msg("manager: cannot find stages") + return nil + } + + execution.Stages = stages + err = u.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionUpdated, execution)) + if err != nil { + log.Warn().Err(err).Msg("manager: cannot publish execution updated event") + } + return nil } diff --git a/internal/pipeline/manager/wire.go b/internal/pipeline/manager/wire.go index 7e56d5bec..4849ea89c 100644 --- a/internal/pipeline/manager/wire.go +++ b/internal/pipeline/manager/wire.go @@ -5,6 +5,7 @@ package manager import ( + "github.com/harness/gitness/internal/pipeline/events" "github.com/harness/gitness/internal/pipeline/file" "github.com/harness/gitness/internal/pipeline/scheduler" "github.com/harness/gitness/internal/store" @@ -28,6 +29,7 @@ func ProvideExecutionManager( executionStore store.ExecutionStore, pipelineStore store.PipelineStore, urlProvider *url.Provider, + events events.Events, fileService file.FileService, logStore store.LogStore, logStream livelog.LogStream, @@ -37,7 +39,7 @@ func ProvideExecutionManager( stageStore store.StageStore, stepStore store.StepStore, userStore store.PrincipalStore) ExecutionManager { - return New(config, executionStore, pipelineStore, urlProvider, fileService, logStore, + return New(config, executionStore, pipelineStore, urlProvider, events, fileService, logStore, logStream, repoStore, scheduler, secretStore, stageStore, stepStore, userStore) } diff --git a/internal/router/api.go b/internal/router/api.go index 7713d898c..c987315a3 100644 --- a/internal/router/api.go +++ b/internal/router/api.go @@ -188,6 +188,8 @@ func setupSpaces(r chi.Router, spaceCtrl *space.Controller) { r.Patch("/", handlerspace.HandleUpdate(spaceCtrl)) r.Delete("/", handlerspace.HandleDelete(spaceCtrl)) + r.Get("/stream", handlerspace.HandleEventsStream(spaceCtrl)) + r.Post("/move", handlerspace.HandleMove(spaceCtrl)) r.Get("/spaces", handlerspace.HandleListSpaces(spaceCtrl)) r.Get("/repos", handlerspace.HandleListRepos(spaceCtrl)) diff --git a/types/enum/event_types.go b/types/enum/event_types.go new file mode 100644 index 000000000..18d0d171c --- /dev/null +++ b/types/enum/event_types.go @@ -0,0 +1,15 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +// Enums for event types delivered to the event stream for the UI +package enum + +// EventType defines the kind of event +type EventType string + +const ( + ExecutionUpdated = "execution_updated" + ExecutionRunning = "execution_running" + ExecutionCompleted = "execution_completed" +)