[BugBash] Fix PR Activities, Increase merge lock timeout (#783)

rkapoor10-patch-1
Johannes Batzill 2023-11-09 20:04:36 +00:00 committed by Harness
parent 073fc482ed
commit e6bba2e0ce
13 changed files with 131 additions and 48 deletions

View File

@ -16,6 +16,7 @@ package pullreq
import ( import (
"strconv" "strconv"
"time"
"github.com/harness/gitness/lock" "github.com/harness/gitness/lock"
) )
@ -25,5 +26,11 @@ func (c *Controller) newMutexForPR(repoUID string, pr int64, options ...lock.Opt
if pr != 0 { if pr != 0 {
key += "/" + strconv.FormatInt(pr, 10) key += "/" + strconv.FormatInt(pr, 10)
} }
return c.mtxManager.NewMutex(key, append(options, lock.WithNamespace("repo"))...) return c.mtxManager.NewMutex(
key,
append(options,
lock.WithNamespace("repo"),
lock.WithExpiry(16*time.Second),
lock.WithTimeoutFactor(0.25), // 4s
)...)
} }

View File

@ -265,6 +265,7 @@ func (c *Controller) Merge(
return nil, nil, fmt.Errorf("merge check execution failed: %w", err) return nil, nil, fmt.Errorf("merge check execution failed: %w", err)
} }
var activitySeqMerge, activitySeqBranchDeleted int64
pr, err = c.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { pr, err = c.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error {
pr.State = enum.PullReqStateMerged pr.State = enum.PullReqStateMerged
@ -280,13 +281,22 @@ func (c *Controller) Merge(
pr.MergeSHA = &mergeOutput.MergeSHA pr.MergeSHA = &mergeOutput.MergeSHA
pr.MergeConflicts = nil pr.MergeConflicts = nil
pr.ActivitySeq++ // because we need to write the activity entry // update sequence for PR activities
pr.ActivitySeq++
activitySeqMerge = pr.ActivitySeq
if ruleOut.DeleteSourceBranch {
pr.ActivitySeq++
activitySeqBranchDeleted = pr.ActivitySeq
}
return nil return nil
}) })
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to update pull request: %w", err) return nil, nil, fmt.Errorf("failed to update pull request: %w", err)
} }
pr.ActivitySeq = activitySeqMerge
activityPayload := &types.PullRequestActivityPayloadMerge{ activityPayload := &types.PullRequestActivityPayloadMerge{
MergeMethod: in.Method, MergeMethod: in.Method,
MergeSHA: mergeOutput.MergeSHA, MergeSHA: mergeOutput.MergeSHA,
@ -317,9 +327,22 @@ func (c *Controller) Merge(
log.Ctx(ctx).Err(errDelete).Msgf("failed to delete source branch after merging") log.Ctx(ctx).Err(errDelete).Msgf("failed to delete source branch after merging")
} else { } else {
branchDeleted = true branchDeleted = true
// NOTE: there is a chance someone pushed on the branch between merge and delete.
// Either way, we'll use the SHA that was merged with for the activity to be consistent from PR perspective.
pr.ActivitySeq = activitySeqBranchDeleted
if _, errAct := c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID,
&types.PullRequestActivityPayloadBranchDelete{SHA: in.SourceSHA}); errAct != nil {
// non-critical error
log.Ctx(ctx).Err(errAct).Msgf("failed to write pull request activity for successful automatic branch delete")
}
} }
} }
if err = c.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullrequesUpdated, pr); err != nil {
log.Ctx(ctx).Warn().Msg("failed to publish PR changed event")
}
return &types.MergeResponse{ return &types.MergeResponse{
SHA: mergeOutput.MergeSHA, SHA: mergeOutput.MergeSHA,
BranchDeleted: branchDeleted, BranchDeleted: branchDeleted,

View File

@ -17,7 +17,6 @@ package pullreq
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
apiauth "github.com/harness/gitness/app/api/auth" apiauth "github.com/harness/gitness/app/api/auth"
@ -34,7 +33,6 @@ import (
type StateInput struct { type StateInput struct {
State enum.PullReqState `json:"state"` State enum.PullReqState `json:"state"`
IsDraft bool `json:"is_draft"` IsDraft bool `json:"is_draft"`
Message string `json:"message"`
} }
func (in *StateInput) Check() error { func (in *StateInput) Check() error {
@ -45,14 +43,11 @@ func (in *StateInput) Check() error {
} }
in.State = state in.State = state
in.Message = strings.TrimSpace(in.Message)
if in.State == enum.PullReqStateMerged { if in.State == enum.PullReqStateMerged {
return usererror.BadRequest("Pull requests can't be merged with this API") return usererror.BadRequest("Pull requests can't be merged with this API")
} }
// TODO: Need to check the length of the message string
return nil return nil
} }
@ -171,7 +166,6 @@ func (c *Controller) State(ctx context.Context,
New: pr.State, New: pr.State,
OldDraft: oldDraft, OldDraft: oldDraft,
NewDraft: pr.IsDraft, NewDraft: pr.IsDraft,
Message: in.Message,
} }
if _, errAct := c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload); errAct != nil { if _, errAct := c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload); errAct != nil {
// non-critical error // non-critical error

View File

@ -18,7 +18,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/api/usererror"
@ -34,7 +33,6 @@ import (
type ReviewSubmitInput struct { type ReviewSubmitInput struct {
CommitSHA string `json:"commit_sha"` CommitSHA string `json:"commit_sha"`
Decision enum.PullReqReviewDecision `json:"decision"` Decision enum.PullReqReviewDecision `json:"decision"`
Message string `json:"message"`
} }
func (in *ReviewSubmitInput) Validate() error { func (in *ReviewSubmitInput) Validate() error {
@ -50,11 +48,7 @@ func (in *ReviewSubmitInput) Validate() error {
enum.PullReqReviewDecisionReviewed) enum.PullReqReviewDecisionReviewed)
return usererror.BadRequest(msg) return usererror.BadRequest(msg)
} }
in.Decision = decision in.Decision = decision
in.Message = strings.TrimSpace(in.Message)
// TODO: Check the length of the message string
return nil return nil
} }
@ -128,7 +122,6 @@ func (c *Controller) ReviewSubmit(
payload := &types.PullRequestActivityPayloadReviewSubmit{ payload := &types.PullRequestActivityPayloadReviewSubmit{
CommitSHA: commitSHA, CommitSHA: commitSHA,
Message: in.Message,
Decision: in.Decision, Decision: in.Decision,
} }
_, err = c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload) _, err = c.activityStore.CreateWithPayload(ctx, pr, session.Principal.ID, payload)

View File

@ -23,6 +23,7 @@ import (
"github.com/harness/gitness/app/services/webhook" "github.com/harness/gitness/app/services/webhook"
"github.com/harness/gitness/blob" "github.com/harness/gitness/blob"
"github.com/harness/gitness/gitrpc" "github.com/harness/gitness/gitrpc"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/store" "github.com/harness/gitness/store"
"github.com/harness/gitness/types/check" "github.com/harness/gitness/types/check"
@ -36,6 +37,7 @@ func Translate(err error) *Error {
gitrpcError *gitrpc.Error gitrpcError *gitrpc.Error
maxBytesErr *http.MaxBytesError maxBytesErr *http.MaxBytesError
codeOwnersTooLargeError *codeowners.TooLargeError codeOwnersTooLargeError *codeowners.TooLargeError
lockError *lock.Error
) )
// TODO: Improve performance of checking multiple errors with errors.Is // TODO: Improve performance of checking multiple errors with errors.Is
@ -89,12 +91,16 @@ func Translate(err error) *Error {
case errors.Is(err, webhook.ErrWebhookNotRetriggerable): case errors.Is(err, webhook.ErrWebhookNotRetriggerable):
return ErrWebhookNotRetriggerable return ErrWebhookNotRetriggerable
// codeowners errors // codeowners errors
case errors.Is(err, codeowners.ErrNotFound): case errors.Is(err, codeowners.ErrNotFound):
return ErrCodeOwnersNotFound return ErrCodeOwnersNotFound
case errors.As(err, &codeOwnersTooLargeError): case errors.As(err, &codeOwnersTooLargeError):
return UnprocessableEntityf(codeOwnersTooLargeError.Error()) return UnprocessableEntityf(codeOwnersTooLargeError.Error())
// lock errors
case errors.As(err, &lockError):
return errorFromLockError(lockError)
// unknown error // unknown error
default: default:
log.Warn().Msgf("Unable to translate error: %s", err) log.Warn().Msgf("Unable to translate error: %s", err)
@ -102,6 +108,18 @@ func Translate(err error) *Error {
} }
} }
// errorFromLockError returns the associated error for a given lock error.
func errorFromLockError(err *lock.Error) *Error {
log.Warn().Err(err).Msg("encountered lock error")
if err.Kind == lock.ErrorKindCannotLock ||
err.Kind == lock.ErrorKindLockHeld ||
err.Kind == lock.ErrorKindMaxRetriesExceeded {
return ErrResourceLocked
}
return ErrInternal
}
// lookup of gitrpc error codes to HTTP status codes. // lookup of gitrpc error codes to HTTP status codes.
var codes = map[gitrpc.Status]int{ var codes = map[gitrpc.Status]int{
gitrpc.StatusConflict: http.StatusConflict, gitrpc.StatusConflict: http.StatusConflict,

View File

@ -82,6 +82,12 @@ var (
// ErrResponseNotFlushable is returned if the response writer doesn't implement http.Flusher. // ErrResponseNotFlushable is returned if the response writer doesn't implement http.Flusher.
ErrResponseNotFlushable = New(http.StatusInternalServerError, "Response not streamable") ErrResponseNotFlushable = New(http.StatusInternalServerError, "Response not streamable")
// ErrResourceLocked is returned if the resource is locked.
ErrResourceLocked = New(
http.StatusLocked,
"The requested resource is temporarily locked, please retry the operation.",
)
) )
// Error represents a json-encoded API error. // Error represents a json-encoded API error.

View File

@ -16,6 +16,7 @@ package pullreq
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -30,8 +31,14 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
var (
errPRNotOpen = errors.New("PR is not open")
)
// triggerPREventOnBranchUpdate handles branch update events. For every open pull request // triggerPREventOnBranchUpdate handles branch update events. For every open pull request
// it writes an activity entry and triggers the pull request Branch Updated event. // it writes an activity entry and triggers the pull request Branch Updated event.
//
//nolint:gocognit // refactor if needed
func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context, func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context,
event *events.Event[*gitevents.BranchUpdatedPayload], event *events.Event[*gitevents.BranchUpdatedPayload],
) error { ) error {
@ -75,8 +82,12 @@ func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context,
newMergeBase := mergeBaseInfo.MergeBaseSHA newMergeBase := mergeBaseInfo.MergeBaseSHA
// Update the database with the latest source commit SHA and the merge base SHA. // Update the database with the latest source commit SHA and the merge base SHA.
pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error {
// to avoid racing conditions
if pr.State != enum.PullReqStateOpen {
return errPRNotOpen
}
pr.ActivitySeq++ pr.ActivitySeq++
if pr.SourceSHA != event.Payload.OldSHA { if pr.SourceSHA != event.Payload.OldSHA {
return fmt.Errorf( return fmt.Errorf(
@ -94,6 +105,9 @@ func (s *Service) triggerPREventOnBranchUpdate(ctx context.Context,
pr.MergeConflicts = nil pr.MergeConflicts = nil
return nil return nil
}) })
if errors.Is(err, errPRNotOpen) {
return nil
}
if err != nil { if err != nil {
return err return err
} }
@ -144,8 +158,17 @@ func (s *Service) closePullReqOnBranchDelete(ctx context.Context,
return fmt.Errorf("failed to get repo info: %w", err) return fmt.Errorf("failed to get repo info: %w", err)
} }
var activitySeqBranchDeleted, activitySeqPRClosed int64
pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { pr, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error {
pr.ActivitySeq++ // because we need to write the activity // to avoid racing conditions
if pr.State != enum.PullReqStateOpen {
return errPRNotOpen
}
// get sequence numbers for both activities (branch deletion should be first)
pr.ActivitySeq += 2
activitySeqBranchDeleted = pr.ActivitySeq - 1
activitySeqPRClosed = pr.ActivitySeq
pr.State = enum.PullReqStateClosed pr.State = enum.PullReqStateClosed
pr.MergeCheckStatus = enum.MergeCheckStatusUnchecked pr.MergeCheckStatus = enum.MergeCheckStatusUnchecked
@ -154,15 +177,36 @@ func (s *Service) closePullReqOnBranchDelete(ctx context.Context,
return nil return nil
}) })
if errors.Is(err, errPRNotOpen) {
return nil
}
if err != nil { if err != nil {
return fmt.Errorf("failed to close pull request after branch delete: %w", err) return fmt.Errorf("failed to close pull request after branch delete: %w", err)
} }
_, errAct := s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID, // NOTE: We use the latest PR source sha for the branch deleted activity.
&types.PullRequestActivityPayloadBranchDelete{SHA: event.Payload.SHA}) // There is a chance the PR is behind, but we can't guarantee any missing commit exists after branch deletion.
if errAct != nil { // Whatever is the source sha of the PR is most likely to be pointed at by the PR head ref.
pr.ActivitySeq = activitySeqBranchDeleted
_, err = s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID,
&types.PullRequestActivityPayloadBranchDelete{SHA: pr.SourceSHA})
if err != nil {
// non-critical error // non-critical error
log.Ctx(ctx).Err(errAct).Msgf("failed to write pull request activity after branch delete") log.Ctx(ctx).Err(err).Msg("failed to write pull request activity for branch deletion")
}
pr.ActivitySeq = activitySeqPRClosed
payload := &types.PullRequestActivityPayloadStateChange{
Old: enum.PullReqStateOpen,
New: enum.PullReqStateClosed,
OldDraft: pr.IsDraft,
NewDraft: pr.IsDraft,
}
if _, err := s.activityStore.CreateWithPayload(ctx, pr, event.Payload.PrincipalID, payload); err != nil {
// non-critical error
log.Ctx(ctx).Err(err).Msg(
"failed to write pull request activity for pullrequest closure after branch deletion",
)
} }
s.pullreqEvReporter.Closed(ctx, &pullreqevents.ClosedPayload{ s.pullreqEvReporter.Closed(ctx, &pullreqevents.ClosedPayload{

View File

@ -19,28 +19,28 @@ import (
"fmt" "fmt"
) )
// KindError enum displays human readable message // ErrorKind enum displays human readable message
// in error. // in error.
type KindError string type ErrorKind string
const ( const (
LockHeld KindError = "lock already held" ErrorKindLockHeld ErrorKind = "lock already held"
LockNotHeld KindError = "lock not held" ErrorKindLockNotHeld ErrorKind = "lock not held"
ProviderError KindError = "lock provider error" ErrorKindProviderError ErrorKind = "lock provider error"
CannotLock KindError = "timeout while trying to acquire lock" ErrorKindCannotLock ErrorKind = "timeout while trying to acquire lock"
Context KindError = "context error while trying to acquire lock" ErrorKindContext ErrorKind = "context error while trying to acquire lock"
MaxRetriesExceeded KindError = "max retries exceeded to acquire lock" ErrorKindMaxRetriesExceeded ErrorKind = "max retries exceeded to acquire lock"
GenerateTokenFailed KindError = "token generation failed" ErrorKindGenerateTokenFailed ErrorKind = "token generation failed"
) )
// Error is custom unique type for all type of errors. // Error is custom unique type for all type of errors.
type Error struct { type Error struct {
Kind KindError Kind ErrorKind
Key string Key string
Err error Err error
} }
func NewError(kind KindError, key string, err error) *Error { func NewError(kind ErrorKind, key string, err error) *Error {
return &Error{ return &Error{
Kind: kind, Kind: kind,
Key: key, Key: key,

View File

@ -74,7 +74,7 @@ func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) {
token, err = randstr(32) token, err = randstr(32)
} }
if err != nil { if err != nil {
return nil, NewError(GenerateTokenFailed, key, nil) return nil, NewError(ErrorKindGenerateTokenFailed, key, nil)
} }
// waitTime logic is similar to redis implementation: // waitTime logic is similar to redis implementation:
@ -159,7 +159,7 @@ func (m *inMemMutex) Lock(ctx context.Context) error {
defer m.mutex.Unlock() defer m.mutex.Unlock()
if m.isHeld { if m.isHeld {
return NewError(LockHeld, m.key, nil) return NewError(ErrorKindLockHeld, m.key, nil)
} }
if m.provider.acquire(m.key, m.token, m.expiry) { if m.provider.acquire(m.key, m.token, m.expiry) {
@ -183,7 +183,7 @@ func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer
return nil return nil
} }
if attempt == m.tries { if attempt == m.tries {
return NewError(MaxRetriesExceeded, m.key, nil) return NewError(ErrorKindMaxRetriesExceeded, m.key, nil)
} }
delay := time.NewTimer(m.delayFunc(attempt)) delay := time.NewTimer(m.delayFunc(attempt))
@ -191,9 +191,9 @@ func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer
select { select {
case <-ctx.Done(): case <-ctx.Done():
return NewError(Context, m.key, ctx.Err()) return NewError(ErrorKindContext, m.key, ctx.Err())
case <-timeout.C: case <-timeout.C:
return NewError(CannotLock, m.key, nil) return NewError(ErrorKindCannotLock, m.key, nil)
case <-delay.C: // just wait case <-delay.C: // just wait
} }
@ -209,7 +209,7 @@ func (m *inMemMutex) Unlock(_ context.Context) error {
defer m.mutex.Unlock() defer m.mutex.Unlock()
if !m.isHeld || !m.provider.release(m.key, m.token) { if !m.isHeld || !m.provider.release(m.key, m.token) {
return NewError(LockNotHeld, m.key, nil) return NewError(ErrorKindLockNotHeld, m.key, nil)
} }
m.isHeld = false m.isHeld = false

View File

@ -93,7 +93,7 @@ func Test_inMemMutex_MaxTries(t *testing.T) {
t.Errorf("expected error lock.Error, got: %v", err) t.Errorf("expected error lock.Error, got: %v", err)
return return
} }
if errLock.Kind != MaxRetriesExceeded { if errLock.Kind != ErrorKindMaxRetriesExceeded {
t.Errorf("expected lock.MaxRetriesExceeded, got: %v", err) t.Errorf("expected lock.MaxRetriesExceeded, got: %v", err)
return return
} }

View File

@ -102,14 +102,14 @@ func (l *RedisMutex) Unlock(ctx context.Context) error {
} }
func translateRedisErr(err error, key string) error { func translateRedisErr(err error, key string) error {
var kind KindError var kind ErrorKind
switch { switch {
case errors.Is(err, redsync.ErrFailed): case errors.Is(err, redsync.ErrFailed):
kind = CannotLock kind = ErrorKindCannotLock
case errors.Is(err, redsync.ErrExtendFailed), errors.Is(err, &redsync.RedisError{}): case errors.Is(err, redsync.ErrExtendFailed), errors.Is(err, &redsync.RedisError{}):
kind = ProviderError kind = ErrorKindProviderError
case errors.Is(err, &redsync.ErrTaken{}), errors.Is(err, &redsync.ErrNodeTaken{}): case errors.Is(err, &redsync.ErrTaken{}), errors.Is(err, &redsync.ErrNodeTaken{}):
kind = LockHeld kind = ErrorKindLockHeld
} }
return NewError(kind, key, err) return NewError(kind, key, err)
} }

View File

@ -225,10 +225,10 @@ type Config struct {
// Provider is a name of distributed lock service like redis, memory, file etc... // Provider is a name of distributed lock service like redis, memory, file etc...
Provider lock.Provider `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"` Provider lock.Provider `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"`
Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"` Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"`
Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"32"` Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"8"`
RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"` RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"`
DriftFactor float64 `envconfig:"GITNESS_LOCK_DRIFT_FACTOR" default:"0.01"` DriftFactor float64 `envconfig:"GITNESS_LOCK_DRIFT_FACTOR" default:"0.01"`
TimeoutFactor float64 `envconfig:"GITNESS_LOCK_TIMEOUT_FACTOR" default:"0.05"` TimeoutFactor float64 `envconfig:"GITNESS_LOCK_TIMEOUT_FACTOR" default:"0.25"`
// AppNamespace is just service app prefix to avoid conflicts on key definition // AppNamespace is just service app prefix to avoid conflicts on key definition
AppNamespace string `envconfig:"GITNESS_LOCK_APP_NAMESPACE" default:"gitness"` AppNamespace string `envconfig:"GITNESS_LOCK_APP_NAMESPACE" default:"gitness"`
// DefaultNamespace is when mutex doesn't specify custom namespace for their keys // DefaultNamespace is when mutex doesn't specify custom namespace for their keys

View File

@ -235,7 +235,6 @@ type PullRequestActivityPayloadStateChange struct {
New enum.PullReqState `json:"new"` New enum.PullReqState `json:"new"`
OldDraft bool `json:"old_draft"` OldDraft bool `json:"old_draft"`
NewDraft bool `json:"new_draft"` NewDraft bool `json:"new_draft"`
Message string `json:"message,omitempty"`
} }
func (a *PullRequestActivityPayloadStateChange) ActivityType() enum.PullReqActivityType { func (a *PullRequestActivityPayloadStateChange) ActivityType() enum.PullReqActivityType {
@ -253,7 +252,6 @@ func (a *PullRequestActivityPayloadTitleChange) ActivityType() enum.PullReqActiv
type PullRequestActivityPayloadReviewSubmit struct { type PullRequestActivityPayloadReviewSubmit struct {
CommitSHA string `json:"commit_sha"` CommitSHA string `json:"commit_sha"`
Message string `json:"message,omitempty"`
Decision enum.PullReqReviewDecision `json:"decision"` Decision enum.PullReqReviewDecision `json:"decision"`
} }