From e6bba2e0ce81e62f5b6db50f1d93a08bc083755a Mon Sep 17 00:00:00 2001 From: Johannes Batzill Date: Thu, 9 Nov 2023 20:04:36 +0000 Subject: [PATCH] [BugBash] Fix PR Activities, Increase merge lock timeout (#783) --- app/api/controller/pullreq/locks.go | 9 +++- app/api/controller/pullreq/merge.go | 25 ++++++++- app/api/controller/pullreq/pr_state.go | 6 --- app/api/controller/pullreq/review_submit.go | 7 --- app/api/usererror/translate.go | 20 +++++++- app/api/usererror/usererror.go | 6 +++ app/services/pullreq/handlers_branch.go | 56 ++++++++++++++++++--- lock/lock.go | 22 ++++---- lock/memory.go | 12 ++--- lock/memory_test.go | 2 +- lock/redis.go | 8 +-- types/config.go | 4 +- types/pullreq_activity.go | 2 - 13 files changed, 131 insertions(+), 48 deletions(-) diff --git a/app/api/controller/pullreq/locks.go b/app/api/controller/pullreq/locks.go index 8551e59a7..ca76816a7 100644 --- a/app/api/controller/pullreq/locks.go +++ b/app/api/controller/pullreq/locks.go @@ -16,6 +16,7 @@ package pullreq import ( "strconv" + "time" "github.com/harness/gitness/lock" ) @@ -25,5 +26,11 @@ func (c *Controller) newMutexForPR(repoUID string, pr int64, options ...lock.Opt if pr != 0 { key += "/" + strconv.FormatInt(pr, 10) } - return c.mtxManager.NewMutex(key, append(options, lock.WithNamespace("repo"))...) + return c.mtxManager.NewMutex( + key, + append(options, + lock.WithNamespace("repo"), + lock.WithExpiry(16*time.Second), + lock.WithTimeoutFactor(0.25), // 4s + )...) } diff --git a/app/api/controller/pullreq/merge.go b/app/api/controller/pullreq/merge.go index 377428edc..5e10825a2 100644 --- a/app/api/controller/pullreq/merge.go +++ b/app/api/controller/pullreq/merge.go @@ -265,6 +265,7 @@ func (c *Controller) Merge( return nil, nil, fmt.Errorf("merge check execution failed: %w", err) } + var activitySeqMerge, activitySeqBranchDeleted int64 pr, err = c.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { pr.State = enum.PullReqStateMerged @@ -280,13 +281,22 @@ func (c *Controller) Merge( pr.MergeSHA = &mergeOutput.MergeSHA pr.MergeConflicts = nil - pr.ActivitySeq++ // because we need to write the activity entry + // update sequence for PR activities + pr.ActivitySeq++ + activitySeqMerge = pr.ActivitySeq + + if ruleOut.DeleteSourceBranch { + pr.ActivitySeq++ + activitySeqBranchDeleted = pr.ActivitySeq + } + return nil }) if err != nil { return nil, nil, fmt.Errorf("failed to update pull request: %w", err) } + pr.ActivitySeq = activitySeqMerge activityPayload := &types.PullRequestActivityPayloadMerge{ MergeMethod: in.Method, MergeSHA: mergeOutput.MergeSHA, @@ -317,9 +327,22 @@ func (c *Controller) Merge( log.Ctx(ctx).Err(errDelete).Msgf("failed to delete source branch after merging") } else { branchDeleted = true + + // NOTE: there is a chance someone pushed on the branch between merge and delete. + // Either way, we'll use the SHA that was merged with for the activity to be consistent from PR perspective. + pr.ActivitySeq = activitySeqBranchDeleted + if _, errAct := c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, + &types.PullRequestActivityPayloadBranchDelete{SHA: in.SourceSHA}); errAct != nil { + // non-critical error + log.Ctx(ctx).Err(errAct).Msgf("failed to write pull request activity for successful automatic branch delete") + } } } + if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullrequesUpdated, pr); err != nil { + log.Ctx(ctx).Warn().Msg("failed to publish PR changed event") + } + return &types.MergeResponse{ SHA: mergeOutput.MergeSHA, BranchDeleted: branchDeleted, diff --git a/app/api/controller/pullreq/pr_state.go b/app/api/controller/pullreq/pr_state.go index df2b2f317..9e28d7469 100644 --- a/app/api/controller/pullreq/pr_state.go +++ b/app/api/controller/pullreq/pr_state.go @@ -17,7 +17,6 @@ package pullreq import ( "context" "fmt" - "strings" "time" apiauth "github.com/harness/gitness/app/api/auth" @@ -34,7 +33,6 @@ import ( type StateInput struct { State enum.PullReqState `json:"state"` IsDraft bool `json:"is_draft"` - Message string `json:"message"` } func (in *StateInput) Check() error { @@ -45,14 +43,11 @@ func (in *StateInput) Check() error { } in.State = state - in.Message = strings.TrimSpace(in.Message) if in.State == enum.PullReqStateMerged { return usererror.BadRequest("Pull requests can't be merged with this API") } - // TODO: Need to check the length of the message string - return nil } @@ -171,7 +166,6 @@ func (c *Controller) State(ctx context.Context, New: pr.State, OldDraft: oldDraft, NewDraft: pr.IsDraft, - Message: in.Message, } if _, errAct := c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload); errAct != nil { // non-critical error diff --git a/app/api/controller/pullreq/review_submit.go b/app/api/controller/pullreq/review_submit.go index 1e312aef3..c52549402 100644 --- a/app/api/controller/pullreq/review_submit.go +++ b/app/api/controller/pullreq/review_submit.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "strings" "time" "github.com/harness/gitness/app/api/usererror" @@ -34,7 +33,6 @@ import ( type ReviewSubmitInput struct { CommitSHA string `json:"commit_sha"` Decision enum.PullReqReviewDecision `json:"decision"` - Message string `json:"message"` } func (in *ReviewSubmitInput) Validate() error { @@ -50,11 +48,7 @@ func (in *ReviewSubmitInput) Validate() error { enum.PullReqReviewDecisionReviewed) return usererror.BadRequest(msg) } - in.Decision = decision - in.Message = strings.TrimSpace(in.Message) - - // TODO: Check the length of the message string return nil } @@ -128,7 +122,6 @@ func (c *Controller) ReviewSubmit( payload := &types.PullRequestActivityPayloadReviewSubmit{ CommitSHA: commitSHA, - Message: in.Message, Decision: in.Decision, } _, err = c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload) diff --git a/app/api/usererror/translate.go b/app/api/usererror/translate.go index a3c186032..6e7c8ec92 100644 --- a/app/api/usererror/translate.go +++ b/app/api/usererror/translate.go @@ -23,6 +23,7 @@ import ( "github.com/harness/gitness/app/services/webhook" "github.com/harness/gitness/blob" "github.com/harness/gitness/gitrpc" + "github.com/harness/gitness/lock" "github.com/harness/gitness/store" "github.com/harness/gitness/types/check" @@ -36,6 +37,7 @@ func Translate(err error) *Error { gitrpcError *gitrpc.Error maxBytesErr *http.MaxBytesError codeOwnersTooLargeError *codeowners.TooLargeError + lockError *lock.Error ) // TODO: Improve performance of checking multiple errors with errors.Is @@ -89,12 +91,16 @@ func Translate(err error) *Error { case errors.Is(err, webhook.ErrWebhookNotRetriggerable): return ErrWebhookNotRetriggerable - // codeowners errors + // codeowners errors case errors.Is(err, codeowners.ErrNotFound): return ErrCodeOwnersNotFound case errors.As(err, &codeOwnersTooLargeError): return UnprocessableEntityf(codeOwnersTooLargeError.Error()) + // lock errors + case errors.As(err, &lockError): + return errorFromLockError(lockError) + // unknown error default: log.Warn().Msgf("Unable to translate error: %s", err) @@ -102,6 +108,18 @@ func Translate(err error) *Error { } } +// errorFromLockError returns the associated error for a given lock error. +func errorFromLockError(err *lock.Error) *Error { + log.Warn().Err(err).Msg("encountered lock error") + if err.Kind == lock.ErrorKindCannotLock || + err.Kind == lock.ErrorKindLockHeld || + err.Kind == lock.ErrorKindMaxRetriesExceeded { + return ErrResourceLocked + } + + return ErrInternal +} + // lookup of gitrpc error codes to HTTP status codes. var codes = map[gitrpc.Status]int{ gitrpc.StatusConflict: http.StatusConflict, diff --git a/app/api/usererror/usererror.go b/app/api/usererror/usererror.go index b07f653e6..cbf69f26c 100644 --- a/app/api/usererror/usererror.go +++ b/app/api/usererror/usererror.go @@ -82,6 +82,12 @@ var ( // ErrResponseNotFlushable is returned if the response writer doesn't implement http.Flusher. ErrResponseNotFlushable = New(http.StatusInternalServerError, "Response not streamable") + + // ErrResourceLocked is returned if the resource is locked. + ErrResourceLocked = New( + http.StatusLocked, + "The requested resource is temporarily locked, please retry the operation.", + ) ) // Error represents a json-encoded API error. diff --git a/app/services/pullreq/handlers_branch.go b/app/services/pullreq/handlers_branch.go index 56e9da6b8..c7b769b92 100644 --- a/app/services/pullreq/handlers_branch.go +++ b/app/services/pullreq/handlers_branch.go @@ -16,6 +16,7 @@ package pullreq import ( "context" + "errors" "fmt" "strings" "time" @@ -30,8 +31,14 @@ import ( "github.com/rs/zerolog/log" ) +var ( + errPRNotOpen = errors.New("PR is not open") +) + // triggerPREventOnBranchUpdate handles branch update events. For every open pull request // it writes an activity entry and triggers the pull request Branch Updated event. +// +//nolint:gocognit // refactor if needed func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context, event *events.Event[*gitevents.BranchUpdatedPayload], ) error { @@ -75,8 +82,12 @@ func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context, newMergeBase := mergeBaseInfo.MergeBaseSHA // Update the database with the latest source commit SHA and the merge base SHA. - pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { + // to avoid racing conditions + if pr.State != enum.PullReqStateOpen { + return errPRNotOpen + } + pr.ActivitySeq++ if pr.SourceSHA != event.Payload.OldSHA { return fmt.Errorf( @@ -94,6 +105,9 @@ func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context, pr.MergeConflicts = nil return nil }) + if errors.Is(err, errPRNotOpen) { + return nil + } if err != nil { return err } @@ -144,8 +158,17 @@ func (s *Service) closePullReqOnBranchDelete(ctx context.Context, return fmt.Errorf("failed to get repo info: %w", err) } + var activitySeqBranchDeleted, activitySeqPRClosed int64 pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { - pr.ActivitySeq++ // because we need to write the activity + // to avoid racing conditions + if pr.State != enum.PullReqStateOpen { + return errPRNotOpen + } + + // get sequence numbers for both activities (branch deletion should be first) + pr.ActivitySeq += 2 + activitySeqBranchDeleted = pr.ActivitySeq - 1 + activitySeqPRClosed = pr.ActivitySeq pr.State = enum.PullReqStateClosed pr.MergeCheckStatus = enum.MergeCheckStatusUnchecked @@ -154,15 +177,36 @@ func (s *Service) closePullReqOnBranchDelete(ctx context.Context, return nil }) + if errors.Is(err, errPRNotOpen) { + return nil + } if err != nil { return fmt.Errorf("failed to close pull request after branch delete: %w", err) } - _, errAct := s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID, - &types.PullRequestActivityPayloadBranchDelete{SHA: event.Payload.SHA}) - if errAct != nil { + // NOTE: We use the latest PR source sha for the branch deleted activity. + // There is a chance the PR is behind, but we can't guarantee any missing commit exists after branch deletion. + // Whatever is the source sha of the PR is most likely to be pointed at by the PR head ref. + pr.ActivitySeq = activitySeqBranchDeleted + _, err = s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID, + &types.PullRequestActivityPayloadBranchDelete{SHA: pr.SourceSHA}) + if err != nil { // non-critical error - log.Ctx(ctx).Err(errAct).Msgf("failed to write pull request activity after branch delete") + log.Ctx(ctx).Err(err).Msg("failed to write pull request activity for branch deletion") + } + + pr.ActivitySeq = activitySeqPRClosed + payload := &types.PullRequestActivityPayloadStateChange{ + Old: enum.PullReqStateOpen, + New: enum.PullReqStateClosed, + OldDraft: pr.IsDraft, + NewDraft: pr.IsDraft, + } + if _, err := s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID, payload); err != nil { + // non-critical error + log.Ctx(ctx).Err(err).Msg( + "failed to write pull request activity for pullrequest closure after branch deletion", + ) } s.pullreqEvReporter.Closed(ctx, &pullreqevents.ClosedPayload{ diff --git a/lock/lock.go b/lock/lock.go index 5a7765f52..357030264 100644 --- a/lock/lock.go +++ b/lock/lock.go @@ -19,28 +19,28 @@ import ( "fmt" ) -// KindError enum displays human readable message +// ErrorKind enum displays human readable message // in error. -type KindError string +type ErrorKind string const ( - LockHeld KindError = "lock already held" - LockNotHeld KindError = "lock not held" - ProviderError KindError = "lock provider error" - CannotLock KindError = "timeout while trying to acquire lock" - Context KindError = "context error while trying to acquire lock" - MaxRetriesExceeded KindError = "max retries exceeded to acquire lock" - GenerateTokenFailed KindError = "token generation failed" + ErrorKindLockHeld ErrorKind = "lock already held" + ErrorKindLockNotHeld ErrorKind = "lock not held" + ErrorKindProviderError ErrorKind = "lock provider error" + ErrorKindCannotLock ErrorKind = "timeout while trying to acquire lock" + ErrorKindContext ErrorKind = "context error while trying to acquire lock" + ErrorKindMaxRetriesExceeded ErrorKind = "max retries exceeded to acquire lock" + ErrorKindGenerateTokenFailed ErrorKind = "token generation failed" ) // Error is custom unique type for all type of errors. type Error struct { - Kind KindError + Kind ErrorKind Key string Err error } -func NewError(kind KindError, key string, err error) *Error { +func NewError(kind ErrorKind, key string, err error) *Error { return &Error{ Kind: kind, Key: key, diff --git a/lock/memory.go b/lock/memory.go index e58c079c6..61bb807aa 100644 --- a/lock/memory.go +++ b/lock/memory.go @@ -74,7 +74,7 @@ func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) { token, err = randstr(32) } if err != nil { - return nil, NewError(GenerateTokenFailed, key, nil) + return nil, NewError(ErrorKindGenerateTokenFailed, key, nil) } // waitTime logic is similar to redis implementation: @@ -159,7 +159,7 @@ func (m *inMemMutex) Lock(ctx context.Context) error { defer m.mutex.Unlock() if m.isHeld { - return NewError(LockHeld, m.key, nil) + return NewError(ErrorKindLockHeld, m.key, nil) } if m.provider.acquire(m.key, m.token, m.expiry) { @@ -183,7 +183,7 @@ func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer return nil } if attempt == m.tries { - return NewError(MaxRetriesExceeded, m.key, nil) + return NewError(ErrorKindMaxRetriesExceeded, m.key, nil) } delay := time.NewTimer(m.delayFunc(attempt)) @@ -191,9 +191,9 @@ func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer select { case <-ctx.Done(): - return NewError(Context, m.key, ctx.Err()) + return NewError(ErrorKindContext, m.key, ctx.Err()) case <-timeout.C: - return NewError(CannotLock, m.key, nil) + return NewError(ErrorKindCannotLock, m.key, nil) case <-delay.C: // just wait } @@ -209,7 +209,7 @@ func (m *inMemMutex) Unlock(_ context.Context) error { defer m.mutex.Unlock() if !m.isHeld || !m.provider.release(m.key, m.token) { - return NewError(LockNotHeld, m.key, nil) + return NewError(ErrorKindLockNotHeld, m.key, nil) } m.isHeld = false diff --git a/lock/memory_test.go b/lock/memory_test.go index 7f959853b..c8ff53bfd 100644 --- a/lock/memory_test.go +++ b/lock/memory_test.go @@ -93,7 +93,7 @@ func Test_inMemMutex_MaxTries(t *testing.T) { t.Errorf("expected error lock.Error, got: %v", err) return } - if errLock.Kind != MaxRetriesExceeded { + if errLock.Kind != ErrorKindMaxRetriesExceeded { t.Errorf("expected lock.MaxRetriesExceeded, got: %v", err) return } diff --git a/lock/redis.go b/lock/redis.go index 89698abf9..c529a0ff6 100644 --- a/lock/redis.go +++ b/lock/redis.go @@ -102,14 +102,14 @@ func (l *RedisMutex) Unlock(ctx context.Context) error { } func translateRedisErr(err error, key string) error { - var kind KindError + var kind ErrorKind switch { case errors.Is(err, redsync.ErrFailed): - kind = CannotLock + kind = ErrorKindCannotLock case errors.Is(err, redsync.ErrExtendFailed), errors.Is(err, &redsync.RedisError{}): - kind = ProviderError + kind = ErrorKindProviderError case errors.Is(err, &redsync.ErrTaken{}), errors.Is(err, &redsync.ErrNodeTaken{}): - kind = LockHeld + kind = ErrorKindLockHeld } return NewError(kind, key, err) } diff --git a/types/config.go b/types/config.go index bd7ef07a0..c52de59ad 100644 --- a/types/config.go +++ b/types/config.go @@ -225,10 +225,10 @@ type Config struct { // Provider is a name of distributed lock service like redis, memory, file etc... Provider lock.Provider `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"` Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"` - Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"32"` + Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"8"` RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"` DriftFactor float64 `envconfig:"GITNESS_LOCK_DRIFT_FACTOR" default:"0.01"` - TimeoutFactor float64 `envconfig:"GITNESS_LOCK_TIMEOUT_FACTOR" default:"0.05"` + TimeoutFactor float64 `envconfig:"GITNESS_LOCK_TIMEOUT_FACTOR" default:"0.25"` // AppNamespace is just service app prefix to avoid conflicts on key definition AppNamespace string `envconfig:"GITNESS_LOCK_APP_NAMESPACE" default:"gitness"` // DefaultNamespace is when mutex doesn't specify custom namespace for their keys diff --git a/types/pullreq_activity.go b/types/pullreq_activity.go index 9e3e919de..3c9c88054 100644 --- a/types/pullreq_activity.go +++ b/types/pullreq_activity.go @@ -235,7 +235,6 @@ type PullRequestActivityPayloadStateChange struct { New enum.PullReqState `json:"new"` OldDraft bool `json:"old_draft"` NewDraft bool `json:"new_draft"` - Message string `json:"message,omitempty"` } func (a *PullRequestActivityPayloadStateChange) ActivityType() enum.PullReqActivityType { @@ -253,7 +252,6 @@ func (a *PullRequestActivityPayloadTitleChange) ActivityType() enum.PullReqActiv type PullRequestActivityPayloadReviewSubmit struct { CommitSHA string `json:"commit_sha"` - Message string `json:"message,omitempty"` Decision enum.PullReqReviewDecision `json:"decision"` }