mirror of https://github.com/harness/drone.git
256 lines
7.7 KiB
Go
256 lines
7.7 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package pullreq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
pullreqevents "github.com/harness/gitness/app/events/pullreq"
|
|
"github.com/harness/gitness/errors"
|
|
"github.com/harness/gitness/events"
|
|
"github.com/harness/gitness/git"
|
|
gitenum "github.com/harness/gitness/git/enum"
|
|
"github.com/harness/gitness/git/sha"
|
|
"github.com/harness/gitness/pubsub"
|
|
"github.com/harness/gitness/types"
|
|
"github.com/harness/gitness/types/enum"
|
|
|
|
"github.com/gotidy/ptr"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// cancelMergeCheckKey is the pubsub key on which the SHA of a merge check
// that should be canceled is published.
const cancelMergeCheckKey = "cancel_merge_check_for_sha"
|
|
|
|
// mergeCheckOnCreated handles pull request Created events.
|
|
// It creates the PR head git ref.
|
|
func (s *Service) mergeCheckOnCreated(ctx context.Context,
|
|
event *events.Event[*pullreqevents.CreatedPayload],
|
|
) error {
|
|
return s.updateMergeData(
|
|
ctx,
|
|
event.Payload.TargetRepoID,
|
|
event.Payload.Number,
|
|
sha.Nil.String(),
|
|
event.Payload.SourceSHA,
|
|
)
|
|
}
|
|
|
|
// mergeCheckOnBranchUpdate handles pull request Branch Updated events.
|
|
// It updates the PR head git ref to point to the latest commit.
|
|
func (s *Service) mergeCheckOnBranchUpdate(ctx context.Context,
|
|
event *events.Event[*pullreqevents.BranchUpdatedPayload],
|
|
) error {
|
|
return s.updateMergeData(
|
|
ctx,
|
|
event.Payload.TargetRepoID,
|
|
event.Payload.Number,
|
|
event.Payload.OldSHA,
|
|
event.Payload.NewSHA,
|
|
)
|
|
}
|
|
|
|
// mergeCheckOnReopen handles pull request StateChanged events.
|
|
// It updates the PR head git ref to point to the source branch commit SHA.
|
|
func (s *Service) mergeCheckOnReopen(ctx context.Context,
|
|
event *events.Event[*pullreqevents.ReopenedPayload],
|
|
) error {
|
|
return s.updateMergeData(
|
|
ctx,
|
|
event.Payload.TargetRepoID,
|
|
event.Payload.Number,
|
|
sha.None.String(),
|
|
event.Payload.SourceSHA,
|
|
)
|
|
}
|
|
|
|
// mergeCheckOnClosed deletes the merge ref.
|
|
func (s *Service) mergeCheckOnClosed(ctx context.Context,
|
|
event *events.Event[*pullreqevents.ClosedPayload],
|
|
) error {
|
|
return s.deleteMergeRef(ctx, event.Payload.SourceRepoID, event.Payload.Number)
|
|
}
|
|
|
|
// mergeCheckOnMerged deletes the merge ref.
|
|
func (s *Service) mergeCheckOnMerged(ctx context.Context,
|
|
event *events.Event[*pullreqevents.MergedPayload],
|
|
) error {
|
|
return s.deleteMergeRef(ctx, event.Payload.SourceRepoID, event.Payload.Number)
|
|
}
|
|
|
|
func (s *Service) deleteMergeRef(ctx context.Context, repoID int64, prNum int64) error {
|
|
repo, err := s.repoGitInfoCache.Get(ctx, repoID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get repo with ID %d: %w", repoID, err)
|
|
}
|
|
|
|
writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, repo.ID, repo.GitUID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate rpc write params: %w", err)
|
|
}
|
|
|
|
// TODO: This doesn't work for forked repos
|
|
err = s.git.UpdateRef(ctx, git.UpdateRefParams{
|
|
WriteParams: writeParams,
|
|
Name: strconv.Itoa(int(prNum)),
|
|
Type: gitenum.RefTypePullReqMerge,
|
|
NewValue: sha.None, // when NewValue is empty will delete the ref.
|
|
OldValue: sha.None, // we don't care about the old value
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to remove PR merge ref: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//nolint:funlen // refactor if required.
|
|
func (s *Service) updateMergeData(
|
|
ctx context.Context,
|
|
repoID int64,
|
|
prNum int64,
|
|
oldSHA string,
|
|
newSHA string,
|
|
) error {
|
|
pr, err := s.pullreqStore.FindByNumber(ctx, repoID, prNum)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get pull request number %d: %w", prNum, err)
|
|
}
|
|
|
|
// TODO: Merge check should not update the merge base.
|
|
// TODO: Instead it should accept it as an argument and fail if it doesn't match.
|
|
// Then is would not longer be necessary to cancel already active mergeability checks.
|
|
|
|
if pr.State != enum.PullReqStateOpen {
|
|
return fmt.Errorf("cannot do mergability check on closed PR %d", pr.Number)
|
|
}
|
|
|
|
// cancel all previous mergability work for this PR based on oldSHA
|
|
if err := s.pubsub.Publish(ctx, cancelMergeCheckKey, []byte(oldSHA),
|
|
pubsub.WithPublishNamespace("pullreq")); err != nil {
|
|
return err
|
|
}
|
|
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithCancel(ctx)
|
|
|
|
s.cancelMutex.Lock()
|
|
// NOTE: Temporary workaround to avoid overwriting existing cancel method on same machine.
|
|
// This doesn't avoid same SHA running on multiple machines
|
|
if _, ok := s.cancelMergeability[newSHA]; ok {
|
|
s.cancelMutex.Unlock()
|
|
cancel()
|
|
return nil
|
|
}
|
|
s.cancelMergeability[newSHA] = cancel
|
|
s.cancelMutex.Unlock()
|
|
|
|
defer func() {
|
|
cancel()
|
|
s.cancelMutex.Lock()
|
|
delete(s.cancelMergeability, newSHA)
|
|
s.cancelMutex.Unlock()
|
|
}()
|
|
|
|
// load repository objects
|
|
targetRepo, err := s.repoGitInfoCache.Get(ctx, pr.TargetRepoID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sourceRepo := targetRepo
|
|
if pr.TargetRepoID != pr.SourceRepoID {
|
|
sourceRepo, err = s.repoGitInfoCache.Get(ctx, pr.SourceRepoID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate rpc write params: %w", err)
|
|
}
|
|
|
|
// call merge and store output in pr merge reference.
|
|
now := time.Now()
|
|
mergeOutput, err := s.git.Merge(ctx, &git.MergeParams{
|
|
WriteParams: writeParams,
|
|
BaseBranch: pr.TargetBranch,
|
|
HeadRepoUID: sourceRepo.GitUID,
|
|
HeadBranch: pr.SourceBranch,
|
|
RefType: gitenum.RefTypePullReqMerge,
|
|
RefName: strconv.Itoa(int(pr.Number)),
|
|
HeadExpectedSHA: sha.Must(newSHA),
|
|
Force: true,
|
|
|
|
// set committer date to ensure repeatability of merge commit across replicas
|
|
CommitterDate: &now,
|
|
})
|
|
if errors.AsStatus(err) == errors.StatusPreconditionFailed {
|
|
return events.NewDiscardEventErrorf("Source branch %q is not on SHA %q anymore.",
|
|
pr.SourceBranch, newSHA)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to run git merge with base %q and head %q: %w", pr.TargetBranch, pr.SourceBranch, err)
|
|
}
|
|
|
|
// Update DB in both cases (failure or success)
|
|
_, err = s.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error {
|
|
// to avoid racing conditions with merge
|
|
if pr.State != enum.PullReqStateOpen {
|
|
return errPRNotOpen
|
|
}
|
|
|
|
if pr.SourceSHA != newSHA {
|
|
return events.NewDiscardEventErrorf("PR SHA %s is newer than %s", pr.SourceSHA, newSHA)
|
|
}
|
|
|
|
if len(mergeOutput.ConflictFiles) > 0 {
|
|
pr.MergeCheckStatus = enum.MergeCheckStatusConflict
|
|
pr.MergeBaseSHA = mergeOutput.MergeBaseSHA.String()
|
|
pr.MergeTargetSHA = ptr.String(mergeOutput.BaseSHA.String())
|
|
pr.MergeSHA = nil
|
|
pr.MergeConflicts = mergeOutput.ConflictFiles
|
|
} else {
|
|
pr.MergeCheckStatus = enum.MergeCheckStatusMergeable
|
|
pr.MergeBaseSHA = mergeOutput.MergeBaseSHA.String()
|
|
pr.MergeTargetSHA = ptr.String(mergeOutput.BaseSHA.String())
|
|
pr.MergeSHA = ptr.String(mergeOutput.MergeSHA.String())
|
|
pr.MergeConflicts = nil
|
|
}
|
|
pr.Stats.DiffStats = types.NewDiffStats(
|
|
mergeOutput.CommitCount,
|
|
mergeOutput.ChangedFileCount,
|
|
mergeOutput.Additions,
|
|
mergeOutput.Deletions,
|
|
)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update PR merge ref in db with error: %w", err)
|
|
}
|
|
|
|
if err = s.sseStreamer.Publish(ctx, targetRepo.ParentID, enum.SSETypePullRequestUpdated, pr); err != nil {
|
|
log.Ctx(ctx).Warn().Err(err).Msg("failed to publish PR changed event")
|
|
}
|
|
|
|
return nil
|
|
}
|