drone/gitrpc/operations.go
2022-11-29 11:05:35 +01:00

301 lines
7.2 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package gitrpc
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"time"
"github.com/harness/gitness/gitrpc/rpc"
"github.com/rs/zerolog/log"
)
// ListCommitsParams holds the input of Client.ListCommits.
type ListCommitsParams struct {
	// RepoUID is the uid of the git repository
	RepoUID string
	// GitREF is a git reference (branch / tag / commit SHA)
	GitREF string
	// Page is forwarded unchanged as the Page field of the rpc list commits request.
	Page int32
	// PageSize is forwarded unchanged as the PageSize field of the rpc list commits request.
	PageSize int32
}
// ListCommitsOutput holds the result of Client.ListCommits.
type ListCommitsOutput struct {
	// TotalCount is the total count reported in the header of the response stream.
	TotalCount int64
	// Commits are the commits received from the response stream, in the order they were received.
	Commits []Commit
}
// Commit is this package's representation of a single git commit.
type Commit struct {
	// SHA is the SHA of the commit.
	SHA string
	// Title is the title of the commit.
	Title string
	// Message is the message of the commit.
	Message string
	// Author is the signature (identity and time) of the commit author.
	Author Signature
	// Committer is the signature (identity and time) of the committer.
	Committer Signature
}
// Signature pairs an Identity with a point in time.
type Signature struct {
	// Identity is the name and email the signature belongs to.
	Identity Identity
	// When is the time attached to the signature.
	When time.Time
}
// Identity is a name / email pair, used for commit authors and committers.
type Identity struct {
	// Name is the name of the identity.
	Name string
	// Email is the email address of the identity.
	Email string
}
// ListCommits requests the commits reachable from params.GitREF from the repo
// service and collects them from the response stream.
//
// The first message of the stream has to carry the header (it provides the
// total count); every following message has to carry a commit. The stream is
// read until the server signals io.EOF.
//
// It returns ErrNoParamsProvided if params is nil.
func (c *Client) ListCommits(ctx context.Context, params *ListCommitsParams) (*ListCommitsOutput, error) {
	if params == nil {
		return nil, ErrNoParamsProvided
	}

	request := &rpc.ListCommitsRequest{
		RepoUid:  params.RepoUID,
		GitRef:   params.GitREF,
		Page:     params.Page,
		PageSize: params.PageSize,
	}
	stream, err := c.repoService.ListCommits(ctx, request)
	if err != nil {
		return nil, fmt.Errorf("failed to start stream for commits: %w", err)
	}

	// the header is always the first message of the stream
	first, err := stream.Recv()
	if err != nil {
		return nil, processRPCErrorf(err, "error occured while receiving header")
	}
	hdr := first.GetHeader()
	if hdr == nil {
		return nil, fmt.Errorf("header missing")
	}

	// NOTE: don't use PageSize as initial slice capacity - as that theoretically could be MaxInt
	commits := make([]Commit, 0, 16)
	for {
		msg, recvErr := stream.Recv()
		switch {
		case errors.Is(recvErr, io.EOF):
			// the server is done - everything received so far is the result
			log.Ctx(ctx).Debug().Msg("received end of stream")
			return &ListCommitsOutput{
				TotalCount: hdr.TotalCount,
				Commits:    commits,
			}, nil
		case recvErr != nil:
			return nil, processRPCErrorf(recvErr, "received unexpected error from server")
		}

		rpcCommit := msg.GetCommit()
		if rpcCommit == nil {
			return nil, fmt.Errorf("expected commit message")
		}

		mapped, mapErr := mapRPCCommit(rpcCommit)
		if mapErr != nil {
			return nil, fmt.Errorf("failed to map rpc commit: %w", mapErr)
		}
		commits = append(commits, *mapped)
	}
}
// GetCommitDivergencesParams holds the input of Client.GetCommitDivergences.
type GetCommitDivergencesParams struct {
	// RepoUID is the uid of the git repository
	RepoUID string
	// MaxCount is forwarded unchanged as the MaxCount field of the rpc request.
	// NOTE(review): presumably an upper bound for the counted commits - confirm against the server.
	MaxCount int32
	// Requests are the ref pairs for which divergences are requested; each entry
	// is translated into one rpc divergence request, preserving the order.
	Requests []CommitDivergenceRequest
}
// GetCommitDivergencesOutput holds the result of Client.GetCommitDivergences.
type GetCommitDivergencesOutput struct {
	// Divergences contains one entry per divergence returned by the server, in the order of the server response.
	Divergences []CommitDivergence
}
// CommitDivergenceRequest contains the refs for which the diverging commits should be counted.
type CommitDivergenceRequest struct {
	// From is the ref from which the counting of the diverging commits starts.
	From string
	// To is the ref at which the counting of the diverging commits ends.
	To string
}
// CommitDivergence contains the counts of diverging commits between two refs.
type CommitDivergence struct {
	// Ahead is the count of commits the 'From' ref is ahead of the 'To' ref.
	Ahead int32
	// Behind is the count of commits the 'From' ref is behind the 'To' ref.
	Behind int32
}
// GetCommitDivergences asks the repo service for the ahead / behind counts of
// every ref pair in params.Requests and returns them in the order of the
// server response.
//
// It returns ErrNoParamsProvided if params is nil, and an error if the server
// response carries no divergences at all or contains a nil entry.
func (c *Client) GetCommitDivergences(ctx context.Context,
	params *GetCommitDivergencesParams) (*GetCommitDivergencesOutput, error) {
	if params == nil {
		return nil, ErrNoParamsProvided
	}

	// translate the requested ref pairs into their rpc counterparts
	rpcRequests := make([]*rpc.CommitDivergenceRequest, 0, len(params.Requests))
	for _, r := range params.Requests {
		rpcRequests = append(rpcRequests, &rpc.CommitDivergenceRequest{
			From: r.From,
			To:   r.To,
		})
	}

	resp, err := c.repoService.GetCommitDivergences(ctx, &rpc.GetCommitDivergencesRequest{
		RepoUid:  params.RepoUID,
		MaxCount: params.MaxCount,
		Requests: rpcRequests,
	})
	if err != nil {
		return nil, processRPCErrorf(err, "failed to get diverging commits from server")
	}

	rpcDivergences := resp.GetDivergences()
	if rpcDivergences == nil {
		return nil, fmt.Errorf("server response divergences were nil")
	}

	// translate the rpc divergences back into the package types
	result := make([]CommitDivergence, 0, len(rpcDivergences))
	for _, d := range rpcDivergences {
		if d == nil {
			return nil, fmt.Errorf("server returned nil divergence")
		}
		result = append(result, CommitDivergence{
			Ahead:  d.Ahead,
			Behind: d.Behind,
		})
	}

	return &GetCommitDivergencesOutput{Divergences: result}, nil
}
// FileAction identifies the kind of change a CommitFileAction applies to a file.
type FileAction string

// File actions understood by CommitFiles.
//
// Every constant is explicitly typed as FileAction: an untyped string constant
// would silently become a plain string when stored in an interface value (as
// done by Enum), so type assertions / switches on FileAction would miss it.
const (
	CreateAction FileAction = "CREATE"
	UpdateAction FileAction = "UPDATE"
	DeleteAction FileAction = "DELETE"
	MoveAction   FileAction = "MOVE"
)

// Enum returns all valid FileAction values; every element has the dynamic
// type FileAction.
func (FileAction) Enum() []interface{} {
	return []interface{}{CreateAction, UpdateAction, DeleteAction, MoveAction}
}
// CommitFileAction holds file operation data.
type CommitFileAction struct {
	// Action is the kind of operation; CommitFiles translates it to the rpc
	// action type by looking up its string value.
	Action FileAction
	// Path is sent as the path of the rpc action header.
	Path string
	// Payload is the file content; CommitFiles streams it to the server in chunks.
	Payload []byte
	// Encoding describes how Payload is encoded. CommitFiles decodes the payload
	// with standard base64 before sending when the value is "base64"; any other
	// value leaves the payload untouched.
	Encoding string
	// SHA is sent as the sha of the rpc action header.
	SHA string
}
// CommitFilesOptions holds the data for file operations.
type CommitFilesOptions struct {
	// RepoID is sent as the repo uid of the rpc request header.
	RepoID string
	// Title is sent as the title of the rpc request header.
	Title string
	// Message is sent as the message of the rpc request header.
	Message string
	// Branch is sent as the branch name of the rpc request header.
	Branch string
	// NewBranch is sent as the new branch name of the rpc request header.
	NewBranch string
	// Author is sent (name and email) as the author of the rpc request header.
	Author Identity
	// Committer is the identity of the committer.
	// NOTE(review): CommitFiles in this file does not forward Committer in the
	// request header - confirm whether that is intended.
	Committer Identity
	// Actions are the file operations; CommitFiles sends them in slice order.
	Actions []CommitFileAction
}
// CommitFilesResponse holds the result of Client.CommitFiles.
type CommitFilesResponse struct {
	// CommitID is the commit id taken from the server response.
	CommitID string
}
// CommitFiles streams a set of file actions to the commit files service and
// returns the id of the commit reported by the server.
//
// The request is sent as a client stream: first a header carrying repository,
// branch, new branch, title, message and author; then, for every entry of
// params.Actions (in order), an action header followed by the payload split
// into chunks of at most FileTransferChunkSize bytes. A payload whose Encoding
// is "base64" is decoded (standard encoding) before it is sent.
//
// It returns ErrNoParamsProvided if params is nil. Errors of the stream are
// returned as received; a failure to read / decode a payload is wrapped.
func (c *Client) CommitFiles(ctx context.Context, params *CommitFilesOptions) (CommitFilesResponse, error) {
	// guard like the other client methods - params is dereferenced below
	if params == nil {
		return CommitFilesResponse{}, ErrNoParamsProvided
	}

	stream, err := c.commitFilesService.CommitFiles(ctx)
	if err != nil {
		return CommitFilesResponse{}, err
	}

	// send request header
	if err = stream.Send(&rpc.CommitFilesRequest{
		Payload: &rpc.CommitFilesRequest_Header{
			Header: &rpc.CommitFilesRequestHeader{
				RepoUid:       params.RepoID,
				BranchName:    params.Branch,
				NewBranchName: params.NewBranch,
				Title:         params.Title,
				Message:       params.Message,
				Author: &rpc.Identity{
					Name:  params.Author.Name,
					Email: params.Author.Email,
				},
			},
		},
	}); err != nil {
		return CommitFilesResponse{}, err
	}

	// A single chunk buffer serves all actions. It was already reused between
	// the chunks of one payload, so sharing it across actions adds no new
	// aliasing.
	buffer := make([]byte, FileTransferChunkSize)

	for i := range params.Actions {
		action := &params.Actions[i]

		// send headers
		// NOTE: an Action whose string value is not a key of the rpc value map
		// yields the map's zero value.
		actionType := rpc.CommitFilesActionHeader_ActionType(
			rpc.CommitFilesActionHeader_ActionType_value[string(action.Action)])
		if err = stream.Send(&rpc.CommitFilesRequest{
			Payload: &rpc.CommitFilesRequest_Action{
				Action: &rpc.CommitFilesAction{
					Payload: &rpc.CommitFilesAction_Header{
						Header: &rpc.CommitFilesActionHeader{
							Action: actionType,
							Path:   action.Path,
							Sha:    action.SHA,
						},
					},
				},
			},
		}); err != nil {
			return CommitFilesResponse{}, err
		}

		// send file content
		reader := io.Reader(bytes.NewReader(action.Payload))
		if action.Encoding == "base64" {
			reader = base64.NewDecoder(base64.StdEncoding, reader)
		}
		for {
			n, readErr := reader.Read(buffer)

			// io.Reader may return n > 0 together with an error (including
			// io.EOF); the bytes have to be processed before looking at the
			// error, otherwise the tail of the payload would be dropped.
			if n > 0 {
				if err = stream.Send(&rpc.CommitFilesRequest{
					Payload: &rpc.CommitFilesRequest_Action{
						Action: &rpc.CommitFilesAction{
							Payload: &rpc.CommitFilesAction_Content{
								Content: buffer[:n],
							},
						},
					},
				}); err != nil {
					return CommitFilesResponse{}, err
				}
			}

			if errors.Is(readErr, io.EOF) {
				break
			}
			if readErr != nil {
				return CommitFilesResponse{}, fmt.Errorf("cannot read buffer: %w", readErr)
			}
		}
	}

	recv, err := stream.CloseAndRecv()
	if err != nil {
		return CommitFilesResponse{}, err
	}

	return CommitFilesResponse{
		CommitID: recv.CommitId,
	}, nil
}