// Copyright 2022 Harness Inc. All rights reserved. // Use of this source code is governed by the Polyform Free Trial License // that can be found in the LICENSE.md file for this repository. package service import ( "context" "errors" "fmt" "io" "os" "path" "regexp" "runtime/debug" "time" "github.com/harness/gitness/gitrpc/hash" "github.com/harness/gitness/gitrpc/internal/types" "github.com/harness/gitness/gitrpc/rpc" "github.com/rs/zerolog/log" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( maxFileSize = 1 << 20 gitRepoSuffix = "git" gitReferenceNamePrefixBranch = "refs/heads/" gitReferenceNamePrefixTag = "refs/tags/" gitHooksDir = "hooks" ) var ( // TODO: should be coming from caller ALWAYS. SystemIdentity = &rpc.Identity{ Name: "gitness", Email: "system@gitness", } gitServerHookNames = []string{ "pre-receive", // "update", // update is disabled for performance reasons (called once for every ref) "post-receive", } // gitSHARegex defines the valid SHA format accepted by GIT (full form and short forms). // Note: as of now SHA is at most 40 characters long, but in the future it's moving to sha256 // which is 64 chars - keep this forward-compatible. gitSHARegex = regexp.MustCompile("^[0-9a-f]{4,64}$") ) type Storage interface { Save(filePath string, data io.Reader) (string, error) } type RepositoryService struct { rpc.UnimplementedRepositoryServiceServer adapter GitAdapter store Storage reposRoot string tmpDir string gitHookPath string reposGraveyard string } func NewRepositoryService(adapter GitAdapter, store Storage, reposRoot string, tmpDir string, gitHookPath string, reposGraveyard string) (*RepositoryService, error) { return &RepositoryService{ adapter: adapter, store: store, reposRoot: reposRoot, tmpDir: tmpDir, gitHookPath: gitHookPath, reposGraveyard: reposGraveyard, }, nil } func (s RepositoryService) CreateRepository(stream rpc.RepositoryService_CreateRepositoryServer) error { // first get repo params from stream request, err := stream.Recv() if err != nil { return status.Errorf(codes.Internal, "cannot receive create repository data") } header := request.GetHeader() if header == nil { return status.Errorf(codes.Internal, "expected header to be first message in stream") } log.Info().Msgf("received a create repository request %v", header) base := header.GetBase() if base == nil { return types.ErrBaseCannotBeEmpty } committer := base.GetActor() if header.GetCommitter() != nil { committer = header.GetCommitter() } committerDate := time.Now().UTC() if header.GetCommitterDate() != 0 { committerDate = time.Unix(header.GetCommitterDate(), 0) } author := committer if header.GetAuthor() != nil { author = header.GetAuthor() } authorDate := committerDate if header.GetAuthorDate() != 0 { authorDate = time.Unix(header.GetAuthorDate(), 0) } nextFSElement := func() (*rpc.FileUpload, error) { m, errStream := stream.Recv() if errStream != nil { return nil, errStream } return m.GetFile(), nil } err = s.createRepositoryInternal( stream.Context(), base, header.GetDefaultBranch(), nextFSElement, committer, committerDate, author, authorDate, ) if err != nil { return err } res := &rpc.CreateRepositoryResponse{} err = stream.SendAndClose(res) if err != nil { return status.Errorf(codes.Internal, "cannot send completion response: %v", err) } return nil } //nolint:gocognit // refactor if needed func (s RepositoryService) createRepositoryInternal( ctx context.Context, base *rpc.WriteRequest, defaultBranch string, nextFSElement func() (*rpc.FileUpload, error), committer *rpc.Identity, committerDate time.Time, author *rpc.Identity, authorDate time.Time, ) error { log := log.Ctx(ctx) repoPath := getFullPathForRepo(s.reposRoot, base.GetRepoUid()) if _, err := os.Stat(repoPath); !os.IsNotExist(err) { return status.Errorf(codes.AlreadyExists, "repository exists already: %v", repoPath) } // create repository in repos folder ctx, cancel := context.WithCancel(ctx) defer cancel() err := s.adapter.InitRepository(ctx, repoPath, true) // delete repo dir on error defer func() { if err != nil { cleanuperr := s.DeleteRepositoryBestEffort(ctx, base.GetRepoUid()) if cleanuperr != nil { log.Warn().Err(cleanuperr).Msg("failed to cleanup repo dir") } } }() if err != nil { return processGitErrorf(err, "failed to initialize the repository") } // update default branch (currently set to non-existent branch) err = s.adapter.SetDefaultBranch(ctx, repoPath, defaultBranch, true) if err != nil { return processGitErrorf(err, "error updating default branch for repo '%s'", base.GetRepoUid()) } // only execute file creation logic if files are provided //nolint: nestif if nextFSElement != nil { // we need temp dir for cloning tempDir, err := os.MkdirTemp("", "*-"+base.GetRepoUid()) if err != nil { return fmt.Errorf("error creating temp dir for repo %s: %w", base.GetRepoUid(), err) } defer func(path string) { // when repo is successfully created remove temp dir errRm := os.RemoveAll(path) if errRm != nil { log.Err(errRm).Msg("failed to cleanup temporary dir.") } }(tempDir) // Clone repository to temp dir if err = s.adapter.Clone(ctx, repoPath, tempDir, types.CloneRepoOptions{}); err != nil { return processGitErrorf(err, "failed to clone repo") } // logic for receiving files filePaths := make([]string, 0, 16) for { var filePath string filePath, err = s.handleFileUploadIfAvailable(ctx, tempDir, nextFSElement) if errors.Is(err, io.EOF) { log.Info().Msg("received stream EOF") break } if err != nil { return status.Errorf(codes.Internal, "failed to receive file: %v", err) } filePaths = append(filePaths, filePath) } if len(filePaths) > 0 { if committer == nil { committer = base.GetActor() } if author == nil { author = committer } // NOTE: This creates the branch in origin repo (as it doesn't exist as of now) // TODO: this should at least be a constant and not hardcoded? if err = s.addFilesAndPush(ctx, tempDir, filePaths, "HEAD:"+defaultBranch, nil, author, authorDate, committer, committerDate, "origin", "initial commit"); err != nil { return err } } } // setup server hook symlinks pointing to configured server hook binary // IMPORTANT: Setup hooks after repo creation to avoid issues with externally dependent services. for _, hook := range gitServerHookNames { hookPath := path.Join(repoPath, gitHooksDir, hook) err = os.Symlink(s.gitHookPath, hookPath) if err != nil { return status.Errorf(codes.Internal, "failed to setup symlink for hook '%s' ('%s' -> '%s'): %s", hook, hookPath, s.gitHookPath, err) } } log.Info().Msgf("repository created. Path: %s", repoPath) return nil } // isValidGitSHA returns true iff the provided string is a valid git sha (short or long form). func isValidGitSHA(sha string) bool { return gitSHARegex.MatchString(sha) } func (s RepositoryService) DeleteRepository( ctx context.Context, request *rpc.DeleteRepositoryRequest, ) (*rpc.DeleteRepositoryResponse, error) { base := request.GetBase() if base == nil { return nil, types.ErrBaseCannotBeEmpty } repoPath := getFullPathForRepo(s.reposRoot, base.RepoUid) if _, err := os.Stat(repoPath); err != nil && os.IsNotExist(err) { return nil, ErrNotFound(err) } else if err != nil { return nil, fmt.Errorf("failed to check the status of the repository %v: %w", repoPath, err) } rmerr := s.DeleteRepositoryBestEffort(ctx, base.RepoUid) return &rpc.DeleteRepositoryResponse{}, rmerr } func (s *RepositoryService) DeleteRepositoryBestEffort(ctx context.Context, repoUID string) error { repoPath := getFullPathForRepo(s.reposRoot, repoUID) tempPath := path.Join(s.reposGraveyard, repoUID) // move current dir to a temp dir (prevent partial deletion) if err := os.Rename(repoPath, tempPath); err != nil { return fmt.Errorf("couldn't move dir %s to %s : %w", repoPath, tempPath, err) } if err := os.RemoveAll(tempPath); err != nil { log.Ctx(ctx).Warn().Err(err).Msgf("failed to delete dir %s from graveyard", tempPath) } return nil } func (s RepositoryService) SyncRepository( ctx context.Context, request *rpc.SyncRepositoryRequest, ) (*rpc.SyncRepositoryResponse, error) { base := request.GetBase() if base == nil { return nil, types.ErrBaseCannotBeEmpty } repoPath := getFullPathForRepo(s.reposRoot, base.RepoUid) // create repo if requested _, err := os.Stat(repoPath) if err != nil && !os.IsNotExist(err) { return nil, ErrInternalf("failed to create repo", err) } if os.IsNotExist(err) { if !request.CreateIfNotExists { return nil, ErrNotFound(err) } // the default branch doesn't matter for a sync, // we create an empty repo and the head will by updated as part of the Sync. const syncDefaultBranch = "main" if err = s.createRepositoryInternal( ctx, base, syncDefaultBranch, nil, nil, time.Time{}, nil, time.Time{}, ); err != nil { return nil, err } } // sync repo content err = s.adapter.Sync(ctx, repoPath, request.GetSource()) if err != nil { return nil, processGitErrorf(err, "failed to sync git repo") } // get remote default branch defaultBranch, err := s.adapter.GetRemoteDefaultBranch(ctx, request.GetSource()) if errors.Is(err, types.ErrNoDefaultBranch) { return &rpc.SyncRepositoryResponse{ DefaultBranch: "", }, nil } if err != nil { return nil, processGitErrorf(err, "failed to get default branch from repo") } // set default branch err = s.adapter.SetDefaultBranch(ctx, repoPath, defaultBranch, true) if err != nil { return nil, processGitErrorf(err, "failed to set default branch of repo") } return &rpc.SyncRepositoryResponse{ DefaultBranch: defaultBranch, }, nil } func (s RepositoryService) HashRepository( ctx context.Context, request *rpc.HashRepositoryRequest, ) (*rpc.HashRepositoryResponse, error) { base := request.GetBase() if base == nil { return nil, types.ErrBaseCannotBeEmpty } repoPath := getFullPathForRepo(s.reposRoot, base.RepoUid) hashType, err := mapHashType(request.GetHashType()) if err != nil { return nil, ErrInvalidArgumentf("unknown hash type '%s'", request.GetHashType()) } aggregationType, err := mapHashAggregationType(request.GetAggregationType()) if err != nil { return nil, ErrInvalidArgumentf("unknown aggregation type '%s'", request.GetAggregationType()) } // add all references of the repo to the channel in a separate go routine, to allow streamed processing. // Ensure we cancel the go routine in case we exit the func early. goCtx, cancel := context.WithCancel(ctx) defer cancel() hashChan := make(chan hash.SourceNext) go func() { // always close channel last before leaving go routine defer close(hashChan) defer func() { if r := recover(); r != nil { hashChan <- hash.SourceNext{ Err: fmt.Errorf("panic received while filling data source: %s", debug.Stack()), } } }() // add default branch to hash defaultBranch, err := s.adapter.GetDefaultBranch(goCtx, repoPath) if err != nil { hashChan <- hash.SourceNext{ Err: processGitErrorf(err, "failed to get default branch"), } return } hashChan <- hash.SourceNext{ Data: hash.SerializeHead(defaultBranch), } err = s.adapter.WalkReferences(goCtx, repoPath, func(wre types.WalkReferencesEntry) error { ref, ok := wre[types.GitReferenceFieldRefName] if !ok { return errors.New("ref entry didn't contain the ref name") } sha, ok := wre[types.GitReferenceFieldObjectName] if !ok { return errors.New("ref entry didn't contain the ref object sha") } hashChan <- hash.SourceNext{ Data: hash.SerializeReference(ref, sha), } return nil }, &types.WalkReferencesOptions{}) if err != nil { hashChan <- hash.SourceNext{ Err: processGitErrorf(err, "failed to walk references"), } } }() hasher, err := hash.New(hashType, aggregationType) if err != nil { return nil, ErrInternalf("failed to get new reference hasher", err) } source := hash.SourceFromChannel(ctx, hashChan) res, err := hasher.Hash(source) if err != nil { return nil, processGitErrorf(err, "failed to hash repository") } return &rpc.HashRepositoryResponse{ Hash: res, }, nil }