From b0460b365859ef537908fba401c7936137059c92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Mon, 11 Sep 2023 12:01:40 +0200 Subject: [PATCH] simplify import job starting --- internal/api/controller/repo/import.go | 13 ++-- internal/api/controller/space/import.go | 15 ++--- internal/services/importer/repository.go | 2 +- internal/services/job/scheduler.go | 83 +++++------------------- internal/services/job/timer.go | 5 -- 5 files changed, 28 insertions(+), 90 deletions(-) diff --git a/internal/api/controller/repo/import.go b/internal/api/controller/repo/import.go index cdad3bfde..09dd919dd 100644 --- a/internal/api/controller/repo/import.go +++ b/internal/api/controller/repo/import.go @@ -7,7 +7,6 @@ package repo import ( "context" "fmt" - "github.com/harness/gitness/internal/api/usererror" "github.com/harness/gitness/internal/auth" "github.com/harness/gitness/internal/paths" @@ -16,8 +15,6 @@ import ( "github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" - - "github.com/rs/zerolog/log" ) type ImportInput struct { @@ -83,17 +80,17 @@ func (c *Controller) Import(ctx context.Context, session *auth.Session, in *Impo return fmt.Errorf("failed to create path: %w", err) } + err = c.importer.Run(ctx, in.Provider, repo, remoteRepository.CloneURL) + if err != nil { + return fmt.Errorf("failed to start import repository job: %w", err) + } + return nil }) if err != nil { return nil, err } - err = c.importer.Run(ctx, in.Provider, repo, remoteRepository.CloneURL) - if err != nil { - log.Ctx(ctx).Err(err).Msg("failed to start import repository job") - } - repo.GitURL = c.urlProvider.GenerateRepoCloneURL(repo.Path) return repo, nil diff --git a/internal/api/controller/space/import.go b/internal/api/controller/space/import.go index ce00c9749..0720632fb 100644 --- a/internal/api/controller/space/import.go +++ b/internal/api/controller/space/import.go @@ -7,8 +7,6 @@ package space import ( "context" "fmt" - "github.com/harness/gitness/types/check" - "github.com/rs/zerolog/log" "strconv" "strings" "time" @@ -21,6 +19,7 @@ import ( "github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/types" + "github.com/harness/gitness/types/check" "github.com/harness/gitness/types/enum" ) @@ -168,17 +167,17 @@ func (c *Controller) Import(ctx context.Context, session *auth.Session, in *Impo cloneURLs[i] = remoteRepository.CloneURL } + jobGroupID := fmt.Sprintf("space-import-%d", space.ID) + err = c.importer.RunMany(ctx, jobGroupID, in.Provider, localRepositories, cloneURLs) + if err != nil { + return fmt.Errorf("failed to start import repository jobs: %w", err) + } + return nil }) if err != nil { return nil, err } - jobGroupID := fmt.Sprintf("space-import-%d", space.ID) - err = c.importer.RunMany(ctx, jobGroupID, in.Provider, localRepositories, cloneURLs) - if err != nil { - log.Ctx(ctx).Err(err).Msg("failed to start import repository job") - } - return space, nil } diff --git a/internal/services/importer/repository.go b/internal/services/importer/repository.go index 399168587..41a4982ee 100644 --- a/internal/services/importer/repository.go +++ b/internal/services/importer/repository.go @@ -199,7 +199,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo return "", fmt.Errorf("failed to import repository: %w", err) } - return "", err + return "", nil } func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { diff --git a/internal/services/job/scheduler.go b/internal/services/job/scheduler.go index 1ca8545cb..f769e65f7 100644 --- a/internal/services/job/scheduler.go +++ b/internal/services/job/scheduler.go @@ -40,7 +40,6 @@ type Scheduler struct { // synchronization stuff signal chan time.Time - signalEdgy chan struct{} done chan struct{} wgRunning sync.WaitGroup cancelJobMx sync.Mutex @@ -105,7 +104,6 @@ func (s *Scheduler) Run(ctx context.Context) error { defer close(s.done) s.signal = make(chan time.Time, 1) - s.signalEdgy = make(chan struct{}, 1) s.globalCtx = ctx timer := newSchedulerTimer() @@ -126,10 +124,6 @@ func (s *Scheduler) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() - case <-s.signalEdgy: - timer.MakeEdgy() - return nil - case newTime := <-s.signal: dur := timer.RescheduleEarlier(newTime) if dur > 0 { @@ -264,13 +258,6 @@ func (s *Scheduler) scheduleProcessing(scheduled time.Time) { }() } -func (s *Scheduler) makeTimerEdgy() { - select { - case s.signalEdgy <- struct{}{}: - default: - } -} - // scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy. // The timer would be edgy if the previous iteration found more jobs that it could start (full capacity). // This should be run after a non-recurring job has finished. @@ -286,12 +273,24 @@ func (s *Scheduler) RunJob(ctx context.Context, def Definition) error { return err } - return s.startNewJobs(ctx, []*types.Job{def.toNewJob()}) + job := def.toNewJob() + + if err := s.store.Create(ctx, job); err != nil { + return fmt.Errorf("failed to add new job to the database: %w", err) + } + + s.scheduleProcessing(time.UnixMilli(job.Scheduled)) + + return nil } // RunJobs runs a several jobs. It's more efficient than calling RunJob several times // because it locks the DB only once. func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error { + if len(defs) == 0 { + return nil + } + jobs := make([]*types.Job, len(defs)) for i, def := range defs { if err := def.Validate(); err != nil { @@ -301,65 +300,13 @@ func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definiti jobs[i].GroupID = groupID } - return s.startNewJobs(ctx, jobs) -} - -func (s *Scheduler) startNewJobs(ctx context.Context, jobs []*types.Job) error { - mx, err := globalLock(ctx, s.mxManager) - if err != nil { - return fmt.Errorf("failed to obtain global lock to start new jobs: %w", err) - } - - defer func() { - if err := mx.Unlock(ctx); err != nil { - log.Ctx(ctx).Err(err).Msg("failed to release global lock after starting new jobs") - } - }() - - return s.startNewJobsNoLock(ctx, jobs) -} - -func (s *Scheduler) startNewJobsNoLock(ctx context.Context, jobs []*types.Job) error { - available, err := s.availableSlots(ctx) - if err != nil { - return fmt.Errorf("failed to count available slots for job execution: %w", err) - } - - canRunAll := available >= len(jobs) - for _, job := range jobs { - if available > 0 { - available-- - s.preExec(job) // Update the job fields for the new execution: It will be added to the DB as "running". - } - - err = s.store.Create(ctx, job) - if err != nil { + if err := s.store.Create(ctx, job); err != nil { return fmt.Errorf("failed to add new job to the database: %w", err) } - - if job.State != enum.JobStateRunning { - continue - } - - func(ctx context.Context) { - ctx = log.Ctx(ctx).With(). - Str("job.UID", job.UID). - Str("job.Type", job.Type). - Logger().WithContext(ctx) - - // tell everybody that a job has started - if err := publishStateChange(ctx, s.pubsubService, job); err != nil { - log.Err(err).Msg("failed to publish job state change") - } - - s.runJob(ctx, job) - }(s.globalCtx) } - if !canRunAll { - s.makeTimerEdgy() - } + s.scheduleProcessing(time.Now()) return nil } diff --git a/internal/services/job/timer.go b/internal/services/job/timer.go index 819cb884b..dfc93f70a 100644 --- a/internal/services/job/timer.go +++ b/internal/services/job/timer.go @@ -31,11 +31,6 @@ func (t *schedulerTimer) ResetAt(next time.Time, edgy bool) time.Duration { return t.resetAt(time.Now(), next, edgy) } -// MakeEdgy makes the timer edgy which meant it will be triggered immediately on reschedule attempt. -func (t *schedulerTimer) MakeEdgy() { - t.edgy = true -} - func (t *schedulerTimer) resetAt(now, next time.Time, edgy bool) time.Duration { var dur time.Duration