// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package job

import (
	"context"
	"errors"
	"fmt"
	"runtime/debug"
	"sync"
	"time"

	"github.com/harness/gitness/lock"
	"github.com/harness/gitness/pubsub"
	"github.com/harness/gitness/store"

	"github.com/gorhill/cronexpr"
	"github.com/rs/zerolog/log"
)

// Scheduler controls execution of background jobs.
type Scheduler struct {
	// dependencies
	store         Store
	executor      *Executor
	mxManager     lock.MutexManager
	pubsubService pubsub.PubSub

	// configuration fields
	instanceID    string
	maxRunning    int
	retentionTime time.Duration

	// synchronization stuff
	signal       chan time.Time
	done         chan struct{}
	wgRunning    sync.WaitGroup
	cancelJobMx  sync.Mutex
	cancelJobMap map[string]context.CancelFunc
}

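// NewScheduler creates a new Scheduler. A maxRunning value below 1 is raised to 1.
//
// A typical (illustrative) lifecycle: register job Handlers on the Executor, create the
// Scheduler with NewScheduler, call Run in a background goroutine, and call WaitJobsDone
// during graceful shutdown.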
func NewScheduler(
	store Store,
	executor *Executor,
	mxManager lock.MutexManager,
	pubsubService pubsub.PubSub,
	instanceID string,
	maxRunning int,
	retentionTime time.Duration,
) (*Scheduler, error) {
	if maxRunning < 1 {
		maxRunning = 1
	}
	return &Scheduler{
		store:         store,
		executor:      executor,
		mxManager:     mxManager,
		pubsubService: pubsubService,

		instanceID:    instanceID,
		maxRunning:    maxRunning,
		retentionTime: retentionTime,

		cancelJobMap: map[string]context.CancelFunc{},
	}, nil
}

// Run runs the background job scheduler.
// It's a blocking call. It blocks until the provided context is done.
//
//nolint:gocognit // refactor if needed.
func (s *Scheduler) Run(ctx context.Context) error {
	if s.done != nil {
		return errors.New("already started")
	}

	consumer := s.pubsubService.Subscribe(ctx, PubSubTopicCancelJob, s.handleCancelJob)
	defer func() {
		err := consumer.Close()
		if err != nil {
			log.Ctx(ctx).Err(err).
				Msg("job scheduler: failed to close pubsub cancel job consumer")
		}
	}()

	if err := s.createNecessaryJobs(ctx); err != nil {
		return fmt.Errorf("failed to create necessary jobs: %w", err)
	}

	if err := s.registerNecessaryJobs(); err != nil {
		return fmt.Errorf("failed to register scheduler's internal jobs: %w", err)
	}

	s.executor.finishRegistration()

	log.Ctx(ctx).Debug().Msg("job scheduler: starting")

	s.done = make(chan struct{})
	defer close(s.done)

	s.signal = make(chan time.Time, 1)

	timer := newSchedulerTimer()
	defer timer.Stop()

	for {
		err := func() error {
			defer func() {
				if r := recover(); r != nil {
					stack := string(debug.Stack())
					log.Ctx(ctx).Error().
						Str("panic", fmt.Sprintf("[%T] job scheduler panic: %v", r, r)).
						Msg(stack)
				}
			}()

			select {
			case <-ctx.Done():
				return ctx.Err()

			case newTime := <-s.signal:
				dur := timer.RescheduleEarlier(newTime)
				if dur > 0 {
					log.Ctx(ctx).Trace().
						Msgf("job scheduler: update of scheduled job processing time... runs in %s", dur)
				}
				return nil

			case now := <-timer.Ch():
				count, nextExec, gotAllJobs, err := s.processReadyJobs(ctx, now)

				// If the next processing time isn't known, use the default.
				if nextExec.IsZero() {
					const period = time.Minute
					nextExec = now.Add(period)
				}

				// Reset the timer. Make the timer edgy if there are more jobs available.
				dur := timer.ResetAt(nextExec, !gotAllJobs)

				if err != nil {
					log.Ctx(ctx).Err(err).
						Msgf("job scheduler: failed to process jobs; next iteration in %s", dur)
				} else {
					log.Ctx(ctx).Trace().
						Msgf("job scheduler: started %d jobs; next iteration in %s", count, dur)
				}

				return nil
			}
		}()
		if err != nil {
			return err
		}
	}
}

// WaitJobsDone waits until execution of all jobs has finished.
// It is intended to be used for graceful shutdown, after the Run method has finished.
func (s *Scheduler) WaitJobsDone(ctx context.Context) {
	log.Ctx(ctx).Debug().Msg("job scheduler: stopping... waiting for the currently running jobs to finish")

	ch := make(chan struct{})
	go func() {
		s.wgRunning.Wait()
		close(ch)
	}()

	select {
	case <-ctx.Done():
		log.Ctx(ctx).Warn().Msg("job scheduler: stop interrupted")
	case <-ch:
		log.Ctx(ctx).Info().Msg("job scheduler: gracefully stopped")
	}
}

// CancelJob cancels a currently running or scheduled job.
func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error {
	mx, err := globalLock(ctx, s.mxManager)
	if err != nil {
		return fmt.Errorf("failed to obtain global lock to cancel a job: %w", err)
	}

	defer func() {
		if err := mx.Unlock(ctx); err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to release global lock after canceling a job")
		}
	}()

	job, err := s.store.Find(ctx, jobUID)
	if errors.Is(err, store.ErrResourceNotFound) {
		return nil // ensure consistent response for completed jobs
	}
	if err != nil {
		return fmt.Errorf("failed to find job to cancel: %w", err)
	}

	if job.IsRecurring {
		return errors.New("can't cancel recurring jobs")
	}

	if job.State != JobStateScheduled && job.State != JobStateRunning {
		return nil // return no error if the job is already canceled or has finished or failed.
	}

	// first we update the job in the database...

	job.Updated = time.Now().UnixMilli()
	job.State = JobStateCanceled

	err = s.store.UpdateExecution(ctx, job)
	if err != nil {
		return fmt.Errorf("failed to update job to cancel it: %w", err)
	}

	// ... and then we cancel its context.

	s.cancelJobMx.Lock()
	cancelFn, ok := s.cancelJobMap[jobUID]
	s.cancelJobMx.Unlock()

	if ok {
		cancelFn()
		return nil
	}

	return s.pubsubService.Publish(ctx, PubSubTopicCancelJob, []byte(jobUID))
}

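// handleCancelJob is the pubsub handler for the cancel-job topic. It cancels the job's
// context if the job is currently running on this instance; otherwise it is a no-op.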
func (s *Scheduler) handleCancelJob(payload []byte) error {
	jobUID := string(payload)
	if jobUID == "" {
		return nil
	}

	s.cancelJobMx.Lock()
	cancelFn, ok := s.cancelJobMap[jobUID]
	s.cancelJobMx.Unlock()

	if ok {
		cancelFn()
	}

	return nil
}

// scheduleProcessing triggers processing of ready jobs.
// This should be run after adding new jobs to the database.
func (s *Scheduler) scheduleProcessing(scheduled time.Time) {
	go func() {
		select {
		case <-s.done:
		case s.signal <- scheduled:
		}
	}()
}

// scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy.
// The timer would be edgy if the previous iteration found more jobs than it could start (full capacity).
// This should be run after a non-recurring job has finished.
func (s *Scheduler) scheduleIfHaveMoreJobs() {
	s.scheduleProcessing(time.Time{}) // zero time will trigger the timer if it's edgy
}

// RunJob runs a single job of the type Definition.Type.
// All parameters a job Handler receives must be inside the Definition.Data string
// (as JSON or whatever the job Handler can interpret).
func (s *Scheduler) RunJob(ctx context.Context, def Definition) error {
	if err := def.Validate(); err != nil {
		return err
	}

	job := def.toNewJob()

	if err := s.store.Create(ctx, job); err != nil {
		return fmt.Errorf("failed to add new job to the database: %w", err)
	}

	s.scheduleProcessing(time.UnixMilli(job.Scheduled))

	return nil
}

// RunJobs runs several jobs. It's more efficient than calling RunJob several times
// because it locks the DB only once.
func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error {
	if len(defs) == 0 {
		return nil
	}

	jobs := make([]*Job, len(defs))
	for i, def := range defs {
		if err := def.Validate(); err != nil {
			return err
		}
		jobs[i] = def.toNewJob()
		jobs[i].GroupID = groupID
	}

	for _, job := range jobs {
		if err := s.store.Create(ctx, job); err != nil {
			return fmt.Errorf("failed to add new job to the database: %w", err)
		}
	}

	s.scheduleProcessing(time.Now())

	return nil
}

// processReadyJobs executes jobs that are ready to run. This function is periodically run by the Scheduler.
// The function returns the number of jobs it has started, the next scheduled execution time (of this function)
// and a bool value indicating whether all currently available ready jobs were started.
// Internally the Scheduler uses an "edgy" timer to reschedule calls of this function.
// The edgy option of the timer will be on if this function hasn't been able to start all jobs that are ready to run.
// If the timer has the edgy option turned on, it will trigger the timer (and thus this function will be called)
// when any currently running job finishes successfully or fails.
func (s *Scheduler) processReadyJobs(ctx context.Context, now time.Time) (int, time.Time, bool, error) {
	mx, err := globalLock(ctx, s.mxManager)
	if err != nil {
		return 0, time.Time{}, false,
			fmt.Errorf("failed to obtain global lock to periodically process ready jobs: %w", err)
	}

	defer func() {
		if err := mx.Unlock(ctx); err != nil {
			log.Ctx(ctx).Err(err).
				Msg("failed to release global lock after periodic processing of ready jobs")
		}
	}()

	availableCount, err := s.availableSlots(ctx)
	if err != nil {
		return 0, time.Time{}, false,
			fmt.Errorf("failed to count available slots for job execution: %w", err)
	}

	// get one over the limit to check if all ready jobs are fetched
	jobs, err := s.store.ListReady(ctx, now, availableCount+1)
	if err != nil {
		return 0, time.Time{}, false,
			fmt.Errorf("failed to load scheduled jobs: %w", err)
	}

	var (
		countExecuted     int
		knownNextExecTime time.Time
		gotAllJobs        bool
	)

	if len(jobs) > availableCount {
		// More jobs are ready than we are able to run.
		jobs = jobs[:availableCount]
	} else {
		gotAllJobs = true
		knownNextExecTime, err = s.store.NextScheduledTime(ctx, now)
		if err != nil {
			return 0, time.Time{}, false,
				fmt.Errorf("failed to read next scheduled time: %w", err)
		}
	}

	for _, job := range jobs {
		jobCtx := log.Ctx(ctx).With().
			Str("job.UID", job.UID).
			Str("job.Type", job.Type).
			Logger().WithContext(ctx)

		// Update the job fields for the new execution
		s.preExec(job)

		if err := s.store.UpdateExecution(ctx, job); err != nil {
			knownNextExecTime = time.Time{}
			gotAllJobs = false
			log.Ctx(jobCtx).Err(err).Msg("failed to update job to mark it as running")
			continue
		}

		// tell everybody that a job has started
		if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
			log.Ctx(jobCtx).Err(err).Msg("failed to publish job state change")
		}

		s.runJob(jobCtx, job)

		countExecuted++
	}

	return countExecuted, knownNextExecTime, gotAllJobs, nil
}

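// availableSlots returns how many additional jobs this instance may start,
// based on the number of currently running jobs and the maxRunning limit.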
func (s *Scheduler) availableSlots(ctx context.Context) (int, error) {
	countRunning, err := s.store.CountRunning(ctx)
	if err != nil {
		return 0, err
	}

	availableCount := s.maxRunning - countRunning
	if availableCount < 0 {
		return 0, nil
	}

	return availableCount, nil
}

// runJob updates the job in the database and starts it in a separate goroutine.
// The function will also log the execution.
func (s *Scheduler) runJob(ctx context.Context, j *Job) {
	s.wgRunning.Add(1)
	go func(ctx context.Context,
		jobUID, jobType, jobData string,
		jobRunDeadline int64,
	) {
		defer s.wgRunning.Done()

		log.Ctx(ctx).Debug().Msg("started job")

		timeStart := time.Now()

		// Run the job
		execResult, execFailure := s.doExec(ctx, jobUID, jobType, jobData, jobRunDeadline)

		// Use the context.Background() because we want to update the job even if the job's context is done.
		// The context can be done because the job exceeded its deadline or the server is shutting down.
		backgroundCtx := context.Background()

		if mx, err := globalLock(backgroundCtx, s.mxManager); err != nil {
			// If locking failed, just log the error and proceed to update the DB anyway.
			log.Ctx(ctx).Err(err).Msg("failed to obtain global lock to update job after execution")
		} else {
			defer func() {
				if err := mx.Unlock(backgroundCtx); err != nil {
					log.Ctx(ctx).Err(err).Msg("failed to release global lock to update job after execution")
				}
			}()
		}

		job, err := s.store.Find(backgroundCtx, jobUID)
		if err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to find job after execution")
			return
		}

		// Update the job fields, reschedule if necessary.
		postExec(job, execResult, execFailure)

		err = s.store.UpdateExecution(backgroundCtx, job)
		if err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to update job after execution")
			return
		}

		logInfo := log.Ctx(ctx).Info().Str("duration", time.Since(timeStart).String())

		if job.IsRecurring {
			logInfo = logInfo.Bool("job.IsRecurring", true)
		}
		if job.Result != "" {
			logInfo = logInfo.Str("job.Result", job.Result)
		}
		if job.LastFailureError != "" {
			logInfo = logInfo.Str("job.Failure", job.LastFailureError)
		}

		switch job.State {
		case JobStateFinished:
			logInfo.Msg("job successfully finished")
			s.scheduleIfHaveMoreJobs()

		case JobStateFailed:
			logInfo.Msg("job failed")
			s.scheduleIfHaveMoreJobs()

		case JobStateCanceled:
			log.Ctx(ctx).Error().Msg("job canceled")
			s.scheduleIfHaveMoreJobs()

		case JobStateScheduled:
			scheduledTime := time.UnixMilli(job.Scheduled)
			logInfo.
				Str("job.Scheduled", scheduledTime.Format(time.RFC3339Nano)).
				Msg("job finished and rescheduled")

			s.scheduleProcessing(scheduledTime)

		case JobStateRunning:
			log.Ctx(ctx).Error().Msg("should not happen; job still has state=running after finishing")
		}

		// tell everybody that a job has finished execution
		if err := publishStateChange(backgroundCtx, s.pubsubService, job); err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to publish job state change")
		}
	}(ctx, j.UID, j.Type, j.Data, j.RunDeadline)
}

// preExec updates the provided Job before execution.
func (s *Scheduler) preExec(job *Job) {
	if job.MaxDurationSeconds < 1 {
		job.MaxDurationSeconds = 1
	}

	now := time.Now()
	nowMilli := now.UnixMilli()

	execDuration := time.Duration(job.MaxDurationSeconds) * time.Second
	execDeadline := now.Add(execDuration)

	job.Updated = nowMilli
	job.LastExecuted = nowMilli
	job.State = JobStateRunning
	job.RunDeadline = execDeadline.UnixMilli()
	job.RunBy = s.instanceID
	job.RunProgress = ProgressMin
	job.TotalExecutions++
	job.Result = ""
	job.LastFailureError = ""
}

// doExec executes the provided Job.
func (s *Scheduler) doExec(ctx context.Context,
	jobUID, jobType, jobData string,
	jobRunDeadline int64,
) (execResult, execError string) {
	execDeadline := time.UnixMilli(jobRunDeadline)

	jobCtx, done := context.WithDeadline(ctx, execDeadline)
	defer done()

	s.cancelJobMx.Lock()
	if _, ok := s.cancelJobMap[jobUID]; ok {
		// should not happen: jobs have unique UIDs!
		s.cancelJobMx.Unlock()
		return "", "failed to start: already running"
	}
	s.cancelJobMap[jobUID] = done
	s.cancelJobMx.Unlock()

	defer func() {
		s.cancelJobMx.Lock()
		delete(s.cancelJobMap, jobUID)
		s.cancelJobMx.Unlock()
	}()

	execResult, err := s.executor.exec(jobCtx, jobUID, jobType, jobData)
	if err != nil {
		execError = err.Error()
	}

	return
}

// postExec updates the provided Job after execution and reschedules it if necessary.
//
//nolint:gocognit // refactor if needed.
func postExec(job *Job, resultData, resultErr string) {
	// Proceed with the update of the job if it's in the running state or
	// if it's marked as canceled but has succeeded nonetheless.
	// Other states should not happen, but if they do, just leave the job as it is.
	if job.State != JobStateRunning && (job.State != JobStateCanceled || resultErr != "") {
		return
	}

	now := time.Now()
	nowMilli := now.UnixMilli()

	job.Updated = nowMilli
	job.Result = resultData
	job.RunBy = ""

	if resultErr != "" {
		job.ConsecutiveFailures++
		job.State = JobStateFailed
		job.LastFailureError = resultErr
	} else {
		job.State = JobStateFinished
		job.RunProgress = ProgressMax
	}

	// Reschedule recurring jobs
	//nolint:nestif // refactor if needed
	if job.IsRecurring {
		if resultErr == "" {
			job.ConsecutiveFailures = 0
		}

		exp, err := cronexpr.Parse(job.RecurringCron)
		if err != nil {
			job.State = JobStateFailed

			messages := fmt.Sprintf("failed to parse cron string: %s", err.Error())
			if job.LastFailureError != "" {
				messages = messages + "; " + job.LastFailureError
			}

			job.LastFailureError = messages
		} else {
			job.State = JobStateScheduled
			job.Scheduled = exp.Next(now).UnixMilli()
		}

		return
	}

	// Reschedule the failed job if retrying is allowed
	if job.State == JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries {
		const retryDelay = 15 * time.Second
		job.State = JobStateScheduled
		job.Scheduled = now.Add(retryDelay).UnixMilli()
		job.RunProgress = ProgressMin
	}
}

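// GetJobProgress returns the current progress of the job with the given UID.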
func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error) {
	job, err := s.store.Find(ctx, jobUID)
	if err != nil {
		return Progress{}, err
	}

	return mapToProgress(job), nil
}

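// GetJobProgressForGroup returns the progress of all jobs that belong to the given group.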
func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error) {
	job, err := s.store.ListByGroupID(ctx, jobGroupUID)
	if err != nil {
		return nil, err
	}
	return mapToProgressMany(job), nil
}

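// PurgeJobsByGroupID deletes all jobs that belong to the given group
// and returns the number of deleted jobs.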
func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (int64, error) {
	n, err := s.store.DeleteByGroupID(ctx, jobGroupID)
	if err != nil {
		return 0, fmt.Errorf("failed to delete jobs by group id=%s: %w", jobGroupID, err)
	}
	return n, nil
}

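// PurgeJobByUID deletes the job with the given UID.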
func (s *Scheduler) PurgeJobByUID(ctx context.Context, jobUID string) error {
	err := s.store.DeleteByUID(ctx, jobUID)
	if err != nil {
		return fmt.Errorf("failed to delete job with id=%s: %w", jobUID, err)
	}
	return nil
}

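// mapToProgressMany converts a slice of jobs to their Progress representations.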
func mapToProgressMany(jobs []*Job) []Progress {
	if jobs == nil {
		return nil
	}
	j := make([]Progress, len(jobs))
	for i, job := range jobs {
		j[i] = mapToProgress(job)
	}
	return j
}

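// mapToProgress extracts the externally visible progress fields from a job.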
func mapToProgress(job *Job) Progress {
	return Progress{
		State:    job.State,
		Progress: job.RunProgress,
		Result:   job.Result,
		Failure:  job.LastFailureError,
	}
}

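// AddRecurring registers (or updates, via upsert) a recurring job of the given type with
// elevated priority. The next execution time is derived from the cron definition.
//
// An illustrative call (the job UID, type, and schedule below are hypothetical, not taken
// from this repository):
//
//	err := s.AddRecurring(ctx, "nightly-cleanup", "cleanup", "0 0 * * *", time.Minute)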
func (s *Scheduler) AddRecurring(
	ctx context.Context,
	jobUID,
	jobType,
	cronDef string,
	maxDur time.Duration,
) error {
	cronExp, err := cronexpr.Parse(cronDef)
	if err != nil {
		return fmt.Errorf("invalid cron definition string for job type=%s: %w", jobType, err)
	}

	now := time.Now()
	nowMilli := now.UnixMilli()

	nextExec := cronExp.Next(now)

	job := &Job{
		UID:                 jobUID,
		Created:             nowMilli,
		Updated:             nowMilli,
		Type:                jobType,
		Priority:            JobPriorityElevated,
		Data:                "",
		Result:              "",
		MaxDurationSeconds:  int(maxDur / time.Second),
		MaxRetries:          0,
		State:               JobStateScheduled,
		Scheduled:           nextExec.UnixMilli(),
		TotalExecutions:     0,
		RunBy:               "",
		RunDeadline:         0,
		RunProgress:         0,
		LastExecuted:        0,
		IsRecurring:         true,
		RecurringCron:       cronDef,
		ConsecutiveFailures: 0,
		LastFailureError:    "",
	}

	err = s.store.Upsert(ctx, job)
	if err != nil {
		return fmt.Errorf("failed to upsert job id=%s type=%s: %w", jobUID, jobType, err)
	}

	return nil
}

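// createNecessaryJobs creates the scheduler's own recurring jobs (purging of old,
// finished jobs and recovery of overdue jobs) while holding the global lock.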
func (s *Scheduler) createNecessaryJobs(ctx context.Context) error {
	mx, err := globalLock(ctx, s.mxManager)
	if err != nil {
		return fmt.Errorf("failed to obtain global lock to create necessary jobs: %w", err)
	}

	defer func() {
		if err := mx.Unlock(ctx); err != nil {
			log.Ctx(ctx).Err(err).
				Msg("failed to release global lock after creating necessary jobs")
		}
	}()

	err = s.AddRecurring(ctx, jobUIDPurge, jobTypePurge, jobCronPurge, 5*time.Second)
	if err != nil {
		return err
	}

	err = s.AddRecurring(ctx, jobUIDOverdue, jobTypeOverdue, jobCronOverdue, 5*time.Second)
	if err != nil {
		return err
	}

	return nil
}

// registerNecessaryJobs registers two jobs: overdue job recovery and purge old finished jobs.
// These two job types are an integral part of the job scheduler.
func (s *Scheduler) registerNecessaryJobs() error {
	handlerOverdue := newJobOverdue(s.store, s.mxManager, s)
	err := s.executor.Register(jobTypeOverdue, handlerOverdue)
	if err != nil {
		return err
	}

	handlerPurge := newJobPurge(s.store, s.mxManager, s.retentionTime)
	err = s.executor.Register(jobTypePurge, handlerPurge)
	if err != nil {
		return err
	}

	return nil
}