drone/job/job_overdue.go

98 lines
2.5 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/lock"
"github.com/rs/zerolog/log"
)
// Identifiers and schedule for the overdue-job reclaimer.
const (
// jobUIDOverdue and jobTypeOverdue intentionally share the same string value.
// NOTE(review): presumably the UID identifies the single recurring job instance
// and the type selects its handler — confirm against the registration code.
jobUIDOverdue = "gitness:jobs:overdue"
jobTypeOverdue = "gitness:jobs:overdue"
// jobCronOverdue is the cron expression for the reclaimer's schedule.
jobCronOverdue = "*/20 * * * *" // every 20 min
)
// jobOverdue is the job handler that reclaims jobs whose deadline has passed.
type jobOverdue struct {
// store is used to list jobs past their deadline and to persist their updated state.
store Store
// mxManager provides the global lock held while overdue jobs are reclaimed.
mxManager lock.MutexManager
// scheduler is notified when a reclaimed job ends up in the scheduled state.
scheduler *Scheduler
}
// newJobOverdue builds a jobOverdue handler wired to the given store,
// mutex manager and scheduler.
func newJobOverdue(store Store, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue {
	handler := new(jobOverdue)
	handler.store = store
	handler.mxManager = mxManager
	handler.scheduler = scheduler
	return handler
}
// Handle reclaims overdue jobs. Normally this shouldn't happen.
// But, it can occur if DB update after a job execution fails,
// or the server suddenly terminates while the job is still running.
//
// While holding the global lock, it lists every job whose deadline is
// before the current time, runs each through postExec with a
// "deadline exceeded" error message and persists the result with
// UpdateExecution. If any job is in JobStateScheduled afterwards, the
// scheduler is asked to process at the earliest such scheduled time.
//
// It returns a short summary of how many overdue jobs were found, or an
// empty string when there were none. Returned errors wrap the underlying
// cause, so callers can inspect it with errors.Is / errors.As.
func (j *jobOverdue) Handle(ctx context.Context, _ string, _ ProgressReporter) (string, error) {
	const errorMessage = "deadline exceeded"

	mx, err := globalLock(ctx, j.mxManager)
	if err != nil {
		return "", fmt.Errorf("failed to obtain the lock to reclaim overdue jobs: %w", err)
	}
	defer func() {
		// An unlock failure is only logged: the reclaim outcome has already
		// been decided by the time this runs.
		if unlockErr := mx.Unlock(ctx); unlockErr != nil {
			log.Err(unlockErr).Msg("failed to release global lock after reclaiming overdue jobs")
		}
	}()

	overdueJobs, err := j.store.ListDeadlineExceeded(ctx, time.Now())
	if err != nil {
		return "", fmt.Errorf("failed to list overdue jobs: %w", err)
	}
	if len(overdueJobs) == 0 {
		return "", nil
	}

	// Earliest scheduled time among the reclaimed jobs that ended up in the
	// scheduled state; stays zero if there are none.
	var minScheduled time.Time
	for _, job := range overdueJobs {
		postExec(job, "", errorMessage)

		// NOTE(review): returning here skips the scheduler notification for
		// jobs already persisted earlier in this loop — confirm the scheduler
		// picks them up on its own.
		if err = j.store.UpdateExecution(ctx, job); err != nil {
			return "", fmt.Errorf("failed to update overdue job: %w", err)
		}

		if job.State != JobStateScheduled {
			continue
		}
		scheduled := time.UnixMilli(job.Scheduled)
		if minScheduled.IsZero() || minScheduled.After(scheduled) {
			minScheduled = scheduled
		}
	}

	if !minScheduled.IsZero() {
		j.scheduler.scheduleProcessing(minScheduled)
	}

	return fmt.Sprintf("found %d overdue jobs", len(overdueJobs)), nil
}