drone/job/executor.go

160 lines
4.5 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
"github.com/harness/gitness/pubsub"
"github.com/rs/zerolog/log"
)
// Executor holds map of Handler objects per each job type registered.
// The Scheduler uses the Executor to start execution of jobs.
type Executor struct {
handlerMap map[string]Handler
handlerComplete bool
store Store
publisher pubsub.Publisher
}
const (
ProgressMin = 0
ProgressMax = 100
)
// ProgressReporter can be used by a job Handler to report back the execution progress.
type ProgressReporter func(progress int, result string) error
// Handler is a job executor for a specific job type.
// An implementation should try to honor the context and
// try to abort the execution as soon as the context is done.
type Handler interface {
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}
var errNoHandlerDefined = errors.New("no handler registered for the job type")
// NewExecutor creates new Executor.
func NewExecutor(store Store, publisher pubsub.Publisher) *Executor {
return &Executor{
handlerMap: make(map[string]Handler),
handlerComplete: false,
store: store,
publisher: publisher,
}
}
// Register registers a job Handler for the provided job type.
// This function is not thread safe. All calls are expected to be made
// in a single thread during the application boot time.
func (e *Executor) Register(jobType string, exec Handler) error {
if jobType == "" {
return errors.New("jobType must not be empty")
}
if e.handlerComplete {
return errors.New("job handler registration is complete")
}
if exec == nil {
return errors.New("provided Handler is nil")
}
if _, ok := e.handlerMap[jobType]; ok {
return fmt.Errorf("a Handler is already defined to run the '%s' job types", jobType)
}
e.handlerMap[jobType] = exec
return nil
}
// finishRegistration forbids further registration of job types.
// It is called by the Scheduler when it starts.
func (e *Executor) finishRegistration() {
e.handlerComplete = true
}
// exec runs a single job. This function is synchronous,
// so the caller is responsible to run it in a separate go-routine.
func (e *Executor) exec(
ctx context.Context,
jobUID, jobType string,
input string,
) (result string, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf(
"panic while processing job=%s type=%s: %v\n%s",
jobUID, jobType, r, debug.Stack())
}
}()
exec, ok := e.handlerMap[jobType]
if !ok {
return "", errNoHandlerDefined
}
// progressReporter is the function with which the job can update its progress.
// This function will be executed in the job executor's Go-routine.
// It uses the job's context.
progressReporter := func(progress int, result string) error {
if progress < ProgressMin || progress > ProgressMax {
return errors.New("progress must be between 0 and 100")
}
jobDummy := &Job{
UID: jobUID,
Type: jobType,
Updated: time.Now().UnixMilli(),
Result: result,
State: JobStateRunning,
RunProgress: progress,
}
// This doesn't need to be behind the global lock because it only updates the single row.
// While a job is running no other process should touch it.
// Even this call will fail if the context deadline has been exceeded.
// The job parameter is a dummy Job object that just holds fields that should be updated.
if err := e.store.UpdateProgress(ctx, jobDummy); err != nil {
return err
}
// tell everybody that a job progress has been updated
if err := publishStateChange(ctx, e.publisher, jobDummy); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
return nil
}
return exec.Handle(ctx, input, progressReporter) // runs the job
}
func FailProgress() Progress {
return Progress{
State: JobStateFailed,
Progress: ProgressMax,
Result: "",
Failure: "",
}
}