add pubsub for events

This commit is contained in:
Vistaar Juneja 2023-09-07 23:58:04 +01:00
parent e72af655d5
commit 414993a388
16 changed files with 362 additions and 20 deletions

View File

@ -41,6 +41,7 @@ import (
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/pipeline/commit"
eventsstream "github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/pipeline/file"
"github.com/harness/gitness/internal/pipeline/manager"
"github.com/harness/gitness/internal/pipeline/runner"
@ -125,6 +126,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
triggerer.WireSet,
file.WireSet,
runner.WireSet,
eventsstream.WireSet,
scheduler.WireSet,
commit.WireSet,
trigger.WireSet,

View File

@ -10,7 +10,7 @@ import (
"context"
"github.com/harness/gitness/cli/server"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/events"
events2 "github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
server3 "github.com/harness/gitness/gitrpc/server"
"github.com/harness/gitness/gitrpc/server/cron"
@ -36,9 +36,10 @@ import (
"github.com/harness/gitness/internal/auth/authn"
"github.com/harness/gitness/internal/auth/authz"
"github.com/harness/gitness/internal/bootstrap"
events3 "github.com/harness/gitness/internal/events/git"
events2 "github.com/harness/gitness/internal/events/pullreq"
events4 "github.com/harness/gitness/internal/events/git"
events3 "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/pipeline/commit"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/pipeline/file"
"github.com/harness/gitness/internal/pipeline/manager"
"github.com/harness/gitness/internal/pipeline/runner"
@ -137,10 +138,13 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
logStore := logs.ProvideLogStore(db, config)
logStream := livelog.ProvideLogStream(config)
logsController := logs2.ProvideController(db, authorizer, executionStore, repoStore, pipelineStore, stageStore, stepStore, logStore, logStream)
pubsubConfig := pubsub.ProvideConfig(config)
pubSub := pubsub.ProvidePubSub(pubsubConfig, universalClient)
eventsEvents := events.ProvideEventsStreaming(pubSub)
secretStore := database.ProvideSecretStore(db)
connectorStore := database.ProvideConnectorStore(db)
templateStore := database.ProvideTemplateStore(db)
spaceController := space.ProvideController(db, provider, pathUID, authorizer, pathStore, pipelineStore, secretStore, connectorStore, templateStore, spaceStore, repoStore, principalStore, repoController, membershipStore)
spaceController := space.ProvideController(db, provider, eventsEvents, pathUID, authorizer, pathStore, pipelineStore, secretStore, connectorStore, templateStore, spaceStore, repoStore, principalStore, repoController, membershipStore)
pipelineController := pipeline.ProvideController(db, pathUID, pathStore, repoStore, authorizer, pipelineStore)
encrypter, err := encrypt.ProvideEncrypter(config)
if err != nil {
@ -162,11 +166,11 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil {
return nil, err
}
eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient)
eventsSystem, err := events2.ProvideSystem(eventsConfig, universalClient)
if err != nil {
return nil, err
}
reporter, err := events2.ProvideReporter(eventsSystem)
reporter, err := events3.ProvideReporter(eventsSystem)
if err != nil {
return nil, err
}
@ -178,11 +182,11 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
}
webhookStore := database.ProvideWebhookStore(db)
webhookExecutionStore := database.ProvideWebhookExecutionStore(db)
readerFactory, err := events3.ProvideReaderFactory(eventsSystem)
readerFactory, err := events4.ProvideReaderFactory(eventsSystem)
if err != nil {
return nil, err
}
eventsReaderFactory, err := events2.ProvideReaderFactory(eventsSystem)
eventsReaderFactory, err := events3.ProvideReaderFactory(eventsSystem)
if err != nil {
return nil, err
}
@ -191,7 +195,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err
}
webhookController := webhook2.ProvideController(webhookConfig, db, authorizer, webhookStore, webhookExecutionStore, repoStore, webhookService)
eventsReporter, err := events3.ProvideReporter(eventsSystem)
eventsReporter, err := events4.ProvideReporter(eventsSystem)
if err != nil {
return nil, err
}
@ -206,7 +210,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
webHandler := router.ProvideWebHandler(config)
routerRouter := router.ProvideRouter(config, apiHandler, gitHandler, webHandler)
serverServer := server2.ProvideServer(config, routerRouter)
executionManager := manager.ProvideExecutionManager(config, executionStore, pipelineStore, provider, fileService, logStore, logStream, repoStore, schedulerScheduler, secretStore, stageStore, stepStore, principalStore)
executionManager := manager.ProvideExecutionManager(config, executionStore, pipelineStore, provider, eventsEvents, fileService, logStore, logStream, repoStore, schedulerScheduler, secretStore, stageStore, stepStore, principalStore)
client := manager.ProvideExecutionClient(executionManager, config)
runtimeRunner, err := runner.ProvideExecutionRunner(config, client, executionManager)
if err != nil {

View File

@ -7,6 +7,7 @@ package space
import (
"github.com/harness/gitness/internal/api/controller/repo"
"github.com/harness/gitness/internal/auth/authz"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/internal/url"
"github.com/harness/gitness/types/check"
@ -17,6 +18,7 @@ import (
type Controller struct {
db *sqlx.DB
urlProvider *url.Provider
eventsStream events.Events
uidCheck check.PathUID
authorizer authz.Authorizer
pathStore store.PathStore
@ -31,7 +33,7 @@ type Controller struct {
membershipStore store.MembershipStore
}
func NewController(db *sqlx.DB, urlProvider *url.Provider,
func NewController(db *sqlx.DB, urlProvider *url.Provider, eventsStream events.Events,
uidCheck check.PathUID, authorizer authz.Authorizer,
pathStore store.PathStore, pipelineStore store.PipelineStore, secretStore store.SecretStore,
connectorStore store.ConnectorStore, templateStore store.TemplateStore, spaceStore store.SpaceStore,
@ -41,6 +43,7 @@ func NewController(db *sqlx.DB, urlProvider *url.Provider,
return &Controller{
db: db,
urlProvider: urlProvider,
eventsStream: eventsStream,
uidCheck: uidCheck,
authorizer: authorizer,
pathStore: pathStore,

View File

@ -0,0 +1,33 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package space
import (
"context"
"fmt"
apiauth "github.com/harness/gitness/internal/api/auth"
"github.com/harness/gitness/internal/auth"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/types/enum"
)
func (c *Controller) Events(
ctx context.Context,
session *auth.Session,
spaceRef string,
) (<-chan *events.Event, <-chan error, error) {
space, err := c.spaceStore.FindByRef(ctx, spaceRef)
if err != nil {
return nil, nil, fmt.Errorf("failed to find space ref: %w", err)
}
if err = apiauth.CheckSpace(ctx, c.authorizer, session, space, enum.PermissionSpaceView, true); err != nil {
return nil, nil, fmt.Errorf("failed to authorize stream: %w", err)
}
events, errc := c.eventsStream.Subscribe(ctx, space.ID)
return events, errc, nil
}

View File

@ -7,6 +7,7 @@ package space
import (
"github.com/harness/gitness/internal/api/controller/repo"
"github.com/harness/gitness/internal/auth/authz"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/internal/url"
"github.com/harness/gitness/types/check"
@ -20,13 +21,14 @@ var WireSet = wire.NewSet(
ProvideController,
)
func ProvideController(db *sqlx.DB, urlProvider *url.Provider, uidCheck check.PathUID, authorizer authz.Authorizer,
pathStore store.PathStore, pipelineStore store.PipelineStore, secretStore store.SecretStore,
func ProvideController(db *sqlx.DB, urlProvider *url.Provider, eventsStream events.Events,
uidCheck check.PathUID, authorizer authz.Authorizer, pathStore store.PathStore,
pipelineStore store.PipelineStore, secretStore store.SecretStore,
connectorStore store.ConnectorStore, templateStore store.TemplateStore,
spaceStore store.SpaceStore, repoStore store.RepoStore, principalStore store.PrincipalStore,
repoCtrl *repo.Controller, membershipStore store.MembershipStore,
) *Controller {
return NewController(db, urlProvider, uidCheck, authorizer,
return NewController(db, urlProvider, eventsStream, uidCheck, authorizer,
pathStore, pipelineStore, secretStore, connectorStore, templateStore,
spaceStore, repoStore, principalStore, repoCtrl, membershipStore)
}

View File

@ -0,0 +1,110 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package space
import (
"context"
"encoding/json"
"io"
"net/http"
"time"
"github.com/harness/gitness/internal/api/controller/space"
"github.com/harness/gitness/internal/api/render"
"github.com/harness/gitness/internal/api/request"
"github.com/rs/zerolog/log"
)
var (
pingInterval = 30 * time.Second
tailMaxTime = 2 * time.Hour
)
// HandleEventsStream returns an http.HandlerFunc that watches for
// events on a space
func HandleEventsStream(spaceCtrl *space.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, _ := request.AuthSessionFrom(ctx)
spaceRef, err := request.GetSpaceRefFromPath(r)
if err != nil {
render.TranslatedUserError(w, err)
return
}
f, ok := w.(http.Flusher)
if !ok {
log.Error().Msg("http writer type assertion failed")
render.InternalError(w)
return
}
io.WriteString(w, ": ping\n\n")
f.Flush()
events, errc, err := spaceCtrl.Events(ctx, session, spaceRef)
if err != nil {
render.TranslatedUserError(w, err)
return
}
// could not get error channel
if errc == nil {
io.WriteString(w, "event: error\ndata: eof\n\n")
return
}
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")
h.Set("Access-Control-Allow-Origin", "*")
ctx, cancel := context.WithTimeout(r.Context(), tailMaxTime)
defer cancel()
enc := json.NewEncoder(w)
pingTimer := time.NewTimer(pingInterval)
defer pingTimer.Stop()
L:
for {
// ensure timer is stopped before resetting (see documentation)
if !pingTimer.Stop() {
// in this specific case the timer's channel could be both, empty or full
select {
case <-pingTimer.C:
default:
}
}
pingTimer.Reset(pingInterval)
select {
case <-ctx.Done():
log.Debug().Msg("events: stream cancelled")
break L
case err := <-errc:
log.Err(err).Msg("events: received error in the tail channel")
break L
case <-pingTimer.C:
// if time b/w messages takes longer, send a ping
io.WriteString(w, ": ping\n\n")
f.Flush()
case event := <-events:
io.WriteString(w, "data: ")
enc.Encode(event)
io.WriteString(w, "\n\n")
f.Flush()
}
}
io.WriteString(w, "event: error\ndata: eof\n\n")
f.Flush()
log.Debug().Msg("events: stream closed")
}
}

View File

@ -7,12 +7,12 @@ package openapi
import (
"net/http"
"github.com/gotidy/ptr"
"github.com/harness/gitness/internal/api/controller/user"
"github.com/harness/gitness/internal/api/request"
"github.com/harness/gitness/internal/api/usererror"
"github.com/harness/gitness/types"
"github.com/gotidy/ptr"
"github.com/swaggest/openapi-go/openapi3"
)

View File

@ -0,0 +1,72 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package events
import (
"context"
"encoding/json"
"strconv"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types/enum"
)
// Event is an event which is sent to the UI via server-sent events.
type Event struct {
Type enum.EventType `json:"type"`
Data json.RawMessage `json:"data"`
}
type Events interface {
// Publish publishes an event to a given space ID.
Publish(ctx context.Context, spaceID int64, event *Event) error
// Subscribe listens to events on a space ID.
Subscribe(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error)
}
type event struct {
pubsub pubsub.PubSub
topic string
}
func New(pubsub pubsub.PubSub, topic string) Events {
return &event{
pubsub: pubsub,
topic: topic,
}
}
func (e *event) Publish(ctx context.Context, spaceID int64, event *Event) error {
bytes, err := json.Marshal(event)
if err != nil {
return err
}
option := pubsub.WithPublishNamespace(format(spaceID))
return e.pubsub.Publish(ctx, e.topic, bytes, option)
}
// format creates the namespace name which will be spaces-<id>
func format(id int64) string {
return "spaces-" + strconv.Itoa(int(id))
}
func (e *event) Subscribe(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error) {
chEvent := make(chan *Event, 100) // TODO: check best size here
chErr := make(chan error)
g := func(payload []byte) error {
event := &Event{}
err := json.Unmarshal(payload, event)
if err != nil {
// This should never happen
return err
}
chEvent <- event
return nil
}
option := pubsub.WithChannelNamespace(format(spaceID))
e.pubsub.Subscribe(ctx, e.topic, g, option)
return chEvent, chErr
}

View File

@ -0,0 +1,23 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package events
import (
"github.com/harness/gitness/pubsub"
"github.com/google/wire"
)
// WireSet provides a wire set for this package.
var WireSet = wire.NewSet(
ProvideEventsStreaming,
)
func ProvideEventsStreaming(pubsub pubsub.PubSub) Events {
return &event{
pubsub: pubsub,
topic: "events",
}
}

View File

@ -11,6 +11,7 @@ import (
"io"
"time"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/pipeline/file"
"github.com/harness/gitness/internal/pipeline/scheduler"
"github.com/harness/gitness/internal/store"
@ -98,7 +99,7 @@ type Manager struct {
Pipelines store.PipelineStore
urlProvider *urlprovider.Provider
// Converter store.ConvertService
// Events store.Pubsub
Events events.Events
// Globals store.GlobalSecretStore
Logs store.LogStore
Logz livelog.LogStream
@ -119,6 +120,7 @@ func New(
executionStore store.ExecutionStore,
pipelineStore store.PipelineStore,
urlProvider *urlprovider.Provider,
events events.Events,
fileService file.FileService,
logStore store.LogStore,
logStream livelog.LogStream,
@ -134,6 +136,7 @@ func New(
Executions: executionStore,
Pipelines: pipelineStore,
urlProvider: urlProvider,
Events: events,
FileService: fileService,
Logs: logStore,
Logz: logStream,
@ -313,6 +316,7 @@ func (m *Manager) BeforeStep(ctx context.Context, step *types.Step) error {
}
updater := &updater{
Executions: m.Executions,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
@ -332,6 +336,7 @@ func (m *Manager) AfterStep(ctx context.Context, step *types.Step) error {
var retErr error
updater := &updater{
Executions: m.Executions,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
@ -352,6 +357,7 @@ func (m *Manager) AfterStep(ctx context.Context, step *types.Step) error {
func (m *Manager) BeforeStage(ctx context.Context, stage *types.Stage) error {
s := &setup{
Executions: m.Executions,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
@ -365,6 +371,7 @@ func (m *Manager) BeforeStage(ctx context.Context, stage *types.Stage) error {
func (m *Manager) AfterStage(ctx context.Context, stage *types.Stage) error {
t := &teardown{
Executions: m.Executions,
Events: m.Events,
Logs: m.Logz,
Repos: m.Repos,
Scheduler: m.Scheduler,

View File

@ -6,9 +6,11 @@ package manager
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/store"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/types"
@ -19,6 +21,7 @@ import (
type setup struct {
Executions store.ExecutionStore
Events events.Events
Repos store.RepoStore
Steps store.StepStore
Stages store.StageStore
@ -39,7 +42,7 @@ func (s *setup) do(ctx context.Context, stage *types.Stage) error {
Int64("repo.id", execution.RepoID).
Logger()
_, err = s.Repos.Find(noContext, execution.RepoID)
repo, err := s.Repos.Find(noContext, execution.RepoID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find the repository")
return err
@ -72,15 +75,37 @@ func (s *setup) do(ctx context.Context, stage *types.Stage) error {
}
}
_, err = s.updateExecution(ctx, execution)
_, err = s.updateExecution(noContext, execution)
if err != nil {
log.Error().Err(err).Msg("manager: cannot update the execution")
return err
}
stages, err := s.Stages.ListWithSteps(noContext, execution.ID)
if err != nil {
log.Error().Err(err).Msg("manager: could not list stages with steps")
return err
}
execution.Stages = stages
err = s.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionRunning, execution))
if err != nil {
log.Warn().Err(err).Msg("manager: could not publish execution event")
}
return nil
}
func executionEvent(
eventType enum.EventType,
execution *types.Execution,
) *events.Event {
// json.Marshal will not return an error here, it can be absorbed
bytes, _ := json.Marshal(execution)
return &events.Event{
Type: eventType,
Data: bytes,
}
}
// helper function that updates the execution status from pending to running.
// This accounts for the fact that another agent may have already updated
// the execution status, which may happen if two stages execute concurrently.

View File

@ -8,6 +8,7 @@ import (
"context"
"time"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/pipeline/scheduler"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/livelog"
@ -20,6 +21,7 @@ import (
type teardown struct {
Executions store.ExecutionStore
Events events.Events
Logs livelog.LogStream
Scheduler scheduler.Scheduler
Repos store.RepoStore
@ -46,7 +48,7 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
Str("stage.status", stage.Status).
Logger()
_, err = t.Repos.Find(noContext, execution.RepoID)
repo, err := t.Repos.Find(noContext, execution.RepoID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find the repository")
return err
@ -131,6 +133,13 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
return err
}
execution.Stages = stages
err = t.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionCompleted, execution))
if err != nil {
log.Warn().Err(err).
Msg("manager: could not publish execution completed event")
}
return nil
}

View File

@ -7,8 +7,10 @@ package manager
import (
"context"
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
@ -16,6 +18,7 @@ import (
type updater struct {
Executions store.ExecutionStore
Repos store.RepoStore
Events events.Events
Steps store.StepStore
Stages store.StageStore
}
@ -36,5 +39,35 @@ func (u *updater) do(ctx context.Context, step *types.Step) error {
return err
}
stage, err := u.Stages.Find(noContext, step.StageID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find stage")
return nil
}
execution, err := u.Executions.Find(noContext, stage.ExecutionID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find execution")
return nil
}
repo, err := u.Repos.Find(noContext, execution.RepoID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find repo")
return nil
}
stages, err := u.Stages.ListWithSteps(noContext, stage.ExecutionID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find stages")
return nil
}
execution.Stages = stages
err = u.Events.Publish(noContext, repo.ParentID, executionEvent(enum.ExecutionUpdated, execution))
if err != nil {
log.Warn().Err(err).Msg("manager: cannot publish execution updated event")
}
return nil
}

View File

@ -5,6 +5,7 @@
package manager
import (
"github.com/harness/gitness/internal/pipeline/events"
"github.com/harness/gitness/internal/pipeline/file"
"github.com/harness/gitness/internal/pipeline/scheduler"
"github.com/harness/gitness/internal/store"
@ -28,6 +29,7 @@ func ProvideExecutionManager(
executionStore store.ExecutionStore,
pipelineStore store.PipelineStore,
urlProvider *url.Provider,
events events.Events,
fileService file.FileService,
logStore store.LogStore,
logStream livelog.LogStream,
@ -37,7 +39,7 @@ func ProvideExecutionManager(
stageStore store.StageStore,
stepStore store.StepStore,
userStore store.PrincipalStore) ExecutionManager {
return New(config, executionStore, pipelineStore, urlProvider, fileService, logStore,
return New(config, executionStore, pipelineStore, urlProvider, events, fileService, logStore,
logStream, repoStore, scheduler, secretStore, stageStore, stepStore, userStore)
}

View File

@ -188,6 +188,8 @@ func setupSpaces(r chi.Router, spaceCtrl *space.Controller) {
r.Patch("/", handlerspace.HandleUpdate(spaceCtrl))
r.Delete("/", handlerspace.HandleDelete(spaceCtrl))
r.Get("/stream", handlerspace.HandleEventsStream(spaceCtrl))
r.Post("/move", handlerspace.HandleMove(spaceCtrl))
r.Get("/spaces", handlerspace.HandleListSpaces(spaceCtrl))
r.Get("/repos", handlerspace.HandleListRepos(spaceCtrl))

15
types/enum/event_types.go Normal file
View File

@ -0,0 +1,15 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
// Enums for event types delivered to the event stream for the UI
package enum
// EventType defines the kind of event
type EventType string
const (
ExecutionUpdated = "execution_updated"
ExecutionRunning = "execution_running"
ExecutionCompleted = "execution_completed"
)