drone/app/services/pullreq/handlers_mergeable.go

256 lines
7.7 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pullreq
import (
"context"
"fmt"
"strconv"
"time"
pullreqevents "github.com/harness/gitness/app/events/pullreq"
"github.com/harness/gitness/errors"
"github.com/harness/gitness/events"
"github.com/harness/gitness/git"
gitenum "github.com/harness/gitness/git/enum"
"github.com/harness/gitness/git/sha"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/gotidy/ptr"
"github.com/rs/zerolog/log"
)
// cancelMergeCheckKey is the key handed to pubsub.Publish by updateMergeData
// when it announces the old SHA whose running merge checks should be cancelled.
const cancelMergeCheckKey = "cancel_merge_check_for_sha"
// mergeCheckOnCreated reacts to pull request Created events.
// It hands the target repository ID, the PR number and the source SHA from the
// event payload to updateMergeData; sha.Nil is passed as the old SHA.
func (s *Service) mergeCheckOnCreated(
	ctx context.Context,
	event *events.Event[*pullreqevents.CreatedPayload],
) error {
	payload := event.Payload

	return s.updateMergeData(ctx, payload.TargetRepoID, payload.Number, sha.Nil.String(), payload.SourceSHA)
}
// mergeCheckOnBranchUpdate reacts to pull request Branch Updated events.
// The payload's OldSHA and NewSHA are handed to updateMergeData together with
// the target repository ID and the PR number.
func (s *Service) mergeCheckOnBranchUpdate(
	ctx context.Context,
	event *events.Event[*pullreqevents.BranchUpdatedPayload],
) error {
	payload := event.Payload

	return s.updateMergeData(ctx, payload.TargetRepoID, payload.Number, payload.OldSHA, payload.NewSHA)
}
// mergeCheckOnReopen reacts to events carrying a ReopenedPayload.
// It calls updateMergeData with the payload's target repository ID, PR number
// and source SHA; sha.None is passed as the old SHA.
func (s *Service) mergeCheckOnReopen(
	ctx context.Context,
	event *events.Event[*pullreqevents.ReopenedPayload],
) error {
	payload := event.Payload

	return s.updateMergeData(ctx, payload.TargetRepoID, payload.Number, sha.None.String(), payload.SourceSHA)
}
// mergeCheckOnClosed handles events carrying a ClosedPayload by deleting the
// PR's merge ref.
//
// The ref is deleted from the target repository: updateMergeData creates the
// merge ref using write params of the PR's target repo and names it after the
// PR number, so that is the repository it has to be removed from. Using the
// source repository would miss the ref (or hit an unrelated PR's ref) whenever
// source and target repositories differ.
func (s *Service) mergeCheckOnClosed(ctx context.Context,
	event *events.Event[*pullreqevents.ClosedPayload],
) error {
	return s.deleteMergeRef(ctx, event.Payload.TargetRepoID, event.Payload.Number)
}
// mergeCheckOnMerged handles events carrying a MergedPayload by deleting the
// PR's merge ref.
//
// The ref is deleted from the target repository: updateMergeData creates the
// merge ref using write params of the PR's target repo and names it after the
// PR number, so that is the repository it has to be removed from. Using the
// source repository would miss the ref (or hit an unrelated PR's ref) whenever
// source and target repositories differ.
func (s *Service) mergeCheckOnMerged(ctx context.Context,
	event *events.Event[*pullreqevents.MergedPayload],
) error {
	return s.deleteMergeRef(ctx, event.Payload.TargetRepoID, event.Payload.Number)
}
// deleteMergeRef removes the merge ref of pull request prNum from the
// repository identified by repoID.
//
// It looks the repository up in the git info cache, builds system RPC write
// params for it and calls git.UpdateRef with an empty new value. Every failure
// is returned wrapped with a short description of the failed step.
func (s *Service) deleteMergeRef(ctx context.Context, repoID int64, prNum int64) error {
	repo, err := s.repoGitInfoCache.Get(ctx, repoID)
	if err != nil {
		return fmt.Errorf("failed to get repo with ID %d: %w", repoID, err)
	}

	writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, repo.ID, repo.GitUID)
	if err != nil {
		return fmt.Errorf("failed to generate rpc write params: %w", err)
	}

	// TODO: This doesn't work for forked repos when repoID is the source repo:
	// updateMergeData writes the merge ref using the target repo's write params.

	// The ref name is formatted with FormatInt so the full int64 PR number is
	// kept; converting to int first would truncate where int is 32 bits wide.
	err = s.git.UpdateRef(ctx, git.UpdateRefParams{
		WriteParams: writeParams,
		Name:        strconv.FormatInt(prNum, 10),
		Type:        gitenum.RefTypePullReqMerge,
		NewValue:    sha.None, // when NewValue is empty will delete the ref.
		OldValue:    sha.None, // we don't care about the old value
	})
	if err != nil {
		return fmt.Errorf("failed to remove PR merge ref: %w", err)
	}

	return nil
}
// updateMergeData runs a mergeability check for pull request prNum of
// repository repoID and stores the outcome on the pull request.
//
// Steps, in order:
//  1. Load the PR and return an error unless it is open.
//  2. Publish oldSHA under cancelMergeCheckKey to cancel earlier checks.
//  3. Register a cancel func under newSHA in s.cancelMergeability; if one is
//     already registered on this instance, return nil without doing any work.
//  4. Run git.Merge of the source branch into the target branch, writing the
//     result to the PR merge ref using the target repository's write params.
//  5. Store merge check status, SHAs, conflict files and diff stats through
//     UpdateOptLock, then publish an SSE update (a publish failure is only
//     logged).
//
// A discard-event error is returned when the source branch is no longer on
// newSHA; the same kind of error (wrapped) is returned when the stored PR's
// SourceSHA differs from newSHA at update time.
//
//nolint:funlen // refactor if required.
func (s *Service) updateMergeData(
	ctx context.Context,
	repoID int64,
	prNum int64,
	oldSHA string,
	newSHA string,
) error {
	pr, err := s.pullreqStore.FindByNumber(ctx, repoID, prNum)
	if err != nil {
		return fmt.Errorf("failed to get pull request number %d: %w", prNum, err)
	}

	// TODO: Merge check should not update the merge base.
	// TODO: Instead it should accept it as an argument and fail if it doesn't match.
	// Then it would no longer be necessary to cancel already active mergeability checks.

	if pr.State != enum.PullReqStateOpen {
		return fmt.Errorf("cannot do mergability check on closed PR %d", pr.Number)
	}

	// cancel all previous mergeability work for this PR based on oldSHA
	if err := s.pubsub.Publish(ctx, cancelMergeCheckKey, []byte(oldSHA),
		pubsub.WithPublishNamespace("pullreq")); err != nil {
		return fmt.Errorf("failed to publish merge check cancellation for SHA %q: %w", oldSHA, err)
	}

	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)

	s.cancelMutex.Lock()
	// NOTE: Temporary workaround to avoid overwriting existing cancel method on same machine.
	// This doesn't avoid same SHA running on multiple machines
	if _, ok := s.cancelMergeability[newSHA]; ok {
		s.cancelMutex.Unlock()
		// release the context created above; the already registered check keeps running.
		cancel()
		return nil
	}
	s.cancelMergeability[newSHA] = cancel
	s.cancelMutex.Unlock()

	// On return: cancel this check's context and drop its registration again.
	defer func() {
		cancel()
		s.cancelMutex.Lock()
		delete(s.cancelMergeability, newSHA)
		s.cancelMutex.Unlock()
	}()

	// load repository objects
	targetRepo, err := s.repoGitInfoCache.Get(ctx, pr.TargetRepoID)
	if err != nil {
		return fmt.Errorf("failed to get target repo with ID %d: %w", pr.TargetRepoID, err)
	}

	// source and target are the same repo unless the PR spans two repositories.
	sourceRepo := targetRepo
	if pr.TargetRepoID != pr.SourceRepoID {
		sourceRepo, err = s.repoGitInfoCache.Get(ctx, pr.SourceRepoID)
		if err != nil {
			return fmt.Errorf("failed to get source repo with ID %d: %w", pr.SourceRepoID, err)
		}
	}

	writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID)
	if err != nil {
		return fmt.Errorf("failed to generate rpc write params: %w", err)
	}

	// call merge and store output in pr merge reference.
	// The ref name is formatted with FormatInt so the PR number is not narrowed to int first.
	now := time.Now()
	mergeOutput, err := s.git.Merge(ctx, &git.MergeParams{
		WriteParams:     writeParams,
		BaseBranch:      pr.TargetBranch,
		HeadRepoUID:     sourceRepo.GitUID,
		HeadBranch:      pr.SourceBranch,
		RefType:         gitenum.RefTypePullReqMerge,
		RefName:         strconv.FormatInt(pr.Number, 10),
		HeadExpectedSHA: sha.Must(newSHA),
		Force:           true,
		// set committer date to ensure repeatability of merge commit across replicas
		CommitterDate: &now,
	})
	// The precondition status is checked before the generic error check so that
	// a moved source branch discards the event instead of failing it.
	if errors.AsStatus(err) == errors.StatusPreconditionFailed {
		return events.NewDiscardEventErrorf("Source branch %q is not on SHA %q anymore.",
			pr.SourceBranch, newSHA)
	}
	if err != nil {
		return fmt.Errorf("failed to run git merge with base %q and head %q: %w", pr.TargetBranch, pr.SourceBranch, err)
	}

	// Update DB in both cases (conflict or mergeable)
	updatedPR, err := s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error {
		// to avoid racing conditions with merge
		if pr.State != enum.PullReqStateOpen {
			return errPRNotOpen
		}

		if pr.SourceSHA != newSHA {
			return events.NewDiscardEventErrorf("PR SHA %s is newer than %s", pr.SourceSHA, newSHA)
		}

		if len(mergeOutput.ConflictFiles) > 0 {
			pr.MergeCheckStatus = enum.MergeCheckStatusConflict
			pr.MergeBaseSHA = mergeOutput.MergeBaseSHA.String()
			pr.MergeTargetSHA = ptr.String(mergeOutput.BaseSHA.String())
			pr.MergeSHA = nil
			pr.MergeConflicts = mergeOutput.ConflictFiles
		} else {
			pr.MergeCheckStatus = enum.MergeCheckStatusMergeable
			pr.MergeBaseSHA = mergeOutput.MergeBaseSHA.String()
			pr.MergeTargetSHA = ptr.String(mergeOutput.BaseSHA.String())
			pr.MergeSHA = ptr.String(mergeOutput.MergeSHA.String())
			pr.MergeConflicts = nil
		}
		pr.Stats.DiffStats = types.NewDiffStats(
			mergeOutput.CommitCount,
			mergeOutput.ChangedFileCount,
			mergeOutput.Additions,
			mergeOutput.Deletions,
		)
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to update PR merge ref in db with error: %w", err)
	}

	// Publish the PR returned by the store update rather than the snapshot
	// loaded before the merge ran, so the event carries the values set above.
	// NOTE(review): assumes UpdateOptLock returns the persisted PR on success - confirm.
	if err = s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, updatedPR); err != nil {
		log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
	}

	return nil
}