mirror of https://github.com/harness/drone.git
[LINT] Fix All Linting Errors (#609)
parent
5ca48e7f59
commit
dc351cdcef
|
@ -292,6 +292,8 @@ issues:
|
|||
linters: [ govet ]
|
||||
- source: "^//\\s*go:generate\\s"
|
||||
linters: [ lll ]
|
||||
- text: 'replacement are not allowed: github.com/docker/docker'
|
||||
linters: [ gomoddirectives ]
|
||||
- source: "(noinspection|TODO)"
|
||||
linters: [ godot ]
|
||||
- source: "//noinspection"
|
||||
|
|
|
@ -31,8 +31,8 @@ COPY go.mod .
|
|||
COPY go.sum .
|
||||
|
||||
# TODO: REMOVE ONCE WE SPLIT REPOS
|
||||
RUN sed -i '/go-rbac/' go.mod
|
||||
RUN sed -i '/go-rbac/' go.sum
|
||||
RUN sed -i '/go-rbac/d' go.mod
|
||||
RUN sed -i '/go-rbac/d' go.sum
|
||||
|
||||
COPY Makefile .
|
||||
RUN make dep
|
||||
|
|
|
@ -16,6 +16,7 @@ package cache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -79,7 +80,7 @@ func (c *Redis[K, V]) Get(ctx context.Context, key K) (V, error) {
|
|||
c.countHit++
|
||||
return c.codec.Decode(raw)
|
||||
}
|
||||
if err != redis.Nil {
|
||||
if !errors.Is(err, redis.Nil) {
|
||||
return nothing, err
|
||||
}
|
||||
|
||||
|
|
|
@ -74,6 +74,7 @@ func backfillURLs(config *types.Config) error {
|
|||
}
|
||||
|
||||
// override base with whatever user explicit override
|
||||
//nolint:nestif // simple conditional override of all elements
|
||||
if config.URL.Base != "" {
|
||||
u, err := url.Parse(config.URL.Base)
|
||||
if err != nil {
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
|
|
@ -89,7 +89,11 @@ func (c *command) run(*kingpin.ParseContext) error {
|
|||
g.Go(func() error {
|
||||
// initialize metric collector
|
||||
if system.services.MetricCollector != nil {
|
||||
system.services.MetricCollector.Register(gCtx)
|
||||
err := system.services.MetricCollector.Register(gCtx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to register metric collector")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return system.services.JobScheduler.Run(gCtx)
|
||||
|
|
|
@ -30,7 +30,7 @@ type System struct {
|
|||
bootstrap bootstrap.Bootstrap
|
||||
server *server.Server
|
||||
gitRPCServer *gitrpcserver.GRPCServer
|
||||
pluginManager *plugin.PluginManager
|
||||
pluginManager *plugin.Manager
|
||||
poller *poller.Poller
|
||||
services services.Services
|
||||
gitRPCCronMngr *gitrpccron.Manager
|
||||
|
@ -38,7 +38,7 @@ type System struct {
|
|||
|
||||
// NewSystem returns a new system structure.
|
||||
func NewSystem(bootstrap bootstrap.Bootstrap, server *server.Server, poller *poller.Poller,
|
||||
gitRPCServer *gitrpcserver.GRPCServer, pluginManager *plugin.PluginManager,
|
||||
gitRPCServer *gitrpcserver.GRPCServer, pluginManager *plugin.Manager,
|
||||
gitrpccron *gitrpccron.Manager, services services.Services) *System {
|
||||
return &System{
|
||||
bootstrap: bootstrap,
|
||||
|
|
|
@ -144,8 +144,8 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
|
|||
}
|
||||
stepStore := database.ProvideStepStore(db)
|
||||
cancelerCanceler := canceler.ProvideCanceler(executionStore, streamer, repoStore, schedulerScheduler, stageStore, stepStore)
|
||||
commitService := commit.ProvideCommitService(gitrpcInterface)
|
||||
fileService := file.ProvideFileService(gitrpcInterface)
|
||||
commitService := commit.ProvideService(gitrpcInterface)
|
||||
fileService := file.ProvideService(gitrpcInterface)
|
||||
triggererTriggerer := triggerer.ProvideTriggerer(executionStore, checkStore, stageStore, db, pipelineStore, fileService, schedulerScheduler, repoStore)
|
||||
executionController := execution.ProvideController(db, authorizer, executionStore, checkStore, cancelerCanceler, commitService, triggererTriggerer, repoStore, stageStore, pipelineStore)
|
||||
logStore := logs.ProvideLogStore(db, config)
|
||||
|
|
|
@ -87,6 +87,7 @@ func (c *CLICore) PostReceive(ctx context.Context) error {
|
|||
return handleServerHookOutput(out, err)
|
||||
}
|
||||
|
||||
//nolint:forbidigo // outputing to CMD as that's where git reads the data
|
||||
func handleServerHookOutput(out *Output, err error) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("an error occurred when calling the server: %w", err)
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
* 3: {, look for a preceding @ to reject @{ in refs
|
||||
* 4: A bad character: ASCII control characters, and
|
||||
* ":", "?", "[", "\", "^", "~", SP, or TAB
|
||||
* 5: *, reject unless REFNAME_REFSPEC_PATTERN is set
|
||||
* 5: *, reject unless REFNAME_REFSPEC_PATTERN is set.
|
||||
*/
|
||||
var refnameDisposition = [256]byte{
|
||||
1, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
|
||||
|
@ -41,6 +41,7 @@ var refnameDisposition = [256]byte{
|
|||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 4, 4,
|
||||
}
|
||||
|
||||
//nolint:gocognit // refactor if needed
|
||||
func BranchName(branch string) error {
|
||||
const lock = ".lock"
|
||||
last := byte('\x00')
|
||||
|
|
|
@ -405,6 +405,8 @@ func mapRPCFileDiffStatus(status rpc.DiffResponse_FileStatus) FileDiffStatus {
|
|||
return FileDiffStatusModified
|
||||
case rpc.DiffResponse_RENAMED:
|
||||
return FileDiffStatusRenamed
|
||||
case rpc.DiffResponse_UNDEFINED:
|
||||
return FileDiffStatusUndefined
|
||||
default:
|
||||
return FileDiffStatusUndefined
|
||||
}
|
||||
|
|
|
@ -78,6 +78,7 @@ func (s *Section) NumLines() int {
|
|||
}
|
||||
|
||||
// Line returns a specific line by given type and line number in a section.
|
||||
// nolint: gocognit
|
||||
func (s *Section) Line(lineType LineType, line int) *Line {
|
||||
var (
|
||||
difference = 0
|
||||
|
@ -93,7 +94,8 @@ loop:
|
|||
addCount++
|
||||
case DiffLineDelete:
|
||||
delCount++
|
||||
default:
|
||||
case DiffLinePlain,
|
||||
DiffLineSection:
|
||||
if matchedDiffLine != nil {
|
||||
break loop
|
||||
}
|
||||
|
@ -111,6 +113,8 @@ loop:
|
|||
if diffLine.LeftLine == 0 && diffLine.RightLine == line+difference {
|
||||
matchedDiffLine = diffLine
|
||||
}
|
||||
case DiffLinePlain,
|
||||
DiffLineSection:
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -212,8 +216,8 @@ func (p *Parser) readLine() error {
|
|||
var err error
|
||||
p.buffer, err = p.ReadBytes('\n')
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return fmt.Errorf("read string: %v", err)
|
||||
if !errors.Is(err, io.EOF) {
|
||||
return fmt.Errorf("read string: %w", err)
|
||||
}
|
||||
|
||||
p.isEOF = true
|
||||
|
@ -228,6 +232,7 @@ func (p *Parser) readLine() error {
|
|||
|
||||
var diffHead = []byte("diff --git ")
|
||||
|
||||
//nolint:gocognit
|
||||
func (p *Parser) parseFileHeader() (*File, error) {
|
||||
submoduleMode := " 160000"
|
||||
line := string(p.buffer)
|
||||
|
@ -263,18 +268,18 @@ checkType:
|
|||
return nil, err
|
||||
}
|
||||
|
||||
line := string(p.buffer)
|
||||
subLine := string(p.buffer)
|
||||
p.buffer = nil
|
||||
|
||||
if len(line) == 0 {
|
||||
if len(subLine) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderNewFileMode):
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderNewFileMode):
|
||||
file.Type = FileAdd
|
||||
file.IsSubmodule = strings.HasSuffix(line, submoduleMode)
|
||||
fields := strings.Fields(line)
|
||||
file.IsSubmodule = strings.HasSuffix(subLine, submoduleMode)
|
||||
fields := strings.Fields(subLine)
|
||||
if len(fields) > 0 {
|
||||
mode, _ := strconv.ParseUint(fields[len(fields)-1], 8, 64)
|
||||
file.mode = enum.EntryMode(mode)
|
||||
|
@ -282,10 +287,10 @@ checkType:
|
|||
file.oldMode = file.mode
|
||||
}
|
||||
}
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderDeletedFileMode):
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderDeletedFileMode):
|
||||
file.Type = FileDelete
|
||||
file.IsSubmodule = strings.HasSuffix(line, submoduleMode)
|
||||
fields := strings.Fields(line)
|
||||
file.IsSubmodule = strings.HasSuffix(subLine, submoduleMode)
|
||||
fields := strings.Fields(subLine)
|
||||
if len(fields) > 0 {
|
||||
mode, _ := strconv.ParseUint(fields[len(fields)-1], 8, 64)
|
||||
file.mode = enum.EntryMode(mode)
|
||||
|
@ -293,8 +298,8 @@ checkType:
|
|||
file.oldMode = file.mode
|
||||
}
|
||||
}
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderIndex): // e.g. index ee791be..9997571 100644
|
||||
fields := strings.Fields(line[6:])
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderIndex): // e.g. index ee791be..9997571 100644
|
||||
fields := strings.Fields(subLine[6:])
|
||||
shas := strings.Split(fields[0], "..")
|
||||
if len(shas) != 2 {
|
||||
return nil, errors.New("malformed index: expect two SHAs in the form of <old>..<new>")
|
||||
|
@ -308,23 +313,23 @@ checkType:
|
|||
file.oldMode = enum.EntryMode(mode)
|
||||
}
|
||||
break checkType
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderSimilarity):
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderSimilarity):
|
||||
file.Type = FileRename
|
||||
file.OldPath = a
|
||||
file.Path = b
|
||||
|
||||
// No need to look for index if it's a pure rename
|
||||
if strings.HasSuffix(line, "100%") {
|
||||
if strings.HasSuffix(subLine, "100%") {
|
||||
break checkType
|
||||
}
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderNewMode):
|
||||
fields := strings.Fields(line)
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderNewMode):
|
||||
fields := strings.Fields(subLine)
|
||||
if len(fields) > 0 {
|
||||
mode, _ := strconv.ParseUint(fields[len(fields)-1], 8, 64)
|
||||
file.mode = enum.EntryMode(mode)
|
||||
}
|
||||
case strings.HasPrefix(line, enum.DiffExtHeaderOldMode):
|
||||
fields := strings.Fields(line)
|
||||
case strings.HasPrefix(subLine, enum.DiffExtHeaderOldMode):
|
||||
fields := strings.Fields(subLine)
|
||||
if len(fields) > 0 {
|
||||
mode, _ := strconv.ParseUint(fields[len(fields)-1], 8, 64)
|
||||
file.oldMode = enum.EntryMode(mode)
|
||||
|
@ -335,6 +340,7 @@ checkType:
|
|||
return file, nil
|
||||
}
|
||||
|
||||
//nolint:gocognit // refactor if needed
|
||||
func (p *Parser) parseSection() (*Section, error) {
|
||||
line := string(p.buffer)
|
||||
p.buffer = nil
|
||||
|
@ -374,7 +380,6 @@ func (p *Parser) parseSection() (*Section, error) {
|
|||
if p.buffer[0] != ' ' &&
|
||||
p.buffer[0] != '+' &&
|
||||
p.buffer[0] != '-' {
|
||||
|
||||
// No new line indicator
|
||||
if p.buffer[0] == '\\' &&
|
||||
bytes.HasPrefix(p.buffer, []byte(`\ No newline at end of file`)) {
|
||||
|
@ -384,14 +389,14 @@ func (p *Parser) parseSection() (*Section, error) {
|
|||
return section, nil
|
||||
}
|
||||
|
||||
line := string(p.buffer)
|
||||
subLine := string(p.buffer)
|
||||
p.buffer = nil
|
||||
|
||||
switch line[0] {
|
||||
switch subLine[0] {
|
||||
case ' ':
|
||||
section.Lines = append(section.Lines, &Line{
|
||||
Type: DiffLinePlain,
|
||||
Content: line,
|
||||
Content: subLine,
|
||||
LeftLine: leftLine,
|
||||
RightLine: rightLine,
|
||||
})
|
||||
|
@ -400,7 +405,7 @@ func (p *Parser) parseSection() (*Section, error) {
|
|||
case '+':
|
||||
section.Lines = append(section.Lines, &Line{
|
||||
Type: DiffLineAdd,
|
||||
Content: line,
|
||||
Content: subLine,
|
||||
RightLine: rightLine,
|
||||
})
|
||||
section.numAdditions++
|
||||
|
@ -408,7 +413,7 @@ func (p *Parser) parseSection() (*Section, error) {
|
|||
case '-':
|
||||
section.Lines = append(section.Lines, &Line{
|
||||
Type: DiffLineDelete,
|
||||
Content: line,
|
||||
Content: subLine,
|
||||
LeftLine: leftLine,
|
||||
})
|
||||
section.numDeletions++
|
||||
|
@ -421,7 +426,8 @@ func (p *Parser) parseSection() (*Section, error) {
|
|||
return section, nil
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(f func(f *File)) error {
|
||||
//nolint:gocognit
|
||||
func (p *Parser) Parse(send func(f *File) error) error {
|
||||
file := new(File)
|
||||
currentFileLines := 0
|
||||
additions := 0
|
||||
|
@ -445,8 +451,11 @@ func (p *Parser) Parse(f func(f *File)) error {
|
|||
// Found new file
|
||||
if bytes.HasPrefix(p.buffer, diffHead) {
|
||||
// stream previous file
|
||||
if !file.IsEmpty() && f != nil {
|
||||
f(file)
|
||||
if !file.IsEmpty() && send != nil {
|
||||
err = send(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send out file: %w", err)
|
||||
}
|
||||
}
|
||||
file, err = p.parseFileHeader()
|
||||
if err != nil {
|
||||
|
@ -487,8 +496,11 @@ func (p *Parser) Parse(f func(f *File)) error {
|
|||
}
|
||||
|
||||
// stream last file
|
||||
if !file.IsEmpty() && f != nil {
|
||||
f(file)
|
||||
if !file.IsEmpty() && send != nil {
|
||||
err = send(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send last file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -500,7 +512,7 @@ func UnescapeChars(in []byte) []byte {
|
|||
return in
|
||||
}
|
||||
|
||||
out := bytes.Replace(in, escapedSlash, regularSlash, -1)
|
||||
out = bytes.Replace(out, escapedTab, regularTab, -1)
|
||||
out := bytes.ReplaceAll(in, escapedSlash, regularSlash)
|
||||
out = bytes.ReplaceAll(out, escapedTab, regularTab)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -209,7 +209,7 @@ func AsPathNotFoundError(err error) (path string) {
|
|||
details := ErrorDetails(err)
|
||||
object, ok := details[pathKey]
|
||||
if ok {
|
||||
path = object.(string)
|
||||
path, _ = object.(string)
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -16,6 +16,7 @@ package gitea
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
|
@ -33,7 +34,7 @@ func (g Adapter) GetBlob(ctx context.Context, repoPath string, sha string, sizeL
|
|||
|
||||
blob, err := repo.BlobObject(gogitplumbing.NewHash(sha))
|
||||
if err != nil {
|
||||
if err == gogitplumbing.ErrObjectNotFound {
|
||||
if errors.Is(err, gogitplumbing.ErrObjectNotFound) {
|
||||
return nil, types.ErrNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get blob object: %w", err)
|
||||
|
|
|
@ -59,7 +59,7 @@ func (g Adapter) RawDiff(
|
|||
return nil
|
||||
}
|
||||
|
||||
// CommitDiff will stream diff for provided ref
|
||||
// CommitDiff will stream diff for provided ref.
|
||||
func (g Adapter) CommitDiff(ctx context.Context, repoPath, sha string, w io.Writer) error {
|
||||
args := make([]string, 0, 8)
|
||||
args = append(args, "show", "--full-index", "--pretty=format:%b", sha)
|
||||
|
|
|
@ -152,7 +152,10 @@ func mapGiteaCommit(giteaCommit *gitea.Commit) (*types.Commit, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func mapGogitNodeToTreeNodeModeAndType(gogitMode gogitfilemode.FileMode) (types.TreeNodeType, types.TreeNodeMode, error) {
|
||||
func mapGogitNodeToTreeNodeModeAndType(
|
||||
gogitMode gogitfilemode.FileMode,
|
||||
) (types.TreeNodeType, types.TreeNodeMode, error) {
|
||||
//nolint:exhaustive
|
||||
switch gogitMode {
|
||||
case gogitfilemode.Regular, gogitfilemode.Deprecated:
|
||||
return types.TreeNodeTypeBlob, types.TreeNodeModeFile, nil
|
||||
|
|
|
@ -54,7 +54,9 @@ func (g Adapter) MatchFiles(ctx context.Context,
|
|||
}
|
||||
|
||||
var files []types.FileContent
|
||||
for _, fileEntry := range tree.Entries {
|
||||
for i := range tree.Entries {
|
||||
fileEntry := tree.Entries[i]
|
||||
|
||||
ok, err := path.Match(pattern, fileEntry.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -27,6 +27,8 @@ import (
|
|||
)
|
||||
|
||||
// PathsDetails returns additional details about provided the paths.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed
|
||||
func (g Adapter) PathsDetails(ctx context.Context,
|
||||
repoPath string,
|
||||
ref string,
|
||||
|
@ -49,6 +51,7 @@ func (g Adapter) PathsDetails(ctx context.Context,
|
|||
for i, path := range paths {
|
||||
results[i].Path = path
|
||||
|
||||
//nolint:nestif
|
||||
if len(path) > 0 {
|
||||
entry, err := tree.FindEntry(path)
|
||||
if errors.Is(err, gogitobject.ErrDirectoryNotFound) || errors.Is(err, gogitobject.ErrEntryNotFound) {
|
||||
|
|
|
@ -213,7 +213,7 @@ func Push(ctx context.Context, repoPath string, opts types.PushOptions) error {
|
|||
if opts.ForceWithLease != "" {
|
||||
cmd.AddArguments(fmt.Sprintf("--force-with-lease=%s", opts.ForceWithLease))
|
||||
}
|
||||
if opts.Mirror == true {
|
||||
if opts.Mirror {
|
||||
cmd.AddArguments("--mirror")
|
||||
}
|
||||
cmd.AddArguments("--", opts.Remote)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/harness/gitness/gitrpc/internal/streamio"
|
||||
"github.com/harness/gitness/gitrpc/internal/types"
|
||||
"github.com/harness/gitness/gitrpc/rpc"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
@ -45,7 +46,6 @@ func NewDiffService(adapter GitAdapter, reposRoot string, reposTempDir string) (
|
|||
}
|
||||
|
||||
func (s DiffService) RawDiff(request *rpc.DiffRequest, stream rpc.DiffService_RawDiffServer) error {
|
||||
|
||||
sw := streamio.NewWriter(func(p []byte) error {
|
||||
return stream.Send(&rpc.RawDiffResponse{Data: p})
|
||||
})
|
||||
|
@ -115,7 +115,7 @@ func validateCommitDiffRequest(in *rpc.CommitDiffRequest) error {
|
|||
func (s DiffService) DiffShortStat(ctx context.Context, r *rpc.DiffRequest) (*rpc.DiffShortStatResponse, error) {
|
||||
err := validateDiffRequest(r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to validate request for short diff statistic, error: %v", err)
|
||||
return nil, fmt.Errorf("failed to validate request for short diff statistic, error: %w", err)
|
||||
}
|
||||
|
||||
base := r.GetBase()
|
||||
|
@ -214,12 +214,12 @@ func (s DiffService) Diff(request *rpc.DiffRequest, stream rpc.DiffService_DiffS
|
|||
}
|
||||
}()
|
||||
|
||||
return parser.Parse(func(f *diff.File) {
|
||||
streamDiffFile(f, request.IncludePatch, stream)
|
||||
return parser.Parse(func(f *diff.File) error {
|
||||
return streamDiffFile(f, request.IncludePatch, stream)
|
||||
})
|
||||
}
|
||||
|
||||
func streamDiffFile(f *diff.File, includePatch bool, stream rpc.DiffService_DiffServer) {
|
||||
func streamDiffFile(f *diff.File, includePatch bool, stream rpc.DiffService_DiffServer) error {
|
||||
var status rpc.DiffResponse_FileStatus
|
||||
switch f.Type {
|
||||
case diff.FileAdd:
|
||||
|
@ -245,7 +245,7 @@ func streamDiffFile(f *diff.File, includePatch bool, stream rpc.DiffService_Diff
|
|||
}
|
||||
}
|
||||
|
||||
stream.Send(&rpc.DiffResponse{
|
||||
err := stream.Send(&rpc.DiffResponse{
|
||||
Path: f.Path,
|
||||
OldPath: f.OldPath,
|
||||
Sha: f.SHA,
|
||||
|
@ -256,4 +256,8 @@ func streamDiffFile(f *diff.File, includePatch bool, stream rpc.DiffService_Diff
|
|||
Changes: int32(f.NumChanges()),
|
||||
Patch: patch.Bytes(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send diff response on stream: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -89,22 +89,22 @@ func (s RepositoryService) GetTreeNode(ctx context.Context,
|
|||
},
|
||||
}
|
||||
|
||||
if request.GetIncludeLatestCommit() {
|
||||
pathDetails, err := s.adapter.PathsDetails(ctx, repoPath, request.GitRef, []string{request.Path})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !request.GetIncludeLatestCommit() {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
if len(pathDetails) != 1 {
|
||||
return nil, fmt.Errorf("failed to get details for the path %s", request.Path)
|
||||
}
|
||||
pathDetails, err := s.adapter.PathsDetails(ctx, repoPath, request.GitRef, []string{request.Path})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if pathDetails[0].LastCommit != nil {
|
||||
res.Commit, err = mapGitCommit(pathDetails[0].LastCommit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(pathDetails) != 1 || pathDetails[0].LastCommit == nil {
|
||||
return nil, fmt.Errorf("failed to get details for the path %s", request.Path)
|
||||
}
|
||||
|
||||
res.Commit, err = mapGitCommit(pathDetails[0].LastCommit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
type PushRemoteParams struct {
|
||||
ReadParams
|
||||
RemoteUrl string
|
||||
RemoteURL string
|
||||
}
|
||||
|
||||
func (c *Client) PushRemote(ctx context.Context, params *PushRemoteParams) error {
|
||||
|
@ -32,7 +32,7 @@ func (c *Client) PushRemote(ctx context.Context, params *PushRemoteParams) error
|
|||
|
||||
_, err := c.pushService.PushRemote(ctx, &rpc.PushRemoteRequest{
|
||||
Base: mapToRPCReadRequest(params.ReadParams),
|
||||
RemoteUrl: params.RemoteUrl,
|
||||
RemoteUrl: params.RemoteURL,
|
||||
})
|
||||
if err != nil {
|
||||
return processRPCErrorf(err, "failed to push to remote")
|
||||
|
@ -46,7 +46,7 @@ func (p PushRemoteParams) Validate() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if p.RemoteUrl == "" {
|
||||
if p.RemoteURL == "" {
|
||||
return ErrInvalidArgumentf("remote url cannot be empty")
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -87,7 +87,6 @@ func (in *ReportInput) Validate() error {
|
|||
|
||||
case enum.CheckPayloadKindPipeline:
|
||||
return usererror.BadRequest("Kind cannot be pipeline for external checks")
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -50,6 +50,10 @@ func (c *Controller) Update(
|
|||
return nil, fmt.Errorf("failed to authorize: %w", err)
|
||||
}
|
||||
|
||||
if err = c.sanitizeUpdateInput(in); err != nil {
|
||||
return nil, fmt.Errorf("failed to sanitize input: %w", err)
|
||||
}
|
||||
|
||||
connector, err := c.connectorStore.FindByUID(ctx, space.ID, uid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find connector: %w", err)
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/harness/gitness/internal/pipeline/checks"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ type Controller struct {
|
|||
executionStore store.ExecutionStore
|
||||
checkStore store.CheckStore
|
||||
canceler canceler.Canceler
|
||||
commitService commit.CommitService
|
||||
commitService commit.Service
|
||||
triggerer triggerer.Triggerer
|
||||
repoStore store.RepoStore
|
||||
stageStore store.StageStore
|
||||
|
@ -43,7 +43,7 @@ func NewController(
|
|||
executionStore store.ExecutionStore,
|
||||
checkStore store.CheckStore,
|
||||
canceler canceler.Canceler,
|
||||
commitService commit.CommitService,
|
||||
commitService commit.Service,
|
||||
triggerer triggerer.Triggerer,
|
||||
repoStore store.RepoStore,
|
||||
stageStore store.StageStore,
|
||||
|
|
|
@ -35,7 +35,7 @@ func ProvideController(db *sqlx.DB,
|
|||
executionStore store.ExecutionStore,
|
||||
checkStore store.CheckStore,
|
||||
canceler canceler.Canceler,
|
||||
commitService commit.CommitService,
|
||||
commitService commit.Service,
|
||||
triggerer triggerer.Triggerer,
|
||||
repoStore store.RepoStore,
|
||||
stageStore store.StageStore,
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
events "github.com/harness/gitness/internal/events/git"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
@ -59,7 +60,7 @@ func (c *Controller) PostReceive(
|
|||
out := &githook.Output{}
|
||||
|
||||
// handle branch updates related to PRs - best effort
|
||||
c.handlePRMessaging(ctx, repo, principalID, in, out)
|
||||
c.handlePRMessaging(ctx, repo, in, out)
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
@ -157,7 +158,6 @@ func (c *Controller) reportTagEvent(
|
|||
func (c *Controller) handlePRMessaging(
|
||||
ctx context.Context,
|
||||
repo *types.Repository,
|
||||
principalID int64,
|
||||
in *githook.PostReceiveInput,
|
||||
out *githook.Output,
|
||||
) {
|
||||
|
|
|
@ -72,7 +72,10 @@ func (c *Controller) Find(
|
|||
|
||||
lines := []*livelog.Line{}
|
||||
buf := new(bytes.Buffer)
|
||||
buf.ReadFrom(rc)
|
||||
_, err = buf.ReadFrom(rc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read from buffer: %w", err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(buf.Bytes(), &lines)
|
||||
if err != nil {
|
||||
|
|
|
@ -31,10 +31,6 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// errPipelineRequiresParent is returned if the user tries to create a pipeline without a parent space.
|
||||
errPipelineRequiresParent = usererror.BadRequest(
|
||||
"Parent space required - standalone pipelines are not supported.")
|
||||
|
||||
// errPipelineRequiresConfigPath is returned if the user tries to create a pipeline with an empty config path.
|
||||
errPipelineRequiresConfigPath = usererror.BadRequest(
|
||||
"Pipeline requires a config path.")
|
||||
|
|
|
@ -49,6 +49,10 @@ func (c *Controller) Update(
|
|||
return nil, fmt.Errorf("failed to authorize pipeline: %w", err)
|
||||
}
|
||||
|
||||
if err = c.sanitizeUpdateInput(in); err != nil {
|
||||
return nil, fmt.Errorf("failed to sanitize input: %w", err)
|
||||
}
|
||||
|
||||
pipeline, err := c.pipelineStore.FindByUID(ctx, repo.ID, uid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find pipeline: %w", err)
|
||||
|
@ -72,7 +76,7 @@ func (c *Controller) Update(
|
|||
})
|
||||
}
|
||||
|
||||
func (c *Controller) sanitizeUpdatenput(in *UpdateInput) error {
|
||||
func (c *Controller) sanitizeUpdateInput(in *UpdateInput) error {
|
||||
if in.UID != nil {
|
||||
if err := c.uidCheck(*in.UID, false); err != nil {
|
||||
return err
|
||||
|
|
|
@ -29,7 +29,6 @@ func (c *Controller) List(
|
|||
ctx context.Context,
|
||||
filter types.ListQueryFilter,
|
||||
) ([]*types.Plugin, int64, error) {
|
||||
|
||||
plugins, err := c.pluginStore.List(ctx, filter)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to list plugins: %w", err)
|
||||
|
|
|
@ -48,6 +48,8 @@ func (f *FileViewAddInput) Validate() error {
|
|||
// The downside is that the caller could provide a SHA that never was part of the PR in the first place.
|
||||
// We can't block against that with our current data, as the existence of force push makes it impossible to verify
|
||||
// whether the commit ever was part of the PR - it would require us to store the full pr.SourceSHA history.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (c *Controller) FileViewAdd(
|
||||
ctx context.Context,
|
||||
session *auth.Session,
|
||||
|
|
|
@ -64,7 +64,10 @@ func (c *Controller) DeleteNoAuth(ctx context.Context, session *auth.Session, re
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) DeleteGitRPCRepositories(ctx context.Context, session *auth.Session, repo *types.Repository) error {
|
||||
func (c *Controller) DeleteGitRPCRepositories(
|
||||
ctx context.Context,
|
||||
session *auth.Session, repo *types.Repository,
|
||||
) error {
|
||||
writeParams, err := CreateRPCWriteParams(ctx, c.urlProvider, session, repo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create RPC write params: %w", err)
|
||||
|
|
|
@ -38,7 +38,7 @@ func (c *Controller) PipelineGenerate(
|
|||
ReadParams: CreateRPCReadParams(repo),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate pipeline: %s", err)
|
||||
return nil, fmt.Errorf("failed to generate pipeline: %w", err)
|
||||
}
|
||||
|
||||
return result.PipelineYAML, nil
|
||||
|
|
|
@ -50,6 +50,10 @@ func (c *Controller) Update(
|
|||
return nil, fmt.Errorf("failed to authorize: %w", err)
|
||||
}
|
||||
|
||||
if err = c.sanitizeUpdateInput(in); err != nil {
|
||||
return nil, fmt.Errorf("failed to sanitize input: %w", err)
|
||||
}
|
||||
|
||||
secret, err := c.secretStore.FindByUID(ctx, space.ID, uid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find secret: %w", err)
|
||||
|
|
|
@ -179,7 +179,6 @@ func (c *Controller) getSpaceCheckAuthSpaceCreation(
|
|||
}
|
||||
|
||||
return parentSpace.ID, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Controller) sanitizeCreateInput(in *CreateInput) error {
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
|
||||
apiauth "github.com/harness/gitness/internal/api/auth"
|
||||
"github.com/harness/gitness/internal/auth"
|
||||
"github.com/harness/gitness/internal/writer"
|
||||
gitnessio "github.com/harness/gitness/internal/io"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
@ -34,11 +34,12 @@ var (
|
|||
tailMaxTime = 2 * time.Hour
|
||||
)
|
||||
|
||||
//nolint:gocognit // refactor if needed
|
||||
func (c *Controller) Events(
|
||||
ctx context.Context,
|
||||
session *auth.Session,
|
||||
spaceRef string,
|
||||
w writer.WriterFlusher,
|
||||
w gitnessio.WriterFlusher,
|
||||
) error {
|
||||
space, err := c.spaceStore.FindByRef(ctx, spaceRef)
|
||||
if err != nil {
|
||||
|
@ -52,7 +53,10 @@ func (c *Controller) Events(
|
|||
ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime)
|
||||
defer ctxCancel()
|
||||
|
||||
io.WriteString(w, ": ping\n\n")
|
||||
_, err = io.WriteString(w, ": ping\n\n")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send initial ping: %w", err)
|
||||
}
|
||||
w.Flush()
|
||||
|
||||
eventStream, errorStream, sseCancel := c.sseStreamer.Stream(ctx, space.ID)
|
||||
|
@ -64,7 +68,7 @@ func (c *Controller) Events(
|
|||
}()
|
||||
// could not get error channel
|
||||
if errorStream == nil {
|
||||
io.WriteString(w, "event: error\ndata: eof\n\n")
|
||||
_, _ = io.WriteString(w, "event: error\ndata: eof\n\n")
|
||||
w.Flush()
|
||||
return fmt.Errorf("could not get error channel")
|
||||
}
|
||||
|
@ -85,28 +89,49 @@ L:
|
|||
pingTimer.Reset(pingInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Debug().Msg("events: stream cancelled")
|
||||
log.Ctx(ctx).Debug().Msg("events: stream cancelled")
|
||||
break L
|
||||
case err := <-errorStream:
|
||||
log.Err(err).Msg("events: received error in the tail channel")
|
||||
break L
|
||||
case <-pingTimer.C:
|
||||
// if time b/w messages takes longer, send a ping
|
||||
io.WriteString(w, ": ping\n\n")
|
||||
_, err = io.WriteString(w, ": ping\n\n")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send ping: %w", err)
|
||||
}
|
||||
w.Flush()
|
||||
case event := <-eventStream:
|
||||
io.WriteString(w, fmt.Sprintf("event: %s\n", event.Type))
|
||||
io.WriteString(w, "data: ")
|
||||
enc.Encode(event.Data)
|
||||
_, err = io.WriteString(w, fmt.Sprintf("event: %s\n", event.Type))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send event header: %w", err)
|
||||
}
|
||||
_, err = io.WriteString(w, "data: ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send data header: %w", err)
|
||||
}
|
||||
err = enc.Encode(event.Data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send data: %w", err)
|
||||
}
|
||||
// NOTE: enc.Encode is ending the data with a new line, only add one more
|
||||
// Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220
|
||||
io.WriteString(w, "\n")
|
||||
_, err = io.WriteString(w, "\n")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send end of message: %w", err)
|
||||
}
|
||||
|
||||
w.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
io.WriteString(w, "event: error\ndata: eof\n\n")
|
||||
_, err = io.WriteString(w, "event: error\ndata: eof\n\n")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send eof: %w", err)
|
||||
}
|
||||
w.Flush()
|
||||
log.Debug().Msg("events: stream closed")
|
||||
|
||||
log.Ctx(ctx).Debug().Msg("events: stream closed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -29,9 +29,9 @@ import (
|
|||
)
|
||||
|
||||
type ExportInput struct {
|
||||
AccountId string `json:"accountId"`
|
||||
OrgIdentifier string `json:"orgIdentifier"`
|
||||
ProjectIdentifier string `json:"projectIdentifier"`
|
||||
AccountID string `json:"account_id"`
|
||||
OrgIdentifier string `json:"org_identifier"`
|
||||
ProjectIdentifier string `json:"project_identifier"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ func (c *Controller) Export(ctx context.Context, session *auth.Session, spaceRef
|
|||
}
|
||||
|
||||
providerInfo := &exporter.HarnessCodeInfo{
|
||||
AccountId: in.AccountId,
|
||||
AccountID: in.AccountID,
|
||||
ProjectIdentifier: in.ProjectIdentifier,
|
||||
OrgIdentifier: in.OrgIdentifier,
|
||||
Token: in.Token,
|
||||
|
@ -68,7 +68,7 @@ func (c *Controller) Export(ctx context.Context, session *auth.Session, spaceRef
|
|||
if len(reposInPage) == 0 {
|
||||
break
|
||||
}
|
||||
page += 1
|
||||
page++
|
||||
repos = append(repos, reposInPage...)
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ func (c *Controller) Export(ctx context.Context, session *auth.Session, spaceRef
|
|||
}
|
||||
|
||||
func (c *Controller) sanitizeExportInput(in *ExportInput) error {
|
||||
if in.AccountId == "" {
|
||||
if in.AccountID == "" {
|
||||
return usererror.BadRequest("account id must be provided")
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/harness/gitness/internal/services/exporter"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
|
|
@ -39,7 +39,8 @@ func ProvideController(config *types.Config, db *sqlx.DB, urlProvider *url.Provi
|
|||
pipelineStore store.PipelineStore, secretStore store.SecretStore,
|
||||
connectorStore store.ConnectorStore, templateStore store.TemplateStore,
|
||||
spaceStore store.SpaceStore, repoStore store.RepoStore, principalStore store.PrincipalStore,
|
||||
repoCtrl *repo.Controller, membershipStore store.MembershipStore, importer *importer.Repository, exporter *exporter.Repository,
|
||||
repoCtrl *repo.Controller, membershipStore store.MembershipStore, importer *importer.Repository,
|
||||
exporter *exporter.Repository,
|
||||
) *Controller {
|
||||
return NewController(config, db, urlProvider, sseStreamer, uidCheck, authorizer,
|
||||
spacePathStore, pipelineStore, secretStore,
|
||||
|
|
|
@ -50,6 +50,10 @@ func (c *Controller) Update(
|
|||
return nil, fmt.Errorf("failed to authorize: %w", err)
|
||||
}
|
||||
|
||||
if err = c.sanitizeUpdateInput(in); err != nil {
|
||||
return nil, fmt.Errorf("failed to sanitize input: %w", err)
|
||||
}
|
||||
|
||||
template, err := c.templateStore.FindByUID(ctx, space.ID, uid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find template: %w", err)
|
||||
|
|
|
@ -44,7 +44,7 @@ func (c *Controller) UpdateAdmin(ctx context.Context, session *auth.Session,
|
|||
}
|
||||
|
||||
// Fail if the user being updated is the only admin in DB.
|
||||
if request.Admin == false && user.Admin == true {
|
||||
if !request.Admin && user.Admin {
|
||||
admUsrCount, err := c.principalStore.CountUsers(ctx, &types.UserFilter{Admin: true})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check admin user count: %w", err)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/harness/gitness/internal/api/request"
|
||||
)
|
||||
|
||||
//nolint:cyclop // this should move to controller
|
||||
func HandleFind(logCtrl *logs.Controller) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//nolint:cyclop
|
||||
package logs
|
||||
|
||||
import (
|
||||
|
@ -33,6 +34,9 @@ var (
|
|||
tailMaxTime = 1 * time.Hour
|
||||
)
|
||||
|
||||
// TODO: Move to controller and do error handling (see space events)
|
||||
//
|
||||
//nolint:gocognit,errcheck,cyclop // refactor if needed.
|
||||
func HandleTail(logCtrl *logs.Controller) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
|
|
@ -20,13 +20,13 @@ import (
|
|||
"github.com/harness/gitness/internal/api/controller/space"
|
||||
"github.com/harness/gitness/internal/api/render"
|
||||
"github.com/harness/gitness/internal/api/request"
|
||||
"github.com/harness/gitness/internal/writer"
|
||||
"github.com/harness/gitness/internal/io"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// HandleEvents returns an http.HandlerFunc that watches for
|
||||
// events on a space
|
||||
// events on a space.
|
||||
func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
@ -52,7 +52,7 @@ func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc {
|
|||
return
|
||||
}
|
||||
|
||||
writer := writer.NewWriterFlusher(w, f)
|
||||
writer := io.NewWriterFlusher(w, f)
|
||||
|
||||
err = spaceCtrl.Events(ctx, session, spaceRef, writer)
|
||||
if err != nil {
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/harness/gitness/internal/api/controller/space"
|
||||
|
||||
"github.com/harness/gitness/internal/api/render"
|
||||
"github.com/harness/gitness/internal/api/request"
|
||||
)
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/harness/gitness/internal/api/controller/space"
|
||||
|
||||
"github.com/harness/gitness/internal/api/render"
|
||||
"github.com/harness/gitness/internal/api/request"
|
||||
)
|
||||
|
|
|
@ -38,7 +38,7 @@ func Generate() *openapi3.Spec {
|
|||
WithTitle("API Specification").
|
||||
WithVersion(version.Version.String())
|
||||
reflector.Spec.Servers = []openapi3.Server{{
|
||||
URL: config.ApiURL,
|
||||
URL: config.APIURL,
|
||||
}}
|
||||
|
||||
//
|
||||
|
|
|
@ -57,10 +57,7 @@ func GetStepNumberFromPath(r *http.Request) (int64, error) {
|
|||
|
||||
func GetLatestFromPath(r *http.Request) bool {
|
||||
v, _ := QueryParam(r, QueryParamLatest)
|
||||
if v == "true" {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return v == "true"
|
||||
}
|
||||
|
||||
func GetTriggerUIDFromPath(r *http.Request) (string, error) {
|
||||
|
|
|
@ -44,7 +44,7 @@ func GetRepoIDFromQuery(r *http.Request) (int64, error) {
|
|||
|
||||
// ParseSortRepo extracts the repo sort parameter from the url.
|
||||
func ParseSortRepo(r *http.Request) enum.RepoAttr {
|
||||
return enum.ParseRepoAtrr(
|
||||
return enum.ParseRepoAttr(
|
||||
r.URL.Query().Get(QueryParamSort),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ const (
|
|||
PerPageMax = 100
|
||||
)
|
||||
|
||||
// GetCookie tries to retrive the cookie from the request or returns false if it doesn't exist.
|
||||
// GetCookie tries to retrieve the cookie from the request or returns false if it doesn't exist.
|
||||
func GetCookie(r *http.Request, cookieName string) (string, bool) {
|
||||
cookie, err := r.Cookie(cookieName)
|
||||
if errors.Is(err, http.ErrNoCookie) {
|
||||
|
|
|
@ -89,10 +89,7 @@ func (a *JWTAuthenticator) Authenticate(r *http.Request, sourceRouter SourceRout
|
|||
return nil, fmt.Errorf("failed to get metadata from token claims: %w", err)
|
||||
}
|
||||
case claims.Membership != nil:
|
||||
metadata, err = a.metadataFromMembershipClaims(claims.Membership)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get metadata from membership claims: %w", err)
|
||||
}
|
||||
metadata = a.metadataFromMembershipClaims(claims.Membership)
|
||||
default:
|
||||
return nil, fmt.Errorf("jwt is missing sub-claims")
|
||||
}
|
||||
|
@ -128,12 +125,12 @@ func (a *JWTAuthenticator) metadataFromTokenClaims(
|
|||
|
||||
func (a *JWTAuthenticator) metadataFromMembershipClaims(
|
||||
mbsClaims *jwt.SubClaimsMembership,
|
||||
) (auth.Metadata, error) {
|
||||
) auth.Metadata {
|
||||
// We could check if space exists - but also okay to fail later (saves db call)
|
||||
return &auth.MembershipMetadata{
|
||||
SpaceID: mbsClaims.SpaceID,
|
||||
Role: mbsClaims.Role,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func extractToken(r *http.Request, cookieName string) string {
|
||||
|
|
|
@ -26,6 +26,10 @@ var WireSet = wire.NewSet(
|
|||
ProvideAuthenticator,
|
||||
)
|
||||
|
||||
func ProvideAuthenticator(config *types.Config, principalStore store.PrincipalStore, tokenStore store.TokenStore) Authenticator {
|
||||
func ProvideAuthenticator(
|
||||
config *types.Config,
|
||||
principalStore store.PrincipalStore,
|
||||
tokenStore store.TokenStore,
|
||||
) Authenticator {
|
||||
return NewTokenAuthenticator(principalStore, tokenStore, config.Token.CookieName)
|
||||
}
|
||||
|
|
|
@ -78,6 +78,7 @@ func (a *MembershipAuthorizer) Check(
|
|||
|
||||
var spacePath string
|
||||
|
||||
//nolint:exhaustive // we want to fail on anything else
|
||||
switch resource.Type {
|
||||
case enum.ResourceTypeSpace:
|
||||
spacePath = paths.Concatinate(scope.SpacePath, resource.Name)
|
||||
|
@ -94,6 +95,12 @@ func (a *MembershipAuthorizer) Check(
|
|||
case enum.ResourceTypeSecret:
|
||||
spacePath = scope.SpacePath
|
||||
|
||||
case enum.ResourceTypeConnector:
|
||||
spacePath = scope.SpacePath
|
||||
|
||||
case enum.ResourceTypeTemplate:
|
||||
spacePath = scope.SpacePath
|
||||
|
||||
case enum.ResourceTypeUser:
|
||||
// a user is allowed to view / edit themselves
|
||||
if resource.Name == session.Principal.UID &&
|
||||
|
|
|
@ -15,5 +15,5 @@
|
|||
package config
|
||||
|
||||
const (
|
||||
ApiURL = "/api/v1"
|
||||
APIURL = "/api/v1"
|
||||
)
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package writer
|
||||
package io
|
||||
|
||||
import "io"
|
||||
|
||||
|
@ -25,6 +25,7 @@ type writeWithFlusher struct {
|
|||
flusher Flusher
|
||||
}
|
||||
|
||||
// nolint
|
||||
type WriterFlusher interface {
|
||||
io.Writer
|
||||
Flusher
|
|
@ -80,7 +80,13 @@ func GenerateForToken(token *types.Token, secret string) (string, error) {
|
|||
}
|
||||
|
||||
// GenerateWithMembership generates a jwt with the given ephemeral membership.
|
||||
func GenerateWithMembership(principalID int64, spaceID int64, role enum.MembershipRole, lifetime time.Duration, secret string) (string, error) {
|
||||
func GenerateWithMembership(
|
||||
principalID int64,
|
||||
spaceID int64,
|
||||
role enum.MembershipRole,
|
||||
lifetime time.Duration,
|
||||
secret string,
|
||||
) (string, error) {
|
||||
issuedAt := time.Now()
|
||||
expiresAt := issuedAt.Add(lifetime)
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ func Segments(path string) []string {
|
|||
}
|
||||
|
||||
// IsAncesterOf returns true iff 'path' is an ancestor of 'other' or they are the same.
|
||||
// e.g. other = path(/.*)
|
||||
// e.g. other = path(/.*).
|
||||
func IsAncesterOf(path string, other string) bool {
|
||||
path = strings.Trim(path, types.PathSeparator)
|
||||
other = strings.Trim(other, types.PathSeparator)
|
||||
|
|
|
@ -63,6 +63,7 @@ func New(
|
|||
}
|
||||
}
|
||||
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (s *service) Cancel(ctx context.Context, repo *types.Repository, execution *types.Execution) error {
|
||||
log := log.With().
|
||||
Int64("execution.id", execution.ID).
|
||||
|
@ -73,9 +74,8 @@ func (s *service) Cancel(ctx context.Context, repo *types.Repository, execution
|
|||
// do not cancel the build if the build status is
|
||||
// complete. only cancel the build if the status is
|
||||
// running or pending.
|
||||
switch execution.Status {
|
||||
case enum.CIStatusPending, enum.CIStatusRunning:
|
||||
default:
|
||||
if execution.Status != enum.CIStatusPending &&
|
||||
execution.Status != enum.CIStatusRunning {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ type service struct {
|
|||
gitRPCClient gitrpc.Interface
|
||||
}
|
||||
|
||||
func new(gitRPCClient gitrpc.Interface) CommitService {
|
||||
func newService(gitRPCClient gitrpc.Interface) Service {
|
||||
return &service{gitRPCClient: gitRPCClient}
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func (f *service) FindRef(
|
|||
return controller.MapCommit(branchOutput.Branch.Commit)
|
||||
}
|
||||
|
||||
// FindCommit finds information about a commit in gitness for the git SHA
|
||||
// FindCommit finds information about a commit in gitness for the git SHA.
|
||||
func (f *service) FindCommit(
|
||||
ctx context.Context,
|
||||
repo *types.Repository,
|
||||
|
|
|
@ -21,10 +21,10 @@ import (
|
|||
)
|
||||
|
||||
type (
|
||||
// CommitService provides access to commit information via
|
||||
// Service provides access to commit information via
|
||||
// the SCM provider. Today, this is gitness but it can
|
||||
// be extendible to any SCM provider.
|
||||
CommitService interface {
|
||||
Service interface {
|
||||
// ref is the ref to fetch the commit from, eg refs/heads/master
|
||||
FindRef(ctx context.Context, repo *types.Repository, ref string) (*types.Commit, error)
|
||||
|
||||
|
|
|
@ -22,11 +22,11 @@ import (
|
|||
|
||||
// WireSet provides a wire set for this package.
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideCommitService,
|
||||
ProvideService,
|
||||
)
|
||||
|
||||
// ProvideCommitService provides a service which can fetch commit
|
||||
// ProvideService provides a service which can fetch commit
|
||||
// information about a repository.
|
||||
func ProvideCommitService(gitRPCClient gitrpc.Interface) CommitService {
|
||||
return new(gitRPCClient)
|
||||
func ProvideService(gitRPCClient gitrpc.Interface) Service {
|
||||
return newService(gitRPCClient)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ type service struct {
|
|||
gitRPCClient gitrpc.Interface
|
||||
}
|
||||
|
||||
func new(gitRPCClient gitrpc.Interface) FileService {
|
||||
func newService(gitRPCClient gitrpc.Interface) Service {
|
||||
return &service{gitRPCClient: gitRPCClient}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,13 +27,13 @@ type (
|
|||
Data []byte
|
||||
}
|
||||
|
||||
// FileService provides access to contents of files in
|
||||
// Service provides access to contents of files in
|
||||
// the SCM provider. Today, this is gitness but it should
|
||||
// be extendible to any SCM provider.
|
||||
// The plan is for all remote repos to be pointers inside gitness
|
||||
// so a repo entry would always exist. If this changes, the interface
|
||||
// can be updated.
|
||||
FileService interface {
|
||||
Service interface {
|
||||
// path is the path in the repo to read
|
||||
// ref is the git ref for the repository e.g. refs/heads/master
|
||||
Get(ctx context.Context, repo *types.Repository, path, ref string) (*File, error)
|
||||
|
|
|
@ -22,11 +22,11 @@ import (
|
|||
|
||||
// WireSet provides a wire set for this package.
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideFileService,
|
||||
ProvideService,
|
||||
)
|
||||
|
||||
// ProvideFileService provides a service which can read file contents
|
||||
// ProvideService provides a service which can read file contents
|
||||
// from a repository.
|
||||
func ProvideFileService(gitRPCClient gitrpc.Interface) FileService {
|
||||
return new(gitRPCClient)
|
||||
func ProvideService(gitRPCClient gitrpc.Interface) Service {
|
||||
return newService(gitRPCClient)
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ type embedded struct {
|
|||
|
||||
var _ client.Client = (*embedded)(nil)
|
||||
|
||||
func NewEmbeddedClient(manager ExecutionManager, config *types.Config) *embedded {
|
||||
func NewEmbeddedClient(manager ExecutionManager, config *types.Config) client.Client {
|
||||
return &embedded{
|
||||
config: config,
|
||||
manager: manager,
|
||||
|
|
|
@ -223,7 +223,7 @@ func ConvertToDroneRepo(repo *types.Repository) *drone.Repo {
|
|||
Branch: repo.DefaultBranch,
|
||||
// TODO: We can get this from configuration once we start populating it.
|
||||
// If this is not set drone runner cancels the build.
|
||||
Timeout: int64(time.Duration(10 * time.Hour).Seconds()),
|
||||
Timeout: int64((10 * time.Hour).Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -127,7 +127,7 @@ type (
|
|||
type Manager struct {
|
||||
Executions store.ExecutionStore
|
||||
Config *types.Config
|
||||
FileService file.FileService
|
||||
FileService file.Service
|
||||
Pipelines store.PipelineStore
|
||||
urlProvider *urlprovider.Provider
|
||||
Checks store.CheckStore
|
||||
|
@ -154,7 +154,7 @@ func New(
|
|||
pipelineStore store.PipelineStore,
|
||||
urlProvider *urlprovider.Provider,
|
||||
sseStreamer sse.Streamer,
|
||||
fileService file.FileService,
|
||||
fileService file.Service,
|
||||
logStore store.LogStore,
|
||||
logStream livelog.LogStream,
|
||||
checkStore store.CheckStore,
|
||||
|
@ -238,11 +238,12 @@ func (m *Manager) Accept(ctx context.Context, id int64, machine string) (*types.
|
|||
stage.Machine = machine
|
||||
stage.Status = enum.CIStatusPending
|
||||
err = m.Stages.Update(noContext, stage)
|
||||
if errors.Is(err, gitness_store.ErrVersionConflict) {
|
||||
switch {
|
||||
case errors.Is(err, gitness_store.ErrVersionConflict):
|
||||
log.Debug().Err(err).Msg("manager: stage processed by another agent")
|
||||
} else if err != nil {
|
||||
case err != nil:
|
||||
log.Debug().Err(err).Msg("manager: cannot update stage")
|
||||
} else {
|
||||
default:
|
||||
log.Info().Msg("manager: stage accepted")
|
||||
}
|
||||
return stage, err
|
||||
|
@ -353,13 +354,13 @@ func (m *Manager) createNetrc(repo *types.Repository) (*Netrc, error) {
|
|||
return nil, fmt.Errorf("failed to create jwt: %w", err)
|
||||
}
|
||||
|
||||
cloneUrl, err := url.Parse(repo.GitURL)
|
||||
cloneURL, err := url.Parse(repo.GitURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse clone url '%s': %w", cloneUrl, err)
|
||||
return nil, fmt.Errorf("failed to parse clone url '%s': %w", cloneURL, err)
|
||||
}
|
||||
|
||||
return &Netrc{
|
||||
Machine: cloneUrl.Hostname(),
|
||||
Machine: cloneURL.Hostname(),
|
||||
Login: pipelinePrincipal.UID,
|
||||
Password: jwt,
|
||||
}, nil
|
||||
|
|
|
@ -125,7 +125,7 @@ func (s *setup) updateExecution(ctx context.Context, execution *types.Execution)
|
|||
}
|
||||
execution.Started = time.Now().UnixMilli()
|
||||
execution.Status = enum.CIStatusRunning
|
||||
err := s.Executions.Update(noContext, execution)
|
||||
err := s.Executions.Update(ctx, execution)
|
||||
if errors.Is(err, gitness_store.ErrVersionConflict) {
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package manager
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -44,6 +45,7 @@ type teardown struct {
|
|||
Stages store.StageStore
|
||||
}
|
||||
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
|
||||
log := log.With().
|
||||
Int64("stage.id", stage.ID).
|
||||
|
@ -98,7 +100,10 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
|
|||
}
|
||||
|
||||
for _, step := range stage.Steps {
|
||||
t.Logs.Delete(noContext, step.ID)
|
||||
err = t.Logs.Delete(noContext, step.ID)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msgf("failed to delete log stream for step %d", step.ID)
|
||||
}
|
||||
}
|
||||
|
||||
stages, err := t.Stages.ListWithSteps(noContext, execution.ID)
|
||||
|
@ -115,7 +120,7 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = t.scheduleDownstream(ctx, stage, stages)
|
||||
err = t.scheduleDownstream(ctx, stages)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("manager: cannot schedule downstream builds")
|
||||
|
@ -151,7 +156,7 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
|
|||
}
|
||||
|
||||
err = t.Executions.Update(noContext, execution)
|
||||
if err == gitness_store.ErrVersionConflict {
|
||||
if errors.Is(err, gitness_store.ErrVersionConflict) {
|
||||
log.Warn().Err(err).
|
||||
Msg("manager: execution updated by another goroutine")
|
||||
return nil
|
||||
|
@ -186,6 +191,8 @@ func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
|
|||
// cancelDownstream is a helper function that tests for
|
||||
// downstream stages and cancels them based on the overall
|
||||
// pipeline state.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed
|
||||
func (t *teardown) cancelDownstream(
|
||||
ctx context.Context,
|
||||
stages []*types.Stage,
|
||||
|
@ -233,8 +240,11 @@ func (t *teardown) cancelDownstream(
|
|||
s.Started = time.Now().UnixMilli()
|
||||
s.Stopped = time.Now().UnixMilli()
|
||||
err := t.Stages.Update(noContext, s)
|
||||
if err == gitness_store.ErrVersionConflict {
|
||||
t.resync(ctx, s)
|
||||
if errors.Is(err, gitness_store.ErrVersionConflict) {
|
||||
rErr := t.resync(ctx, s)
|
||||
if rErr != nil {
|
||||
log.Warn().Err(rErr).Msg("failed to resync after version conflict")
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -248,12 +258,11 @@ func (t *teardown) cancelDownstream(
|
|||
|
||||
func isexecutionComplete(stages []*types.Stage) bool {
|
||||
for _, stage := range stages {
|
||||
switch stage.Status {
|
||||
case enum.CIStatusPending,
|
||||
enum.CIStatusRunning,
|
||||
enum.CIStatusWaitingOnDeps,
|
||||
enum.CIStatusDeclined,
|
||||
enum.CIStatusBlocked:
|
||||
if stage.Status == enum.CIStatusPending ||
|
||||
stage.Status == enum.CIStatusRunning ||
|
||||
stage.Status == enum.CIStatusWaitingOnDeps ||
|
||||
stage.Status == enum.CIStatusDeclined ||
|
||||
stage.Status == enum.CIStatusBlocked {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -281,55 +290,58 @@ func areDepsComplete(stage *types.Stage, stages []*types.Stage) bool {
|
|||
// and execution requirements are met.
|
||||
func (t *teardown) scheduleDownstream(
|
||||
ctx context.Context,
|
||||
stage *types.Stage,
|
||||
stages []*types.Stage,
|
||||
) error {
|
||||
|
||||
var errs error
|
||||
for _, sibling := range stages {
|
||||
if sibling.Status == enum.CIStatusWaitingOnDeps {
|
||||
if len(sibling.DependsOn) == 0 {
|
||||
continue
|
||||
}
|
||||
if sibling.Status != enum.CIStatusWaitingOnDeps {
|
||||
continue
|
||||
}
|
||||
|
||||
// PROBLEM: isDep only checks the direct parent
|
||||
// i think ....
|
||||
// if isDep(stage, sibling) == false {
|
||||
// continue
|
||||
// }
|
||||
if !areDepsComplete(sibling, stages) {
|
||||
continue
|
||||
}
|
||||
// if isLastDep(stage, sibling, stages) == false {
|
||||
// continue
|
||||
// }
|
||||
if len(sibling.DependsOn) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
log := log.With().
|
||||
Int64("stage.id", sibling.ID).
|
||||
Str("stage.name", sibling.Name).
|
||||
Str("stage.depends_on", strings.Join(sibling.DependsOn, ",")).
|
||||
Logger()
|
||||
// PROBLEM: isDep only checks the direct parent
|
||||
// i think ....
|
||||
// if isDep(stage, sibling) == false {
|
||||
// continue
|
||||
// }
|
||||
if !areDepsComplete(sibling, stages) {
|
||||
continue
|
||||
}
|
||||
// if isLastDep(stage, sibling, stages) == false {
|
||||
// continue
|
||||
// }
|
||||
|
||||
log.Debug().Msg("manager: schedule next stage")
|
||||
log := log.With().
|
||||
Int64("stage.id", sibling.ID).
|
||||
Str("stage.name", sibling.Name).
|
||||
Str("stage.depends_on", strings.Join(sibling.DependsOn, ",")).
|
||||
Logger()
|
||||
|
||||
sibling.Status = enum.CIStatusPending
|
||||
err := t.Stages.Update(noContext, sibling)
|
||||
if err == gitness_store.ErrVersionConflict {
|
||||
t.resync(ctx, sibling)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("manager: cannot update stage status")
|
||||
errs = multierror.Append(errs, err)
|
||||
}
|
||||
log.Debug().Msg("manager: schedule next stage")
|
||||
|
||||
err = t.Scheduler.Schedule(noContext, sibling)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("manager: cannot schedule stage")
|
||||
errs = multierror.Append(errs, err)
|
||||
sibling.Status = enum.CIStatusPending
|
||||
err := t.Stages.Update(noContext, sibling)
|
||||
if errors.Is(err, gitness_store.ErrVersionConflict) {
|
||||
rErr := t.resync(ctx, sibling)
|
||||
if rErr != nil {
|
||||
log.Warn().Err(rErr).Msg("failed to resync after version conflict")
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("manager: cannot update stage status")
|
||||
errs = multierror.Append(errs, err)
|
||||
}
|
||||
|
||||
err = t.Scheduler.Schedule(noContext, sibling)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("manager: cannot schedule stage")
|
||||
errs = multierror.Append(errs, err)
|
||||
}
|
||||
}
|
||||
return errs
|
||||
|
|
|
@ -34,7 +34,7 @@ type updater struct {
|
|||
}
|
||||
|
||||
func (u *updater) do(ctx context.Context, step *types.Step) error {
|
||||
log := log.With().
|
||||
log := log.Ctx(ctx).With().
|
||||
Str("step.name", step.Name).
|
||||
Str("step.status", string(step.Status)).
|
||||
Int64("step.id", step.ID).
|
||||
|
|
|
@ -40,7 +40,7 @@ func ProvideExecutionManager(
|
|||
pipelineStore store.PipelineStore,
|
||||
urlProvider *url.Provider,
|
||||
sseStreamer sse.Streamer,
|
||||
fileService file.FileService,
|
||||
fileService file.Service,
|
||||
logStore store.LogStore,
|
||||
logStream livelog.LogStream,
|
||||
checkStore store.CheckStore,
|
||||
|
@ -55,7 +55,7 @@ func ProvideExecutionManager(
|
|||
}
|
||||
|
||||
// ProvideExecutionClient provides a client implementation to interact with the execution manager.
|
||||
// We use an embedded client here
|
||||
// We use an embedded client here.
|
||||
func ProvideExecutionClient(manager ExecutionManager, config *types.Config) client.Client {
|
||||
return NewEmbeddedClient(manager, config)
|
||||
}
|
||||
|
|
|
@ -35,23 +35,23 @@ import (
|
|||
// Lookup returns a resource by name, kind and type.
|
||||
type LookupFunc func(name, kind, typ, version string) (*v1yaml.Config, error)
|
||||
|
||||
type PluginManager struct {
|
||||
type Manager struct {
|
||||
config *types.Config
|
||||
pluginStore store.PluginStore
|
||||
}
|
||||
|
||||
func NewPluginManager(
|
||||
func NewManager(
|
||||
config *types.Config,
|
||||
pluginStore store.PluginStore,
|
||||
) *PluginManager {
|
||||
return &PluginManager{
|
||||
) *Manager {
|
||||
return &Manager{
|
||||
config: config,
|
||||
pluginStore: pluginStore,
|
||||
}
|
||||
}
|
||||
|
||||
// GetLookupFn returns a lookup function for plugins which can be used in the resolver.
|
||||
func (m *PluginManager) GetLookupFn() LookupFunc {
|
||||
func (m *Manager) GetLookupFn() LookupFunc {
|
||||
return func(name, kind, typ, version string) (*v1yaml.Config, error) {
|
||||
if kind != "plugin" {
|
||||
return nil, fmt.Errorf("only plugin kind supported")
|
||||
|
@ -75,28 +75,28 @@ func (m *PluginManager) GetLookupFn() LookupFunc {
|
|||
|
||||
// Populate fetches plugins information from an external source or a local zip
|
||||
// and populates in the DB.
|
||||
func (m *PluginManager) Populate(ctx context.Context) error {
|
||||
path := m.config.CI.PluginsZipPath
|
||||
if path == "" {
|
||||
return fmt.Errorf("plugins path not provided to read schemas from")
|
||||
func (m *Manager) Populate(ctx context.Context) error {
|
||||
pluginsURL := m.config.CI.PluginsZipURL
|
||||
if pluginsURL == "" {
|
||||
return fmt.Errorf("plugins url not provided to read schemas from")
|
||||
}
|
||||
|
||||
var zipFile *zip.ReadCloser
|
||||
if _, err := os.Stat(path); err != nil { // local path doesn't exist - must be a remote link
|
||||
if _, err := os.Stat(pluginsURL); err != nil { // local path doesn't exist - must be a remote link
|
||||
// Download zip file locally
|
||||
f, err := os.CreateTemp(os.TempDir(), "plugins.zip")
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create temp file: %w", err)
|
||||
}
|
||||
defer os.Remove(f.Name())
|
||||
err = downloadZip(path, f.Name())
|
||||
err = downloadZip(ctx, pluginsURL, f.Name())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not download remote zip: %w", err)
|
||||
}
|
||||
path = f.Name()
|
||||
pluginsURL = f.Name()
|
||||
}
|
||||
// open up a zip reader for the file
|
||||
zipFile, err := zip.OpenReader(path)
|
||||
zipFile, err := zip.OpenReader(pluginsURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open zip for reading: %w", err)
|
||||
}
|
||||
|
@ -113,12 +113,22 @@ func (m *PluginManager) Populate(ctx context.Context) error {
|
|||
|
||||
// downloadZip is a helper function that downloads a zip from a URL and
|
||||
// writes it to a path in the local filesystem.
|
||||
func downloadZip(url, path string) error {
|
||||
response, err := http.Get(url)
|
||||
//
|
||||
//nolint:gosec // URL is coming from environment variable (user configured it)
|
||||
func downloadZip(ctx context.Context, pluginURL, path string) error {
|
||||
request, err := http.NewRequestWithContext(ctx, http.MethodGet, pluginURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
response, err := http.DefaultClient.Do(request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get zip from url: %w", err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
// ensure the body is closed after we read (independent of status code or error)
|
||||
if response != nil && response.Body != nil {
|
||||
// Use function to satisfy the linter which complains about unhandled errors otherwise
|
||||
defer func() { _ = response.Body.Close() }()
|
||||
}
|
||||
|
||||
// Create the file on the local FS. If it exists, it will be truncated.
|
||||
output, err := os.Create(path)
|
||||
|
@ -138,7 +148,9 @@ func downloadZip(url, path string) error {
|
|||
|
||||
// traverseAndUpsertPlugins traverses through the zip and upserts plugins into the database
|
||||
// if they are not present.
|
||||
func (m *PluginManager) traverseAndUpsertPlugins(ctx context.Context, rc *zip.ReadCloser) error {
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (m *Manager) traverseAndUpsertPlugins(ctx context.Context, rc *zip.ReadCloser) error {
|
||||
plugins, err := m.pluginStore.ListAll(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not list plugins: %w", err)
|
||||
|
@ -164,7 +176,7 @@ func (m *PluginManager) traverseAndUpsertPlugins(ctx context.Context, rc *zip.Re
|
|||
}
|
||||
defer fc.Close()
|
||||
var buf bytes.Buffer
|
||||
_, err = io.Copy(&buf, fc)
|
||||
_, err = io.Copy(&buf, fc) //nolint:gosec // plugin source is configured via environment variables by user
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("name", file.Name).Msg("could not read file contents")
|
||||
continue
|
||||
|
@ -212,7 +224,6 @@ func (m *PluginManager) traverseAndUpsertPlugins(ctx context.Context, rc *zip.Re
|
|||
if p.Matches(plugin) {
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// If plugin name exists with a different spec, call update - otherwise call create.
|
||||
|
|
|
@ -30,6 +30,6 @@ var WireSet = wire.NewSet(
|
|||
func ProvidePluginManager(
|
||||
config *types.Config,
|
||||
pluginStore store.PluginStore,
|
||||
) *PluginManager {
|
||||
return NewPluginManager(config, pluginStore)
|
||||
) *Manager {
|
||||
return NewManager(config, pluginStore)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ import (
|
|||
func NewExecutionRunner(
|
||||
config *types.Config,
|
||||
client runnerclient.Client,
|
||||
pluginManager *plugin.PluginManager,
|
||||
pluginManager *plugin.Manager,
|
||||
m manager.ExecutionManager,
|
||||
) (*runtime2.Runner, error) {
|
||||
// For linux, containers need to have extra hosts set in order to interact with
|
||||
|
|
|
@ -35,7 +35,7 @@ var WireSet = wire.NewSet(
|
|||
func ProvideExecutionRunner(
|
||||
config *types.Config,
|
||||
client runnerclient.Client,
|
||||
pluginManager *plugin.PluginManager,
|
||||
pluginManager *plugin.Manager,
|
||||
manager manager.ExecutionManager,
|
||||
) (*runtime2.Runner, error) {
|
||||
return NewExecutionRunner(config, client, pluginManager, manager)
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"github.com/harness/gitness/lock"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type queue struct {
|
||||
|
@ -32,7 +34,6 @@ type queue struct {
|
|||
ready chan struct{}
|
||||
paused bool
|
||||
interval time.Duration
|
||||
throttle int
|
||||
store store.StageStore
|
||||
workers map[*worker]struct{}
|
||||
ctx context.Context
|
||||
|
@ -53,7 +54,12 @@ func newQueue(store store.StageStore, lock lock.MutexManager) (*queue, error) {
|
|||
interval: time.Minute,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
go q.start()
|
||||
go func() {
|
||||
if err := q.start(); err != nil {
|
||||
log.Err(err).Msg("queue start failed")
|
||||
}
|
||||
}()
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
|
@ -103,11 +109,16 @@ func (q *queue) Request(ctx context.Context, params Filter) (*types.Stage, error
|
|||
}
|
||||
}
|
||||
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (q *queue) signal(ctx context.Context) error {
|
||||
if err := q.globMx.Lock(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer q.globMx.Unlock(ctx)
|
||||
defer func() {
|
||||
if err := q.globMx.Unlock(ctx); err != nil {
|
||||
log.Ctx(ctx).Err(err).Msg("failed to release global lock after signaling")
|
||||
}
|
||||
}()
|
||||
|
||||
q.Lock()
|
||||
count := len(q.workers)
|
||||
|
@ -137,14 +148,14 @@ func (q *queue) signal(ctx context.Context) error {
|
|||
// if the stage defines concurrency limits we
|
||||
// need to make sure those limits are not exceeded
|
||||
// before proceeding.
|
||||
if withinLimits(item, items) == false {
|
||||
if !withinLimits(item, items) {
|
||||
continue
|
||||
}
|
||||
|
||||
// if the system defines concurrency limits
|
||||
// per repository we need to make sure those limits
|
||||
// are not exceeded before proceeding.
|
||||
if shouldThrottle(item, items, item.LimitRepo) == true {
|
||||
if shouldThrottle(item, items, item.LimitRepo) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -182,11 +193,9 @@ func (q *queue) signal(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case w.channel <- item:
|
||||
delete(q.workers, w)
|
||||
break loop
|
||||
}
|
||||
w.channel <- item
|
||||
delete(q.workers, w)
|
||||
break loop
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -198,9 +207,15 @@ func (q *queue) start() error {
|
|||
case <-q.ctx.Done():
|
||||
return q.ctx.Err()
|
||||
case <-q.ready:
|
||||
q.signal(q.ctx)
|
||||
if err := q.signal(q.ctx); err != nil {
|
||||
// don't return, only log error
|
||||
log.Ctx(q.ctx).Err(err).Msg("failed to signal on ready")
|
||||
}
|
||||
case <-time.After(q.interval):
|
||||
q.signal(q.ctx)
|
||||
if err := q.signal(q.ctx); err != nil {
|
||||
// don't return, only log error
|
||||
log.Ctx(q.ctx).Err(err).Msg("failed to signal on interval")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -216,10 +231,6 @@ type worker struct {
|
|||
channel chan *types.Stage
|
||||
}
|
||||
|
||||
type counter struct {
|
||||
counts map[string]int
|
||||
}
|
||||
|
||||
func checkLabels(a, b map[string]string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
|
@ -287,7 +298,7 @@ func shouldThrottle(stage *types.Stage, siblings []*types.Stage, limit int) bool
|
|||
return count >= limit
|
||||
}
|
||||
|
||||
// matchResource is a helper function that returns
|
||||
// matchResource is a helper function that returns.
|
||||
func matchResource(kinda, typea, kindb, typeb string) bool {
|
||||
if kinda == "" {
|
||||
kinda = "pipeline"
|
||||
|
|
|
@ -56,7 +56,7 @@ type scheduler struct {
|
|||
*canceler
|
||||
}
|
||||
|
||||
// newScheduler provides an instance of a scheduler with cancel abilities
|
||||
// newScheduler provides an instance of a scheduler with cancel abilities.
|
||||
func newScheduler(stageStore store.StageStore, lock lock.MutexManager) (Scheduler, error) {
|
||||
q, err := newQueue(stageStore, lock)
|
||||
if err != nil {
|
||||
|
|
|
@ -140,7 +140,6 @@ func (d *Dag) detectCycles(name string, visited, recStack map[string]bool) bool
|
|||
} else if recStack[v] {
|
||||
return true
|
||||
}
|
||||
|
||||
}
|
||||
recStack[name] = false
|
||||
return false
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
package triggerer
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/drone/drone-yaml/yaml"
|
||||
)
|
||||
|
||||
|
@ -36,14 +34,6 @@ func skipAction(document *yaml.Pipeline, action string) bool {
|
|||
return !document.Trigger.Action.Match(action)
|
||||
}
|
||||
|
||||
func skipInstance(document *yaml.Pipeline, instance string) bool {
|
||||
return !document.Trigger.Instance.Match(instance)
|
||||
}
|
||||
|
||||
func skipTarget(document *yaml.Pipeline, env string) bool {
|
||||
return !document.Trigger.Target.Match(env)
|
||||
}
|
||||
|
||||
func skipRepo(document *yaml.Pipeline, repo string) bool {
|
||||
return !document.Trigger.Repo.Match(repo)
|
||||
}
|
||||
|
@ -51,15 +41,3 @@ func skipRepo(document *yaml.Pipeline, repo string) bool {
|
|||
func skipCron(document *yaml.Pipeline, cron string) bool {
|
||||
return !document.Trigger.Cron.Match(cron)
|
||||
}
|
||||
|
||||
func skipMessageEval(str string) bool {
|
||||
lower := strings.ToLower(str)
|
||||
switch {
|
||||
case strings.Contains(lower, "[ci skip]"),
|
||||
strings.Contains(lower, "[skip ci]"),
|
||||
strings.Contains(lower, "***no_ci***"):
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ type triggerer struct {
|
|||
stageStore store.StageStore
|
||||
db *sqlx.DB
|
||||
pipelineStore store.PipelineStore
|
||||
fileService file.FileService
|
||||
fileService file.Service
|
||||
scheduler scheduler.Scheduler
|
||||
repoStore store.RepoStore
|
||||
}
|
||||
|
@ -95,8 +95,8 @@ func New(
|
|||
db *sqlx.DB,
|
||||
repoStore store.RepoStore,
|
||||
scheduler scheduler.Scheduler,
|
||||
fileService file.FileService,
|
||||
) *triggerer {
|
||||
fileService file.Service,
|
||||
) Triggerer {
|
||||
return &triggerer{
|
||||
executionStore: executionStore,
|
||||
checkStore: checkStore,
|
||||
|
@ -109,6 +109,7 @@ func New(
|
|||
}
|
||||
}
|
||||
|
||||
//nolint:gocognit,gocyclo,cyclop //TODO: Refactor @Vistaar
|
||||
func (t *triggerer) Trigger(
|
||||
ctx context.Context,
|
||||
pipeline *types.Pipeline,
|
||||
|
@ -180,6 +181,7 @@ func (t *triggerer) Trigger(
|
|||
// and creating stages accordingly. For V1 YAML - for now we can just parse the stages
|
||||
// and create them sequentially.
|
||||
stages := []*types.Stage{}
|
||||
//nolint:nestif // refactor if needed
|
||||
if !isV1Yaml(file.Data) {
|
||||
manifest, err := yaml.ParseString(string(file.Data))
|
||||
if err != nil {
|
||||
|
@ -208,22 +210,23 @@ func (t *triggerer) Trigger(
|
|||
if name == "" {
|
||||
name = "default"
|
||||
}
|
||||
node := dag.Add(pipeline.Name, pipeline.DependsOn...)
|
||||
node := dag.Add(name, pipeline.DependsOn...)
|
||||
node.Skip = true
|
||||
|
||||
if skipBranch(pipeline, base.Target) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match branch")
|
||||
} else if skipEvent(pipeline, event) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match event")
|
||||
} else if skipAction(pipeline, string(base.Action)) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match action")
|
||||
} else if skipRef(pipeline, base.Ref) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match ref")
|
||||
} else if skipRepo(pipeline, repo.Path) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match repo")
|
||||
} else if skipCron(pipeline, base.Cron) {
|
||||
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match cron job")
|
||||
} else {
|
||||
switch {
|
||||
case skipBranch(pipeline, base.Target):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match branch")
|
||||
case skipEvent(pipeline, event):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match event")
|
||||
case skipAction(pipeline, string(base.Action)):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match action")
|
||||
case skipRef(pipeline, base.Ref):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match ref")
|
||||
case skipRepo(pipeline, repo.Path):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match repo")
|
||||
case skipCron(pipeline, base.Cron):
|
||||
log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match cron job")
|
||||
default:
|
||||
matched = append(matched, pipeline)
|
||||
node.Skip = false
|
||||
}
|
||||
|
@ -235,6 +238,7 @@ func (t *triggerer) Trigger(
|
|||
|
||||
if len(matched) == 0 {
|
||||
log.Info().Msg("trigger: skipping execution, no matching pipelines")
|
||||
//nolint:nilnil // on purpose
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
@ -245,8 +249,6 @@ func (t *triggerer) Trigger(
|
|||
onFailure = false
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
stage := &types.Stage{
|
||||
RepoID: repo.ID,
|
||||
Number: int64(i + 1),
|
||||
|
@ -354,6 +356,8 @@ func trunc(s string, i int) string {
|
|||
// if we are unable to do so or the yaml contains something unexpected.
|
||||
// Currently, all the stages will be executed one after the other on completion.
|
||||
// Once we have depends on in v1, this will be changed to use the DAG.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func parseV1Stages(data []byte, repo *types.Repository, execution *types.Execution) ([]*types.Stage, error) {
|
||||
stages := []*types.Stage{}
|
||||
// For V1 YAML, just go through the YAML and create stages serially for now
|
||||
|
@ -425,7 +429,6 @@ func parseV1Stages(data []byte, repo *types.Repository, execution *types.Executi
|
|||
default:
|
||||
return nil, fmt.Errorf("only CI stage supported in v1 at the moment")
|
||||
}
|
||||
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown yaml: %w", err)
|
||||
|
@ -433,7 +436,7 @@ func parseV1Stages(data []byte, repo *types.Repository, execution *types.Executi
|
|||
return stages, nil
|
||||
}
|
||||
|
||||
// Checks whether YAML is V1 Yaml or drone Yaml
|
||||
// Checks whether YAML is V1 Yaml or drone Yaml.
|
||||
func isV1Yaml(data []byte) bool {
|
||||
// if we are dealing with the legacy drone yaml, use
|
||||
// the legacy drone engine.
|
||||
|
|
|
@ -35,7 +35,7 @@ func ProvideTriggerer(
|
|||
stageStore store.StageStore,
|
||||
db *sqlx.DB,
|
||||
pipelineStore store.PipelineStore,
|
||||
fileService file.FileService,
|
||||
fileService file.Service,
|
||||
scheduler scheduler.Scheduler,
|
||||
repoStore store.RepoStore,
|
||||
) Triggerer {
|
||||
|
|
|
@ -12,9 +12,6 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package request provides http handlers for serving the
|
||||
// web applications and API endpoints.
|
||||
|
||||
package request
|
||||
|
||||
import (
|
||||
|
|
|
@ -12,9 +12,6 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package router provides http handlers for serving the
|
||||
// web applicationa and API endpoints.
|
||||
|
||||
package router
|
||||
|
||||
import (
|
||||
|
|
|
@ -67,7 +67,7 @@ func ProvideRouter(
|
|||
// only use host name to identify git traffic if it differs from api hostname.
|
||||
// TODO: Can we make this even more flexible - aka use the full base urls to route traffic?
|
||||
gitRoutingHost := ""
|
||||
if strings.ToLower(gitHostname) != strings.ToLower(apiHostname) {
|
||||
if !strings.EqualFold(gitHostname, apiHostname) {
|
||||
gitRoutingHost = gitHostname
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,9 @@ import (
|
|||
const (
|
||||
pathCreateRepo = "/v1/accounts/%s/orgs/%s/projects/%s/repos"
|
||||
pathDeleteRepo = "/v1/accounts/%s/orgs/%s/projects/%s/repos/%s"
|
||||
headerApiKey = "X-Api-Key"
|
||||
routingId = "routingId"
|
||||
//nolint:gosec // wrong flagging
|
||||
headerAPIKey = "X-Api-Key"
|
||||
routingID = "routingId"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -52,25 +53,25 @@ type client struct {
|
|||
baseURL string
|
||||
httpClient http.Client
|
||||
|
||||
accountId string
|
||||
orgId string
|
||||
projectId string
|
||||
accountID string
|
||||
orgID string
|
||||
projectID string
|
||||
|
||||
token string
|
||||
}
|
||||
|
||||
// newClient creates a new harness Client for interacting with the platforms APIs.
|
||||
func newClient(baseURL string, accountID string, orgId string, projectId string, token string) (*client, error) {
|
||||
func newClient(baseURL string, accountID string, orgID string, projectID string, token string) (*client, error) {
|
||||
if baseURL == "" {
|
||||
return nil, fmt.Errorf("baseUrl required")
|
||||
}
|
||||
if accountID == "" {
|
||||
return nil, fmt.Errorf("accountID required")
|
||||
}
|
||||
if orgId == "" {
|
||||
if orgID == "" {
|
||||
return nil, fmt.Errorf("orgId required")
|
||||
}
|
||||
if projectId == "" {
|
||||
if projectID == "" {
|
||||
return nil, fmt.Errorf("projectId required")
|
||||
}
|
||||
if token == "" {
|
||||
|
@ -79,22 +80,29 @@ func newClient(baseURL string, accountID string, orgId string, projectId string,
|
|||
|
||||
return &client{
|
||||
baseURL: baseURL,
|
||||
accountId: accountID,
|
||||
orgId: orgId,
|
||||
projectId: projectId,
|
||||
accountID: accountID,
|
||||
orgID: orgID,
|
||||
projectID: projectID,
|
||||
token: token,
|
||||
httpClient: http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: false,
|
||||
MinVersion: tls.VersionTLS12,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newHarnessCodeClient(baseUrl string, accountID string, orgId string, projectId string, token string) (*harnessCodeClient, error) {
|
||||
client, err := newClient(baseUrl, accountID, orgId, projectId, token)
|
||||
func newHarnessCodeClient(
|
||||
baseURL string,
|
||||
accountID string,
|
||||
orgID string,
|
||||
projectID string,
|
||||
token string,
|
||||
) (*harnessCodeClient, error) {
|
||||
client, err := newClient(baseURL, accountID, orgID, projectID, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -104,18 +112,22 @@ func newHarnessCodeClient(baseUrl string, accountID string, orgId string, projec
|
|||
}
|
||||
|
||||
func (c *harnessCodeClient) CreateRepo(ctx context.Context, input repo.CreateInput) (*types.Repository, error) {
|
||||
path := fmt.Sprintf(pathCreateRepo, c.client.accountId, c.client.orgId, c.client.projectId)
|
||||
path := fmt.Sprintf(pathCreateRepo, c.client.accountID, c.client.orgID, c.client.projectID)
|
||||
bodyBytes, err := json.Marshal(input)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize body: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, appendPath(c.client.baseURL, path), bytes.NewBuffer(bodyBytes))
|
||||
req, err := http.NewRequestWithContext(
|
||||
ctx,
|
||||
http.MethodPost,
|
||||
appendPath(c.client.baseURL, path), bytes.NewBuffer(bodyBytes),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create new http request : %w", err)
|
||||
}
|
||||
|
||||
q := map[string]string{routingId: c.client.accountId}
|
||||
q := map[string]string{routingID: c.client.accountID}
|
||||
addQueryParams(req, q)
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
req.ContentLength = int64(len(bodyBytes))
|
||||
|
@ -152,14 +164,14 @@ func addQueryParams(req *http.Request, params map[string]string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *harnessCodeClient) DeleteRepo(ctx context.Context, repoUid string) error {
|
||||
path := fmt.Sprintf(pathDeleteRepo, c.client.accountId, c.client.orgId, c.client.projectId, repoUid)
|
||||
func (c *harnessCodeClient) DeleteRepo(ctx context.Context, repoUID string) error {
|
||||
path := fmt.Sprintf(pathDeleteRepo, c.client.accountID, c.client.orgID, c.client.projectID, repoUID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, appendPath(c.client.baseURL, path), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new http request : %w", err)
|
||||
}
|
||||
|
||||
q := map[string]string{routingId: c.client.accountId}
|
||||
q := map[string]string{routingID: c.client.accountID}
|
||||
addQueryParams(req, q)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
|
@ -187,7 +199,7 @@ func (c *client) Do(r *http.Request) (*http.Response, error) {
|
|||
|
||||
// addAuthHeader adds the Authorization header to the request.
|
||||
func addAuthHeader(req *http.Request, token string) {
|
||||
req.Header.Add(headerApiKey, token)
|
||||
req.Header.Add(headerAPIKey, token)
|
||||
}
|
||||
|
||||
func unmarshalResponse(resp *http.Response, data interface{}) error {
|
||||
|
|
|
@ -25,16 +25,16 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/harness/gitness/encrypt"
|
||||
"github.com/harness/gitness/internal/api/controller/repo"
|
||||
"github.com/harness/gitness/internal/sse"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/harness/gitness/gitrpc"
|
||||
"github.com/harness/gitness/internal/api/controller/repo"
|
||||
"github.com/harness/gitness/internal/services/job"
|
||||
"github.com/harness/gitness/internal/sse"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
gitnessurl "github.com/harness/gitness/internal/url"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/types/enum"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -65,7 +65,7 @@ type Input struct {
|
|||
}
|
||||
|
||||
type HarnessCodeInfo struct {
|
||||
AccountId string `json:"account_id"`
|
||||
AccountID string `json:"account_id"`
|
||||
ProjectIdentifier string `json:"project_identifier"`
|
||||
OrgIdentifier string `json:"org_identifier"`
|
||||
Token string `json:"token"`
|
||||
|
@ -76,8 +76,8 @@ var _ job.Handler = (*Repository)(nil)
|
|||
const (
|
||||
exportJobMaxRetries = 1
|
||||
exportJobMaxDuration = 45 * time.Minute
|
||||
exportRepoJobUid = "export_repo_%d"
|
||||
exportSpaceJobUid = "export_space_%d"
|
||||
exportRepoJobUID = "export_repo_%d"
|
||||
exportSpaceJobUID = "export_space_%d"
|
||||
jobType = "repository_export"
|
||||
)
|
||||
|
||||
|
@ -89,24 +89,23 @@ func (r *Repository) Register(executor *job.Executor) error {
|
|||
|
||||
func (r *Repository) RunManyForSpace(
|
||||
ctx context.Context,
|
||||
spaceId int64,
|
||||
spaceID int64,
|
||||
repos []*types.Repository,
|
||||
harnessCodeInfo *HarnessCodeInfo,
|
||||
) error {
|
||||
jobGroupId := getJobGroupId(spaceId)
|
||||
|
||||
jobs, err := r.scheduler.GetJobProgressForGroup(ctx, jobGroupId)
|
||||
jobGroupID := getJobGroupID(spaceID)
|
||||
jobs, err := r.scheduler.GetJobProgressForGroup(ctx, jobGroupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get job progress before starting. %w", err)
|
||||
}
|
||||
|
||||
if len(jobs) >= 0 {
|
||||
if len(jobs) > 0 {
|
||||
err = checkJobAlreadyRunning(jobs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := r.scheduler.PurgeJobsByGroupId(ctx, jobGroupId)
|
||||
n, err := r.scheduler.PurgeJobsByGroupID(ctx, jobGroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -133,7 +132,7 @@ func (r *Repository) RunManyForSpace(
|
|||
return fmt.Errorf("failed to encrypt job input: %w", err)
|
||||
}
|
||||
|
||||
jobUID := fmt.Sprintf(exportRepoJobUid, repository.ID)
|
||||
jobUID := fmt.Sprintf(exportRepoJobUID, repository.ID)
|
||||
|
||||
jobDefinitions[i] = job.Definition{
|
||||
UID: jobUID,
|
||||
|
@ -144,7 +143,7 @@ func (r *Repository) RunManyForSpace(
|
|||
}
|
||||
}
|
||||
|
||||
return r.scheduler.RunJobs(ctx, jobGroupId, jobDefinitions)
|
||||
return r.scheduler.RunJobs(ctx, jobGroupID, jobDefinitions)
|
||||
}
|
||||
|
||||
func checkJobAlreadyRunning(jobs []types.JobProgress) error {
|
||||
|
@ -159,8 +158,8 @@ func checkJobAlreadyRunning(jobs []types.JobProgress) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getJobGroupId(spaceId int64) string {
|
||||
return fmt.Sprintf(exportSpaceJobUid, spaceId)
|
||||
func getJobGroupID(spaceID int64) string {
|
||||
return fmt.Sprintf(exportSpaceJobUID, spaceID)
|
||||
}
|
||||
|
||||
// Handle is repository export background job handler.
|
||||
|
@ -172,7 +171,7 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
|
|||
harnessCodeInfo := input.HarnessCodeInfo
|
||||
client, err := newHarnessCodeClient(
|
||||
harnessCodeAPIURLRaw,
|
||||
harnessCodeInfo.AccountId,
|
||||
harnessCodeInfo.AccountID,
|
||||
harnessCodeInfo.OrgIdentifier,
|
||||
harnessCodeInfo.ProjectIdentifier,
|
||||
harnessCodeInfo.Token,
|
||||
|
@ -199,14 +198,14 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
|
|||
return "", err
|
||||
}
|
||||
|
||||
urlWithToken, err := modifyUrl(remoteRepo.GitURL, harnessCodeInfo.Token)
|
||||
urlWithToken, err := modifyURL(remoteRepo.GitURL, harnessCodeInfo.Token)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
err = r.git.PushRemote(ctx, &gitrpc.PushRemoteParams{
|
||||
ReadParams: gitrpc.ReadParams{RepoUID: repository.GitUID},
|
||||
RemoteUrl: urlWithToken,
|
||||
RemoteURL: urlWithToken,
|
||||
})
|
||||
if err != nil && !strings.Contains(err.Error(), "empty") {
|
||||
errDelete := client.DeleteRepo(ctx, remoteRepo.UID)
|
||||
|
@ -253,8 +252,8 @@ func (r *Repository) getJobInput(data string) (Input, error) {
|
|||
}
|
||||
|
||||
func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]types.JobProgress, error) {
|
||||
spaceId := getJobGroupId(spaceID)
|
||||
progress, err := r.scheduler.GetJobProgressForGroup(ctx, spaceId)
|
||||
groupID := getJobGroupID(spaceID)
|
||||
progress, err := r.scheduler.GetJobProgressForGroup(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get job progress for group: %w", err)
|
||||
}
|
||||
|
@ -266,14 +265,13 @@ func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]
|
|||
return progress, nil
|
||||
}
|
||||
|
||||
func modifyUrl(u string, token string) (string, error) {
|
||||
parsedUrl, err := url.Parse(u)
|
||||
func modifyURL(u string, token string) (string, error) {
|
||||
parsedURL, err := url.Parse(u)
|
||||
if err != nil {
|
||||
fmt.Println("Error parsing URL:", err)
|
||||
return "", err
|
||||
return "", fmt.Errorf("failed to parse URL '%s': %w", u, err)
|
||||
}
|
||||
|
||||
// Set the username and password in the URL
|
||||
parsedUrl.User = url.UserPassword("token", token)
|
||||
return parsedUrl.String(), nil
|
||||
parsedURL.User = url.UserPassword("token", token)
|
||||
return parsedURL.String(), nil
|
||||
}
|
||||
|
|
|
@ -15,13 +15,14 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
"github.com/harness/gitness/encrypt"
|
||||
"github.com/harness/gitness/gitrpc"
|
||||
"github.com/harness/gitness/internal/services/job"
|
||||
"github.com/harness/gitness/internal/sse"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/internal/url"
|
||||
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
|
|
|
@ -51,7 +51,7 @@ func (r *Repository) processPipelines(ctx context.Context,
|
|||
return err
|
||||
}
|
||||
|
||||
pipelineFiles := r.convertPipelines(ctx, principal, repo)
|
||||
pipelineFiles := r.convertPipelines(ctx, repo)
|
||||
if len(pipelineFiles) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -144,7 +144,6 @@ func (r *Repository) processPipelines(ctx context.Context,
|
|||
// convertPipelines converts pipelines found in the repository.
|
||||
// Note: For GitHub actions, there can be multiple.
|
||||
func (r *Repository) convertPipelines(ctx context.Context,
|
||||
principal *types.Principal,
|
||||
repo *types.Repository,
|
||||
) []pipelineFile {
|
||||
const maxSize = 65536
|
||||
|
@ -189,6 +188,7 @@ func (r *Repository) convertPipelines(ctx context.Context,
|
|||
|
||||
filesYML := match(".github/workflows", "*.yml")
|
||||
filesYAML := match(".github/workflows", "*.yaml")
|
||||
//nolint:gocritic // intended usage
|
||||
files := append(filesYML, filesYAML...)
|
||||
converted := convertPipelineFiles(ctx, files, func() pipelineConverter { return github.New() })
|
||||
if len(converted) > 0 {
|
||||
|
|
|
@ -161,7 +161,11 @@ func LoadRepositoryFromProvider(ctx context.Context, provider Provider, repoSlug
|
|||
}, nil
|
||||
}
|
||||
|
||||
func LoadRepositoriesFromProviderSpace(ctx context.Context, provider Provider, spaceSlug string) ([]RepositoryInfo, error) {
|
||||
func LoadRepositoriesFromProviderSpace(
|
||||
ctx context.Context,
|
||||
provider Provider,
|
||||
spaceSlug string,
|
||||
) ([]RepositoryInfo, error) {
|
||||
scmClient, err := getClient(provider, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -223,10 +227,10 @@ func convertSCMError(provider Provider, slug string, r *scm.Response, err error)
|
|||
if provider.Host != "" {
|
||||
return usererror.BadRequestf("failed to make HTTP request to %s (host=%s): %s",
|
||||
provider.Type, provider.Host, err)
|
||||
} else {
|
||||
return usererror.BadRequestf("failed to make HTTP request to %s: %s",
|
||||
provider.Type, err)
|
||||
}
|
||||
|
||||
return usererror.BadRequestf("failed to make HTTP request to %s: %s",
|
||||
provider.Type, err)
|
||||
}
|
||||
|
||||
switch r.Status {
|
||||
|
|
|
@ -177,6 +177,8 @@ func (r *Repository) getJobInput(data string) (Input, error) {
|
|||
}
|
||||
|
||||
// Handle is repository import background job handler.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) {
|
||||
systemPrincipal := bootstrap.NewSystemServiceSession().Principal
|
||||
|
||||
|
|
|
@ -22,9 +22,9 @@ import (
|
|||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/internal/url"
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/google/wire"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
|
|
|
@ -53,7 +53,7 @@ type Handler interface {
|
|||
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
|
||||
}
|
||||
|
||||
var noHandlerDefinedError = errors.New("no handler registered for the job type")
|
||||
var errNoHandlerDefined = errors.New("no handler registered for the job type")
|
||||
|
||||
// NewExecutor creates new Executor.
|
||||
func NewExecutor(jobStore store.JobStore, publisher pubsub.Publisher) *Executor {
|
||||
|
@ -113,7 +113,7 @@ func (e *Executor) exec(
|
|||
|
||||
exec, ok := e.handlerMap[jobType]
|
||||
if !ok {
|
||||
return "", noHandlerDefinedError
|
||||
return "", errNoHandlerDefined
|
||||
}
|
||||
|
||||
// progressReporter is the function with which the job can update its progress.
|
||||
|
|
|
@ -81,6 +81,8 @@ func NewScheduler(
|
|||
|
||||
// Run runs the background job scheduler.
|
||||
// It's a blocking call. It blocks until the provided context is done.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func (s *Scheduler) Run(ctx context.Context) error {
|
||||
if s.done != nil {
|
||||
return errors.New("already started")
|
||||
|
@ -560,6 +562,8 @@ func (s *Scheduler) doExec(ctx context.Context,
|
|||
}
|
||||
|
||||
// postExec updates the provided types.Job after execution and reschedules it if necessary.
|
||||
//
|
||||
//nolint:gocognit // refactor if needed.
|
||||
func postExec(job *types.Job, resultData, resultErr string) {
|
||||
// Proceed with the update of the job if it's in the running state or
|
||||
// if it's marked as canceled but has succeeded nonetheless.
|
||||
|
@ -585,6 +589,7 @@ func postExec(job *types.Job, resultData, resultErr string) {
|
|||
}
|
||||
|
||||
// Reschedule recurring jobs
|
||||
//nolint:nestif // refactor if needed
|
||||
if job.IsRecurring {
|
||||
if resultErr == "" {
|
||||
job.ConsecutiveFailures = 0
|
||||
|
@ -634,7 +639,7 @@ func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID stri
|
|||
return mapToProgressMany(job), nil
|
||||
}
|
||||
|
||||
func (s *Scheduler) PurgeJobsByGroupId(ctx context.Context, jobGroupID string) (int64, error) {
|
||||
func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (int64, error) {
|
||||
n, err := s.store.DeleteByGroupID(ctx, jobGroupID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to delete jobs by group id=%s: %w", jobGroupID, err)
|
||||
|
|
|
@ -54,15 +54,19 @@ type Collector struct {
|
|||
scheduler *job.Scheduler
|
||||
}
|
||||
|
||||
func (c *Collector) Register(ctx context.Context) {
|
||||
func (c *Collector) Register(ctx context.Context) error {
|
||||
if !c.enabled {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
c.scheduler.AddRecurring(ctx, jobType, jobType, "0 0 * * *", time.Minute)
|
||||
err := c.scheduler.AddRecurring(ctx, jobType, jobType, "0 0 * * *", time.Minute)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register recurring job for collector: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Collector) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) {
|
||||
|
||||
if !c.enabled {
|
||||
return "", nil
|
||||
}
|
||||
|
@ -121,7 +125,7 @@ func (c *Collector) Handle(ctx context.Context, _ string, _ job.ProgressReporter
|
|||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%s?api_key=%s", c.endpoint, c.token)
|
||||
req, err := http.NewRequest("POST", endpoint, buf)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buf)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create a request for metric data to endpoint %s: %w", endpoint, err)
|
||||
}
|
||||
|
|
|
@ -15,10 +15,11 @@
|
|||
package metric
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
"github.com/harness/gitness/internal/services/job"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
// handleFileViewedOnBranchUpdate handles pull request Branch Updated events.
|
||||
// It marks existing file reviews as obsolete for the PR depending on the change to the file.
|
||||
//
|
||||
// The major reason of this handler is to allow detect changes that occured to a file since last reviewed,
|
||||
// The major reason of this handler is to allow detect changes that occurred to a file since last reviewed,
|
||||
// even if the file content is the same - e.g. file got deleted and readded with the same content.
|
||||
func (s *Service) handleFileViewedOnBranchUpdate(ctx context.Context,
|
||||
event *events.Event[*pullreqevents.BranchUpdatedPayload],
|
||||
|
@ -71,7 +71,7 @@ func (s *Service) handleFileViewedOnBranchUpdate(ctx context.Context,
|
|||
obsoletePaths = append(obsoletePaths, fileDiff.OldPath, fileDiff.Path)
|
||||
case gitrpc.FileDiffStatusModified:
|
||||
obsoletePaths = append(obsoletePaths, fileDiff.Path)
|
||||
default:
|
||||
case gitrpc.FileDiffStatusUndefined:
|
||||
// other cases we don't care
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ type Service struct {
|
|||
repoStore store.RepoStore
|
||||
pipelineStore store.PipelineStore
|
||||
triggerSvc triggerer.Triggerer
|
||||
commitSvc commit.CommitService
|
||||
commitSvc commit.Service
|
||||
}
|
||||
|
||||
func New(
|
||||
|
@ -77,7 +77,7 @@ func New(
|
|||
repoStore store.RepoStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
triggerSvc triggerer.Triggerer,
|
||||
commitSvc commit.CommitService,
|
||||
commitSvc commit.Service,
|
||||
gitReaderFactory *events.ReaderFactory[*gitevents.Reader],
|
||||
pullreqEvReaderFactory *events.ReaderFactory[*pullreqevents.Reader],
|
||||
) (*Service, error) {
|
||||
|
|
|
@ -35,7 +35,7 @@ func ProvideService(
|
|||
ctx context.Context,
|
||||
config Config,
|
||||
triggerStore store.TriggerStore,
|
||||
commitSvc commit.CommitService,
|
||||
commitSvc commit.Service,
|
||||
pullReqStore store.PullReqStore,
|
||||
repoStore store.RepoStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue