feat: [CODE-888]: minor export improvements

jobatzil/rename
Abhinav Singh 2023-09-18 00:27:55 -07:00
parent 1b79309f4a
commit c7971bbbb1
6 changed files with 76 additions and 2 deletions

View File

@ -30,6 +30,7 @@ var (
ErrNotFound = fmt.Errorf("not found")
ErrBadRequest = fmt.Errorf("bad request")
ErrInternal = fmt.Errorf("internal error")
ErrDuplicate = fmt.Errorf("resource already exists")
)
type HarnessCodeClient struct {
@ -203,6 +204,8 @@ func mapStatusCodeToError(statusCode int) error {
return ErrNotFound
case statusCode == 400:
return ErrBadRequest
case statusCode == 409:
return ErrDuplicate
case statusCode >= 400:
return fmt.Errorf("received client side error status code %d", statusCode)
case statusCode >= 300:

View File

@ -8,12 +8,14 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/internal/api/controller/repo"
"github.com/harness/gitness/internal/sse"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
"golang.org/x/exp/slices"
"net/url"
"strings"
"time"
@ -56,9 +58,10 @@ const (
exportJobMaxDuration = 45 * time.Minute
exportRepoJobUid = "export_repo_%d"
exportSpaceJobUid = "export_space_%d"
jobType = "repository_export"
)
const jobType = "repository_export"
var ErrJobRunning = errors.New("an export job is already running")
func (r *Repository) Register(executor *job.Executor) error {
return executor.Register(jobType, r)
@ -66,6 +69,23 @@ func (r *Repository) Register(executor *job.Executor) error {
func (r *Repository) RunMany(ctx context.Context, spaceId int64, harnessCodeInfo *HarnessCodeInfo, repos []*types.Repository) error {
jobGroupId := getJobGroupId(spaceId)
jobs, err := r.scheduler.GetJobProgressForGroup(ctx, jobGroupId)
if err != nil {
return fmt.Errorf("cannot get job progress before starting. %w", err)
}
err = checkJobAlreadyRunning(jobs)
if err != nil {
return err
}
n, err := r.scheduler.PurgeJobsByGroupId(ctx, jobGroupId)
if err != nil {
return err
}
log.Ctx(ctx).Info().Msgf("deleted %d old jobs", n)
jobDefinitions := make([]job.Definition, len(repos))
for i, repository := range repos {
repoJobData := Input{
@ -100,6 +120,17 @@ func (r *Repository) RunMany(ctx context.Context, spaceId int64, harnessCodeInfo
return r.scheduler.RunJobs(ctx, jobGroupId, jobDefinitions)
}
func checkJobAlreadyRunning(jobs []types.JobProgress) error {
if jobs != nil {
for _, j := range jobs {
if !slices.Contains(enum.GetCompletedJobState(), j.State) {
return ErrJobRunning
}
}
}
return nil
}
func getJobGroupId(spaceId int64) string {
return fmt.Sprintf(exportSpaceJobUid, spaceId)
}
@ -156,7 +187,6 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
}
log.Info().Msgf("completed repository export for repo", repository.UID)
publishSSE(ctx, r, repository)
return "", nil

View File

@ -624,6 +624,14 @@ func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID stri
return mapToProgressMany(job), nil
}
func (s *Scheduler) PurgeJobsByGroupId(ctx context.Context, jobGroupUID string) (int64, error) {
n, err := s.store.DeleteByGroupID(ctx, jobGroupUID)
if err != nil {
return 0, err
}
return n, nil
}
func mapToProgressMany(jobs []*types.Job) []types.JobProgress {
if jobs == nil {
return nil

View File

@ -448,6 +448,9 @@ type (
// ListByGroupID fetches all jobs for a group id
ListByGroupID(ctx context.Context, groupId string) ([]*types.Job, error)
// DeleteByGroupID deletes all jobs for a group id
DeleteByGroupID(ctx context.Context, groupId string) (int64, error)
// Create is used to create a new job.
Create(ctx context.Context, job *types.Job) error

View File

@ -91,6 +91,32 @@ func (s *JobStore) ListByGroupID(ctx context.Context, groupId string) ([]*types.
return result, nil
}
// DeleteByGroupID deletes all jobs for a group id
func (s *JobStore) DeleteByGroupID(ctx context.Context, groupId string) (int64, error) {
stmt := database.Builder.
Delete("jobs").
Where("(job_group_id = ?)", groupId)
sql, args, err := stmt.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to convert delete by group id jobs query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
result, err := db.ExecContext(ctx, sql, args...)
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to execute delete jobs by group id query")
}
n, err := result.RowsAffected()
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted jobs in group")
}
return n, nil
}
// Create creates a new job.
func (s *JobStore) Create(ctx context.Context, job *types.Job) error {
const sqlQuery = `

View File

@ -24,3 +24,7 @@ const (
JobPriorityNormal JobPriority = 0
JobPriorityElevated JobPriority = 1
)
func GetCompletedJobState() []JobState {
return []JobState{JobStateFinished, JobStateCanceled, JobStateFinished}
}