mirror of https://github.com/harness/drone.git
160 lines
4.5 KiB
Go
160 lines
4.5 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package job
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/harness/gitness/pubsub"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Executor holds map of Handler objects per each job type registered.
|
|
// The Scheduler uses the Executor to start execution of jobs.
|
|
type Executor struct {
|
|
handlerMap map[string]Handler
|
|
handlerComplete bool
|
|
store Store
|
|
publisher pubsub.Publisher
|
|
}
|
|
|
|
const (
|
|
ProgressMin = 0
|
|
ProgressMax = 100
|
|
)
|
|
|
|
// ProgressReporter can be used by a job Handler to report back the execution progress.
|
|
type ProgressReporter func(progress int, result string) error
|
|
|
|
// Handler is a job executor for a specific job type.
|
|
// An implementation should try to honor the context and
|
|
// try to abort the execution as soon as the context is done.
|
|
type Handler interface {
|
|
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
|
|
}
|
|
|
|
var errNoHandlerDefined = errors.New("no handler registered for the job type")
|
|
|
|
// NewExecutor creates new Executor.
|
|
func NewExecutor(store Store, publisher pubsub.Publisher) *Executor {
|
|
return &Executor{
|
|
handlerMap: make(map[string]Handler),
|
|
handlerComplete: false,
|
|
store: store,
|
|
publisher: publisher,
|
|
}
|
|
}
|
|
|
|
// Register registers a job Handler for the provided job type.
|
|
// This function is not thread safe. All calls are expected to be made
|
|
// in a single thread during the application boot time.
|
|
func (e *Executor) Register(jobType string, exec Handler) error {
|
|
if jobType == "" {
|
|
return errors.New("jobType must not be empty")
|
|
}
|
|
|
|
if e.handlerComplete {
|
|
return errors.New("job handler registration is complete")
|
|
}
|
|
|
|
if exec == nil {
|
|
return errors.New("provided Handler is nil")
|
|
}
|
|
|
|
if _, ok := e.handlerMap[jobType]; ok {
|
|
return fmt.Errorf("a Handler is already defined to run the '%s' job types", jobType)
|
|
}
|
|
|
|
e.handlerMap[jobType] = exec
|
|
|
|
return nil
|
|
}
|
|
|
|
// finishRegistration forbids further registration of job types.
|
|
// It is called by the Scheduler when it starts.
|
|
func (e *Executor) finishRegistration() {
|
|
e.handlerComplete = true
|
|
}
|
|
|
|
// exec runs a single job. This function is synchronous,
|
|
// so the caller is responsible to run it in a separate go-routine.
|
|
func (e *Executor) exec(
|
|
ctx context.Context,
|
|
jobUID, jobType string,
|
|
input string,
|
|
) (result string, err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
err = fmt.Errorf(
|
|
"panic while processing job=%s type=%s: %v\n%s",
|
|
jobUID, jobType, r, debug.Stack())
|
|
}
|
|
}()
|
|
|
|
exec, ok := e.handlerMap[jobType]
|
|
if !ok {
|
|
return "", errNoHandlerDefined
|
|
}
|
|
|
|
// progressReporter is the function with which the job can update its progress.
|
|
// This function will be executed in the job executor's Go-routine.
|
|
// It uses the job's context.
|
|
progressReporter := func(progress int, result string) error {
|
|
if progress < ProgressMin || progress > ProgressMax {
|
|
return errors.New("progress must be between 0 and 100")
|
|
}
|
|
|
|
jobDummy := &Job{
|
|
UID: jobUID,
|
|
Type: jobType,
|
|
Updated: time.Now().UnixMilli(),
|
|
Result: result,
|
|
State: JobStateRunning,
|
|
RunProgress: progress,
|
|
}
|
|
|
|
// This doesn't need to be behind the global lock because it only updates the single row.
|
|
// While a job is running no other process should touch it.
|
|
// Even this call will fail if the context deadline has been exceeded.
|
|
// The job parameter is a dummy Job object that just holds fields that should be updated.
|
|
if err := e.store.UpdateProgress(ctx, jobDummy); err != nil {
|
|
return err
|
|
}
|
|
|
|
// tell everybody that a job progress has been updated
|
|
if err := publishStateChange(ctx, e.publisher, jobDummy); err != nil {
|
|
log.Err(err).Msg("failed to publish job state change")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return exec.Handle(ctx, input, progressReporter) // runs the job
|
|
}
|
|
|
|
func FailProgress() Progress {
|
|
return Progress{
|
|
State: JobStateFailed,
|
|
Progress: ProgressMax,
|
|
Result: "",
|
|
Failure: "",
|
|
}
|
|
}
|