feat: [CODE-1353]: Add status check and merge check SSE events (#3146)

* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Add webhook create/update/delete event
* Add branch/tag created/updated deleted events
* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Add new event types
* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Add logs directly to publisher
* Merge remote-tracking branch 'origin/main' into dd/sse-events
* Add status check and merge check SSE events
BT-10437
Darko Draskovic 2024-12-18 16:00:58 +00:00 committed by Harness
parent 694e790091
commit 265eb40f22
38 changed files with 218 additions and 145 deletions

View File

@ -182,6 +182,8 @@ func (c *Controller) Report(
return nil, fmt.Errorf("failed to upsert status check result for repo=%s: %w", repo.Identifier, err)
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeStatusCheckReportUpdated, statusCheckReport)
return statusCheckReport, nil
}

View File

@ -23,6 +23,7 @@ import (
"github.com/harness/gitness/app/api/usererror"
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/app/auth/authz"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/git"
"github.com/harness/gitness/store/database/dbtx"
@ -31,13 +32,14 @@ import (
)
type Controller struct {
tx dbtx.Transactor
authorizer authz.Authorizer
repoStore store.RepoStore
spaceStore store.SpaceStore
checkStore store.CheckStore
git git.Interface
sanitizers map[enum.CheckPayloadKind]func(in *ReportInput, s *auth.Session) error
tx dbtx.Transactor
authorizer authz.Authorizer
repoStore store.RepoStore
spaceStore store.SpaceStore
checkStore store.CheckStore
git git.Interface
sanitizers map[enum.CheckPayloadKind]func(in *ReportInput, s *auth.Session) error
sseStreamer sse.Streamer
}
func NewController(
@ -48,15 +50,17 @@ func NewController(
checkStore store.CheckStore,
git git.Interface,
sanitizers map[enum.CheckPayloadKind]func(in *ReportInput, s *auth.Session) error,
sseStreamer sse.Streamer,
) *Controller {
return &Controller{
tx: tx,
authorizer: authorizer,
repoStore: repoStore,
spaceStore: spaceStore,
checkStore: checkStore,
git: git,
sanitizers: sanitizers,
tx: tx,
authorizer: authorizer,
repoStore: repoStore,
spaceStore: spaceStore,
checkStore: checkStore,
git: git,
sanitizers: sanitizers,
sseStreamer: sseStreamer,
}
}

View File

@ -17,6 +17,7 @@ package check
import (
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/app/auth/authz"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/git"
"github.com/harness/gitness/store/database/dbtx"
@ -39,6 +40,7 @@ func ProvideController(
checkStore store.CheckStore,
rpcClient git.Interface,
sanitizers map[enum.CheckPayloadKind]func(in *ReportInput, s *auth.Session) error,
sseStreamer sse.Streamer,
) *Controller {
return NewController(
tx,
@ -48,5 +50,6 @@ func ProvideController(
checkStore,
rpcClient,
sanitizers,
sseStreamer,
)
}

View File

@ -26,6 +26,7 @@ import (
eventsrepo "github.com/harness/gitness/app/events/repo"
"github.com/harness/gitness/app/services/protection"
"github.com/harness/gitness/app/services/settings"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/errors"
@ -52,6 +53,7 @@ type Controller struct {
preReceiveExtender PreReceiveExtender
updateExtender UpdateExtender
postReceiveExtender PostReceiveExtender
sseStreamer sse.Streamer
}
func NewController(
@ -69,6 +71,7 @@ func NewController(
preReceiveExtender PreReceiveExtender,
updateExtender UpdateExtender,
postReceiveExtender PostReceiveExtender,
sseStreamer sse.Streamer,
) *Controller {
return &Controller{
authorizer: authorizer,
@ -85,6 +88,7 @@ func NewController(
preReceiveExtender: preReceiveExtender,
updateExtender: updateExtender,
postReceiveExtender: postReceiveExtender,
sseStreamer: sseStreamer,
}
}

View File

@ -109,19 +109,29 @@ func (c *Controller) reportBranchEvent(
) {
switch {
case branchUpdate.Old.IsNil():
c.gitReporter.BranchCreated(ctx, &events.BranchCreatedPayload{
payload := &events.BranchCreatedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: branchUpdate.Ref,
SHA: branchUpdate.New.String(),
})
}
c.gitReporter.BranchCreated(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeBranchCreated, payload)
case branchUpdate.New.IsNil():
c.gitReporter.BranchDeleted(ctx, &events.BranchDeletedPayload{
payload := &events.BranchDeletedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: branchUpdate.Ref,
SHA: branchUpdate.Old.String(),
})
}
c.gitReporter.BranchDeleted(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeBranchDeleted, payload)
default:
// A force update event might trigger some additional operations that aren't required
// for ordinary updates (force pushes alter the commit history of a branch).
@ -135,14 +145,18 @@ func (c *Controller) reportBranchEvent(
Msg("failed to check ancestor")
}
c.gitReporter.BranchUpdated(ctx, &events.BranchUpdatedPayload{
payload := &events.BranchUpdatedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: branchUpdate.Ref,
OldSHA: branchUpdate.Old.String(),
NewSHA: branchUpdate.New.String(),
Forced: forced,
})
}
c.gitReporter.BranchUpdated(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeBranchUpdated, payload)
}
}
@ -154,21 +168,31 @@ func (c *Controller) reportTagEvent(
) {
switch {
case tagUpdate.Old.IsNil():
c.gitReporter.TagCreated(ctx, &events.TagCreatedPayload{
payload := &events.TagCreatedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: tagUpdate.Ref,
SHA: tagUpdate.New.String(),
})
}
c.gitReporter.TagCreated(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeTagCreated, payload)
case tagUpdate.New.IsNil():
c.gitReporter.TagDeleted(ctx, &events.TagDeletedPayload{
payload := &events.TagDeletedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: tagUpdate.Ref,
SHA: tagUpdate.Old.String(),
})
}
c.gitReporter.TagDeleted(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeTagDeleted, payload)
default:
c.gitReporter.TagUpdated(ctx, &events.TagUpdatedPayload{
payload := &events.TagUpdatedPayload{
RepoID: repo.ID,
PrincipalID: principalID,
Ref: tagUpdate.Ref,
@ -176,7 +200,11 @@ func (c *Controller) reportTagEvent(
NewSHA: tagUpdate.New.String(),
// tags can only be force updated!
Forced: true,
})
}
c.gitReporter.TagUpdated(ctx, payload)
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeTagUpdated, payload)
}
}

View File

@ -21,6 +21,7 @@ import (
eventsrepo "github.com/harness/gitness/app/events/repo"
"github.com/harness/gitness/app/services/protection"
"github.com/harness/gitness/app/services/settings"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/git"
@ -56,6 +57,7 @@ func ProvideController(
preReceiveExtender PreReceiveExtender,
updateExtender UpdateExtender,
postReceiveExtender PostReceiveExtender,
sseStreamer sse.Streamer,
) *Controller {
ctrl := NewController(
authorizer,
@ -72,6 +74,7 @@ func ProvideController(
preReceiveExtender,
updateExtender,
postReceiveExtender,
sseStreamer,
)
// TODO: improve wiring if possible

View File

@ -356,9 +356,7 @@ func (c *Controller) CommentApplySuggestions(
fmt.Errorf("failed to update pull request's unresolved comment count: %w", err)
}
if err = c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqUpdated, pr)
err = c.instrumentation.Track(ctx, instrument.Event{
Type: instrument.EventTypePRSuggestionApplied,

View File

@ -216,9 +216,7 @@ func (c *Controller) CommentCreate(
c.migrateCodeComment(ctx, repo, pr, in, act.AsCodeComment(), cut)
}
if err = c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqUpdated, pr)
// publish event for all comments
if act.Type == enum.PullReqActivityTypeComment || act.Type == enum.PullReqActivityTypeCodeComment {

View File

@ -23,8 +23,6 @@ import (
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
// CommentDelete deletes a pull request comment.
@ -85,9 +83,7 @@ func (c *Controller) CommentDelete(
return err
}
if err = c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqUpdated, pr)
return nil
}

View File

@ -25,8 +25,6 @@ import (
events "github.com/harness/gitness/app/events/pullreq"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
type CommentStatusInput struct {
@ -126,9 +124,7 @@ func (c *Controller) CommentStatus(
return nil, err
}
if err = c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqUpdated, pr)
c.eventReporter.CommentStatusUpdated(ctx, &events.CommentStatusUpdatedPayload{
Base: events.Base{

View File

@ -24,8 +24,6 @@ import (
events "github.com/harness/gitness/app/events/pullreq"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
type CommentUpdateInput struct {
@ -116,9 +114,7 @@ func (c *Controller) CommentUpdate(
// Populate activity mentions (used only for response purposes).
act.Mentions = principalInfos
if err = c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqUpdated, pr)
c.reportCommentUpdated(ctx, pr, session.Principal.ID, act.ID, act.IsReply())

View File

@ -328,9 +328,7 @@ func (c *Controller) Merge(
// non-critical error
log.Ctx(ctx).Warn().Err(err).Msg("failed to update unchecked pull request")
} else {
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
}
}
@ -470,9 +468,7 @@ func (c *Controller) Merge(
// non-critical error
log.Ctx(ctx).Warn().Err(err).Msg("failed to update pull request with conflict files")
} else {
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
}
log.Ctx(ctx).Info().Msg("aborting pull request merge because of conflicts")
@ -573,9 +569,7 @@ func (c *Controller) Merge(
}
}
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
if protection.IsBypassed(violations) {
err = c.auditService.Log(ctx,

View File

@ -151,9 +151,7 @@ func (c *Controller) Create(
SourceSHA: sourceSHA.String(),
})
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
err = c.instrumentation.Track(ctx, instrument.Event{
Type: instrument.EventTypeCreatePullRequest,

View File

@ -189,9 +189,7 @@ func (c *Controller) State(ctx context.Context,
})
}
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
return pr, nil
}

View File

@ -132,9 +132,7 @@ func (c *Controller) Update(ctx context.Context,
c.eventReporter.Updated(ctx, updateEvent)
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
return pr, nil
}

View File

@ -150,14 +150,7 @@ func (c *Controller) ReviewerAdd(
c.reportReviewerAddition(ctx, session, pr, reviewer)
if err = c.sseStreamer.Publish(
ctx, repo.ParentID, enum.SSETypePullRequestReviewerAdded, pr,
); err != nil {
log.Ctx(ctx).Warn().Err(err).Msgf(
"failed to publish %s event", enum.SSETypePullRequestReviewerAdded,
)
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqReviewerAdded, pr)
return reviewer, nil
}

View File

@ -101,13 +101,7 @@ func (c *Controller) ReviewerDelete(
log.Ctx(ctx).Err(err).Msg("failed to write pull request activity after reviewer removal")
}
if err = c.sseStreamer.Publish(
ctx, repo.ParentID, enum.SSETypePullRequestReviewerAdded, pr,
); err != nil {
log.Ctx(ctx).Warn().Err(err).Msgf(
"failed to publish %s event", enum.SSETypePullRequestReviewerRemoved,
)
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypePullReqReviewerAdded, pr)
return nil
}

View File

@ -38,6 +38,7 @@ import (
"github.com/harness/gitness/app/services/rules"
"github.com/harness/gitness/app/services/settings"
"github.com/harness/gitness/app/services/usergroup"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/audit"
@ -104,6 +105,7 @@ type Controller struct {
labelSvc *label.Service
instrumentation instrument.Service
rulesSvc *rules.Service
sseStreamer sse.Streamer
}
func NewController(
@ -139,6 +141,7 @@ func NewController(
userGroupStore store.UserGroupStore,
userGroupService usergroup.SearchService,
rulesSvc *rules.Service,
sseStreamer sse.Streamer,
) *Controller {
return &Controller{
defaultBranch: config.Git.DefaultBranch,
@ -173,6 +176,7 @@ func NewController(
userGroupStore: userGroupStore,
userGroupService: userGroupService,
rulesSvc: rulesSvc,
sseStreamer: sseStreamer,
}
}

View File

@ -75,6 +75,8 @@ func (c *Controller) MergeCheck(
}, nil
}
c.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeBranchMergableUpdated, mergeOutput)
return MergeCheck{
Mergeable: true,
}, nil

View File

@ -29,6 +29,7 @@ import (
"github.com/harness/gitness/app/services/rules"
"github.com/harness/gitness/app/services/settings"
"github.com/harness/gitness/app/services/usergroup"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/audit"
@ -79,6 +80,7 @@ func ProvideController(
userGroupStore store.UserGroupStore,
userGroupService usergroup.SearchService,
rulesSvc *rules.Service,
sseStreamer sse.Streamer,
) *Controller {
return NewController(config, tx, urlProvider,
authorizer,
@ -87,7 +89,7 @@ func ProvideController(
principalInfoCache, protectionManager, rpcClient, importer,
codeOwners, reporeporter, indexer, limiter, locker, auditService, mtxManager, identifierCheck,
repoChecks, publicAccess, labelSvc, instrumentation, userGroupStore, userGroupService,
rulesSvc,
rulesSvc, sseStreamer,
)
}

View File

@ -146,12 +146,7 @@ func (s *service) Cancel(ctx context.Context, repo *types.Repository, execution
execution.Stages = stages
log.Info().Msg("canceler: successfully cancelled build")
// trigger a SSE to notify subscribers that
// the execution was cancelled.
err = s.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeExecutionCanceled, execution)
if err != nil {
log.Debug().Err(err).Msg("canceler: failed to publish server-sent event")
}
s.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeExecutionCanceled, execution)
return nil
}

View File

@ -108,10 +108,8 @@ func (s *setup) do(ctx context.Context, stage *types.Stage) error {
return err
}
execution.Stages = stages
err = s.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionRunning, execution)
if err != nil {
log.Warn().Err(err).Msg("manager: could not publish execution event")
}
s.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionRunning, execution)
return nil
}

View File

@ -170,11 +170,8 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
}
execution.Stages = stages
err = t.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionCompleted, execution)
if err != nil {
log.Warn().Err(err).
Msg("manager: could not publish execution completed event")
}
t.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionCompleted, execution)
// send pipeline execution status
t.reportExecutionCompleted(ctx, execution)

View File

@ -72,12 +72,9 @@ func (u *updater) do(ctx context.Context, step *types.Step) error {
log.Error().Err(err).Msg("manager: cannot find stages")
return nil
}
execution.Stages = stages
err = u.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionUpdated, execution)
if err != nil {
log.Warn().Err(err).Msg("manager: cannot publish execution updated event")
}
u.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionUpdated, execution)
return nil
}

View File

@ -202,7 +202,7 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
GitIgnore: "",
})
if err != nil {
r.publishSSE(ctx, repository)
r.sseStreamer.Publish(ctx, repository.ParentID, enum.SSETypeRepositoryExportCompleted, repository)
return "", err
}
@ -220,24 +220,17 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
if errDelete != nil {
log.Ctx(ctx).Err(errDelete).Msgf("failed to delete repo '%s' on harness", remoteRepo.Identifier)
}
r.publishSSE(ctx, repository)
r.sseStreamer.Publish(ctx, repository.ParentID, enum.SSETypeRepositoryExportCompleted, repository)
return "", err
}
log.Ctx(ctx).Info().Msgf("completed exporting repository '%s' to harness", repository.Identifier)
r.publishSSE(ctx, repository)
r.sseStreamer.Publish(ctx, repository.ParentID, enum.SSETypeRepositoryExportCompleted, repository)
return "", nil
}
func (r *Repository) publishSSE(ctx context.Context, repository *types.Repository) {
err := r.sseStreamer.Publish(ctx, repository.ParentID, enum.SSETypeRepositoryExportCompleted, repository)
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish export completion SSE")
}
}
func (r *Repository) getJobInput(data string) (Input, error) {
encrypted, err := base64.StdEncoding.DecodeString(data)
if err != nil {

View File

@ -362,10 +362,7 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return "", fmt.Errorf("failed to import repository: %w", err)
}
err = r.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeRepositoryImportCompleted, repo)
if err != nil {
log.Warn().Err(err).Msg("failed to publish import completion SSE")
}
r.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeRepositoryImportCompleted, repo)
err = r.indexer.Index(ctx, repo)
if err != nil {

View File

@ -165,9 +165,7 @@ func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context,
Forced: event.Payload.Forced,
})
if err = s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
return nil
})
@ -246,9 +244,7 @@ func (s *Service) closePullReqOnBranchDelete(ctx context.Context,
SourceSHA: pr.SourceSHA,
})
if err = s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
return nil
})

View File

@ -31,7 +31,6 @@ import (
"github.com/harness/gitness/types/enum"
"github.com/gotidy/ptr"
"github.com/rs/zerolog/log"
)
const (
@ -242,9 +241,7 @@ func (s *Service) updateMergeData(
return fmt.Errorf("failed to update PR merge ref in db with error: %w", err)
}
if err = s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
}
s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullReqUpdated, pr)
return nil
}

View File

@ -109,11 +109,10 @@ func (s *Service) sendSSE(
if parentType == enum.RuleParentRepo {
repo, err := s.repoStore.Find(ctx, parentID)
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to get repo")
log.Ctx(ctx).Warn().Err(err).Msg("failed to find repo")
return
}
spaceID = repo.ParentID
}
if err := s.sseStreamer.Publish(ctx, spaceID, sseType, rule); err != nil {
log.Ctx(ctx).Warn().Err(err).Msgf("failed to publish %s event", sseType)
}
s.sseStreamer.Publish(ctx, spaceID, sseType, rule)
}

View File

@ -15,12 +15,16 @@
package webhook
import (
"context"
"net"
"net/url"
"github.com/harness/gitness/errors"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/check"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
const (
@ -127,3 +131,22 @@ func ConvertTriggers(vals []string) []enum.WebhookTrigger {
}
return res
}
func (s *Service) sendSSE(
ctx context.Context,
parentID int64,
parentType enum.WebhookParent,
sseType enum.SSEType,
webhook *types.Webhook,
) {
spaceID := parentID
if parentType == enum.WebhookParentRepo {
repo, err := s.repoStore.Find(ctx, parentID)
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("failed to find repo")
return
}
spaceID = repo.ParentID
}
s.sseStreamer.Publish(ctx, spaceID, sseType, webhook)
}

View File

@ -146,5 +146,7 @@ func (s *Service) Create(
return nil, fmt.Errorf("failed to store webhook: %w", err)
}
s.sendSSE(ctx, parentID, parentType, enum.SSETypeWebhookCreated, hook)
return hook, nil
}

View File

@ -37,5 +37,11 @@ func (s *Service) Delete(
return ErrInternalWebhookOperationNotAllowed
}
return s.webhookStore.Delete(ctx, hook.ID)
if err := s.webhookStore.Delete(ctx, hook.ID); err != nil {
return err
}
s.sendSSE(ctx, parentID, parentType, enum.SSETypeWebhookDeleted, hook)
return nil
}

View File

@ -23,6 +23,7 @@ import (
gitevents "github.com/harness/gitness/app/events/git"
pullreqevents "github.com/harness/gitness/app/events/pullreq"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
@ -103,6 +104,8 @@ type Service struct {
config Config
webhookURLProvider URLProvider
sseStreamer sse.Streamer
}
func NewService(
@ -124,6 +127,7 @@ func NewService(
labelStore store.LabelStore,
webhookURLProvider URLProvider,
labelValueStore store.LabelValueStore,
sseStreamer sse.Streamer,
) (*Service, error) {
if err := config.Prepare(); err != nil {
return nil, fmt.Errorf("provided webhook service config is invalid: %w", err)
@ -152,6 +156,8 @@ func NewService(
labelStore: labelStore,
labelValueStore: labelValueStore,
webhookURLProvider: webhookURLProvider,
sseStreamer: sseStreamer,
}
_, err := gitReaderFactory.Launch(ctx, eventsReaderGroupName, config.EventReaderName,

View File

@ -120,5 +120,7 @@ func (s *Service) Update(
return nil, err
}
s.sendSSE(ctx, parentID, parentType, enum.SSETypeWebhookUpdated, hook)
return hook, nil
}

View File

@ -19,6 +19,7 @@ import (
gitevents "github.com/harness/gitness/app/events/git"
pullreqevents "github.com/harness/gitness/app/events/pullreq"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
@ -54,6 +55,7 @@ func ProvideService(
labelStore store.LabelStore,
webhookURLProvider URLProvider,
labelValueStore store.LabelValueStore,
sseStreamer sse.Streamer,
) (*Service, error) {
return NewService(
ctx,
@ -73,6 +75,7 @@ func ProvideService(
labelStore,
webhookURLProvider,
labelValueStore,
sseStreamer,
)
}

View File

@ -17,11 +17,12 @@ package sse
import (
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
// Event is a server sent event.
@ -32,7 +33,7 @@ type Event struct {
type Streamer interface {
// Publish publishes an event to a given space ID.
Publish(ctx context.Context, spaceID int64, eventType enum.SSEType, data any) error
Publish(ctx context.Context, spaceID int64, eventType enum.SSEType, data any)
// Stream streams the events on a space ID.
Stream(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error, func(context.Context) error)
@ -50,10 +51,15 @@ func NewStreamer(pubsub pubsub.PubSub, namespace string) Streamer {
}
}
func (e *pubsubStreamer) Publish(ctx context.Context, spaceID int64, eventType enum.SSEType, data any) error {
func (e *pubsubStreamer) Publish(
ctx context.Context,
spaceID int64,
eventType enum.SSEType,
data any,
) {
dataSerialized, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to serialize data: %w", err)
log.Ctx(ctx).Warn().Err(err).Msgf("failed to serialize data: %v", err.Error())
}
event := Event{
Type: eventType,
@ -61,16 +67,14 @@ func (e *pubsubStreamer) Publish(ctx context.Context, spaceID int64, eventType e
}
serializedEvent, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to serialize event: %w", err)
log.Ctx(ctx).Warn().Err(err).Msgf("failed to serialize event: %v", err.Error())
}
namespaceOption := pubsub.WithPublishNamespace(e.namespace)
topic := getSpaceTopic(spaceID)
err = e.pubsub.Publish(ctx, topic, serializedEvent, namespaceOption)
if err != nil {
return fmt.Errorf("failed to publish event on pubsub: %w", err)
log.Ctx(ctx).Warn().Err(err).Msgf("failed to publish %s event", eventType)
}
return nil
}
func (e *pubsubStreamer) Stream(

View File

@ -255,7 +255,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
userGroupStore := database.ProvideUserGroupStore(db)
searchService := usergroup.ProvideSearchService()
rulesService := rules.ProvideService(transactor, ruleStore, repoStore, spaceStore, protectionManager, auditService, instrumentService, principalInfoCache, userGroupStore, searchService, streamer)
repoController := repo.ProvideController(config, transactor, provider, authorizer, repoStore, spaceStore, pipelineStore, principalStore, executionStore, ruleStore, checkStore, pullReqStore, settingsService, principalInfoCache, protectionManager, gitInterface, repository, codeownersService, reporter, indexer, resourceLimiter, lockerLocker, auditService, mutexManager, repoIdentifier, repoCheck, publicaccessService, labelService, instrumentService, userGroupStore, searchService, rulesService)
repoController := repo.ProvideController(config, transactor, provider, authorizer, repoStore, spaceStore, pipelineStore, principalStore, executionStore, ruleStore, checkStore, pullReqStore, settingsService, principalInfoCache, protectionManager, gitInterface, repository, codeownersService, reporter, indexer, resourceLimiter, lockerLocker, auditService, mutexManager, repoIdentifier, repoCheck, publicaccessService, labelService, instrumentService, userGroupStore, searchService, rulesService, streamer)
reposettingsController := reposettings.ProvideController(authorizer, repoStore, settingsService, auditService)
stageStore := database.ProvideStageStore(db)
schedulerScheduler, err := scheduler.ProvideScheduler(stageStore, mutexManager)
@ -379,7 +379,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
webhookStore := database.ProvideWebhookStore(db)
webhookExecutionStore := database.ProvideWebhookExecutionStore(db)
urlProvider := webhook.ProvideURLProvider(ctx)
webhookService, err := webhook.ProvideService(ctx, webhookConfig, transactor, readerFactory, eventsReaderFactory, webhookStore, webhookExecutionStore, spaceStore, repoStore, pullReqStore, pullReqActivityStore, provider, principalStore, gitInterface, encrypter, labelStore, urlProvider, labelValueStore)
webhookService, err := webhook.ProvideService(ctx, webhookConfig, transactor, readerFactory, eventsReaderFactory, webhookStore, webhookExecutionStore, spaceStore, repoStore, pullReqStore, pullReqActivityStore, provider, principalStore, gitInterface, encrypter, labelStore, urlProvider, labelValueStore, streamer)
if err != nil {
return nil, err
}
@ -401,12 +401,12 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil {
return nil, err
}
githookController := githook.ProvideController(authorizer, principalStore, repoStore, reporter5, reporter, gitInterface, pullReqStore, provider, protectionManager, clientFactory, resourceLimiter, settingsService, preReceiveExtender, updateExtender, postReceiveExtender)
githookController := githook.ProvideController(authorizer, principalStore, repoStore, reporter5, reporter, gitInterface, pullReqStore, provider, protectionManager, clientFactory, resourceLimiter, settingsService, preReceiveExtender, updateExtender, postReceiveExtender, streamer)
serviceaccountController := serviceaccount.NewController(principalUID, authorizer, principalStore, spaceStore, repoStore, tokenStore)
principalController := principal.ProvideController(principalStore, authorizer)
usergroupController := usergroup2.ProvideController(userGroupStore, spaceStore, authorizer, searchService)
v := check2.ProvideCheckSanitizers()
checkController := check2.ProvideController(transactor, authorizer, repoStore, spaceStore, checkStore, gitInterface, v)
checkController := check2.ProvideController(transactor, authorizer, repoStore, spaceStore, checkStore, gitInterface, v, streamer)
systemController := system.NewController(principalStore, config)
blobConfig, err := server.ProvideBlobStoreConfig(config)
if err != nil {

View File

@ -19,22 +19,69 @@ type SSEType string
// Enums for event types delivered to the event stream for the UI.
const (
// Executions.
SSETypeExecutionUpdated SSEType = "execution_updated"
SSETypeExecutionRunning SSEType = "execution_running"
SSETypeExecutionCompleted SSEType = "execution_completed"
SSETypeExecutionCanceled SSEType = "execution_canceled"
// Repo import/export.
SSETypeRepositoryImportCompleted SSEType = "repository_import_completed"
SSETypeRepositoryExportCompleted SSEType = "repository_export_completed"
SSETypePullRequestUpdated SSEType = "pullreq_updated"
// Pull reqs.
SSETypePullRequestReviewerAdded SSEType = "pullreq_reviewer_added"
SSETypePullRequestReviewerRemoved SSEType = "pullreq_reviewer_removed"
SSETypePullReqUpdated SSEType = "pullreq_updated"
SSETypePullReqReviewerAdded SSEType = "pullreq_reviewer_added"
SSETypePullReqtReviewerRemoved SSEType = "pullreq_reviewer_removed"
SSETypePullReqCommentCreated SSEType = "pullreq_comment_created"
SSETypePullReqCommentEdited SSEType = "pullreq_comment_edited"
SSETypePullReqCommentUpdated SSEType = "pullreq_comment_updated"
SSETypePullReqCommentStatusResolved SSEType = "pullreq_comment_status_resolved"
SSETypePullReqCommentStatusReactivated SSEType = "pullreq_comment_status_reactivated"
SSETypePullReqOpened SSEType = "pullreq_opened"
SSETypePullReqClosed SSEType = "pullreq_closed"
SSETypePullReqMarkedAsDraft SSEType = "pullreq_marked_as_draft"
SSETypePullReqReadyForReview SSEType = "pullreq_ready_for_review"
// Branches.
SSETypeBranchMergableUpdated SSEType = "branch_mergable_updated"
SSETypeBranchCreated SSEType = "branch_created"
SSETypeBranchUpdated SSEType = "branch_updated"
SSETypeBranchDeleted SSEType = "branch_deleted"
// Tags.
SSETypeTagCreated SSEType = "tag_created"
SSETypeTagUpdated SSEType = "tag_updated"
SSETypeTagDeleted SSEType = "tag_deleted"
// Statuses.
SSETypeStatusCheckReportUpdated SSEType = "status_check_report_updated"
// Logs.
SSETypeLogLineAppended SSEType = "log_line_appended"
// Rules.
SSETypeRuleCreated SSEType = "rule_created"
SSETypeRuleUpdated SSEType = "rule_updated"
SSETypeRuleDeleted SSEType = "rule_deleted"
// Webhooks.
SSETypeWebhookCreated SSEType = "webhook_created"
SSETypeWebhookUpdated SSEType = "webhook_updated"
SSETypeWebhookDeleted SSEType = "webhook_deleted"
)