simplify import job starting

This commit is contained in:
Marko Gaćeša 2023-09-11 12:01:40 +02:00
parent 05598a47a8
commit b0460b3658
5 changed files with 28 additions and 90 deletions

View File

@ -7,7 +7,6 @@ package repo
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/harness/gitness/internal/api/usererror" "github.com/harness/gitness/internal/api/usererror"
"github.com/harness/gitness/internal/auth" "github.com/harness/gitness/internal/auth"
"github.com/harness/gitness/internal/paths" "github.com/harness/gitness/internal/paths"
@ -16,8 +15,6 @@ import (
"github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types" "github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum" "github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
) )
type ImportInput struct { type ImportInput struct {
@ -83,17 +80,17 @@ func (c *Controller) Import(ctx context.Context, session *auth.Session, in *Impo
return fmt.Errorf("failed to create path: %w", err) return fmt.Errorf("failed to create path: %w", err)
} }
err = c.importer.Run(ctx, in.Provider, repo, remoteRepository.CloneURL)
if err != nil {
return fmt.Errorf("failed to start import repository job: %w", err)
}
return nil return nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = c.importer.Run(ctx, in.Provider, repo, remoteRepository.CloneURL)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to start import repository job")
}
repo.GitURL = c.urlProvider.GenerateRepoCloneURL(repo.Path) repo.GitURL = c.urlProvider.GenerateRepoCloneURL(repo.Path)
return repo, nil return repo, nil

View File

@ -7,8 +7,6 @@ package space
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/harness/gitness/types/check"
"github.com/rs/zerolog/log"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -21,6 +19,7 @@ import (
"github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/internal/services/job"
"github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types" "github.com/harness/gitness/types"
"github.com/harness/gitness/types/check"
"github.com/harness/gitness/types/enum" "github.com/harness/gitness/types/enum"
) )
@ -168,17 +167,17 @@ func (c *Controller) Import(ctx context.Context, session *auth.Session, in *Impo
cloneURLs[i] = remoteRepository.CloneURL cloneURLs[i] = remoteRepository.CloneURL
} }
jobGroupID := fmt.Sprintf("space-import-%d", space.ID)
err = c.importer.RunMany(ctx, jobGroupID, in.Provider, localRepositories, cloneURLs)
if err != nil {
return fmt.Errorf("failed to start import repository jobs: %w", err)
}
return nil return nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
jobGroupID := fmt.Sprintf("space-import-%d", space.ID)
err = c.importer.RunMany(ctx, jobGroupID, in.Provider, localRepositories, cloneURLs)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to start import repository job")
}
return space, nil return space, nil
} }

View File

@ -199,7 +199,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return "", fmt.Errorf("failed to import repository: %w", err) return "", fmt.Errorf("failed to import repository: %w", err)
} }
return "", err return "", nil
} }
func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) {

View File

@ -40,7 +40,6 @@ type Scheduler struct {
// synchronization stuff // synchronization stuff
signal chan time.Time signal chan time.Time
signalEdgy chan struct{}
done chan struct{} done chan struct{}
wgRunning sync.WaitGroup wgRunning sync.WaitGroup
cancelJobMx sync.Mutex cancelJobMx sync.Mutex
@ -105,7 +104,6 @@ func (s *Scheduler) Run(ctx context.Context) error {
defer close(s.done) defer close(s.done)
s.signal = make(chan time.Time, 1) s.signal = make(chan time.Time, 1)
s.signalEdgy = make(chan struct{}, 1)
s.globalCtx = ctx s.globalCtx = ctx
timer := newSchedulerTimer() timer := newSchedulerTimer()
@ -126,10 +124,6 @@ func (s *Scheduler) Run(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.signalEdgy:
timer.MakeEdgy()
return nil
case newTime := <-s.signal: case newTime := <-s.signal:
dur := timer.RescheduleEarlier(newTime) dur := timer.RescheduleEarlier(newTime)
if dur > 0 { if dur > 0 {
@ -264,13 +258,6 @@ func (s *Scheduler) scheduleProcessing(scheduled time.Time) {
}() }()
} }
func (s *Scheduler) makeTimerEdgy() {
select {
case s.signalEdgy <- struct{}{}:
default:
}
}
// scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy. // scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy.
// The timer would be edgy if the previous iteration found more jobs that it could start (full capacity). // The timer would be edgy if the previous iteration found more jobs that it could start (full capacity).
// This should be run after a non-recurring job has finished. // This should be run after a non-recurring job has finished.
@ -286,12 +273,24 @@ func (s *Scheduler) RunJob(ctx context.Context, def Definition) error {
return err return err
} }
return s.startNewJobs(ctx, []*types.Job{def.toNewJob()}) job := def.toNewJob()
if err := s.store.Create(ctx, job); err != nil {
return fmt.Errorf("failed to add new job to the database: %w", err)
}
s.scheduleProcessing(time.UnixMilli(job.Scheduled))
return nil
} }
// RunJobs runs a several jobs. It's more efficient than calling RunJob several times // RunJobs runs a several jobs. It's more efficient than calling RunJob several times
// because it locks the DB only once. // because it locks the DB only once.
func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error { func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error {
if len(defs) == 0 {
return nil
}
jobs := make([]*types.Job, len(defs)) jobs := make([]*types.Job, len(defs))
for i, def := range defs { for i, def := range defs {
if err := def.Validate(); err != nil { if err := def.Validate(); err != nil {
@ -301,65 +300,13 @@ func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definiti
jobs[i].GroupID = groupID jobs[i].GroupID = groupID
} }
return s.startNewJobs(ctx, jobs)
}
func (s *Scheduler) startNewJobs(ctx context.Context, jobs []*types.Job) error {
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
return fmt.Errorf("failed to obtain global lock to start new jobs: %w", err)
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to release global lock after starting new jobs")
}
}()
return s.startNewJobsNoLock(ctx, jobs)
}
func (s *Scheduler) startNewJobsNoLock(ctx context.Context, jobs []*types.Job) error {
available, err := s.availableSlots(ctx)
if err != nil {
return fmt.Errorf("failed to count available slots for job execution: %w", err)
}
canRunAll := available >= len(jobs)
for _, job := range jobs { for _, job := range jobs {
if available > 0 { if err := s.store.Create(ctx, job); err != nil {
available--
s.preExec(job) // Update the job fields for the new execution: It will be added to the DB as "running".
}
err = s.store.Create(ctx, job)
if err != nil {
return fmt.Errorf("failed to add new job to the database: %w", err) return fmt.Errorf("failed to add new job to the database: %w", err)
} }
if job.State != enum.JobStateRunning {
continue
}
func(ctx context.Context) {
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
// tell everybody that a job has started
if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
s.runJob(ctx, job)
}(s.globalCtx)
} }
if !canRunAll { s.scheduleProcessing(time.Now())
s.makeTimerEdgy()
}
return nil return nil
} }

View File

@ -31,11 +31,6 @@ func (t *schedulerTimer) ResetAt(next time.Time, edgy bool) time.Duration {
return t.resetAt(time.Now(), next, edgy) return t.resetAt(time.Now(), next, edgy)
} }
// MakeEdgy makes the timer edgy which meant it will be triggered immediately on reschedule attempt.
func (t *schedulerTimer) MakeEdgy() {
t.edgy = true
}
func (t *schedulerTimer) resetAt(now, next time.Time, edgy bool) time.Duration { func (t *schedulerTimer) resetAt(now, next time.Time, edgy bool) time.Duration {
var dur time.Duration var dur time.Duration