From 5aec7bf37fe91d4a0a428a3201053a538dde4d38 Mon Sep 17 00:00:00 2001 From: Atefeh Mohseni-Ejiyeh Date: Fri, 8 Dec 2023 17:06:44 +0000 Subject: [PATCH] move job package to the top level (#883) --- app/api/controller/repo/import_progress.go | 12 +-- app/api/controller/space/export_progress.go | 4 +- app/services/cleanup/service.go | 2 +- app/services/cleanup/tokens.go | 2 +- app/services/cleanup/webhook_executions.go | 2 +- app/services/cleanup/wire.go | 2 +- app/services/exporter/repository.go | 6 +- app/services/exporter/wire.go | 2 +- app/services/importer/repository.go | 8 +- app/services/importer/wire.go | 2 +- app/services/metric/metrics.go | 2 +- app/services/metric/wire.go | 2 +- app/services/wire.go | 2 +- app/store/database.go | 43 ---------- app/store/database/job.go | 31 ++++--- app/store/database/wire.go | 3 +- cmd/gitness/wire.go | 2 +- cmd/gitness/wire_gen.go | 2 +- {app/services/job => job}/definition.go | 11 +-- job/enum.go | 89 +++++++++++++++++++++ {app/services/job => job}/executor.go | 21 +++-- {app/services/job => job}/job_overdue.go | 10 +-- {app/services/job => job}/job_purge.go | 7 +- {app/services/job => job}/lock.go | 0 {app/services/job => job}/pubsub.go | 15 ++-- {app/services/job => job}/scheduler.go | 73 ++++++++--------- job/store.go | 62 ++++++++++++++ {app/services/job => job}/timer.go | 0 {app/services/job => job}/timer_test.go | 0 job/types.go | 55 +++++++++++++ {app/services/job => job}/uid.go | 0 {app/services/job => job}/wire.go | 9 +-- types/job.go | 57 ------------- 33 files changed, 315 insertions(+), 223 deletions(-) rename {app/services/job => job}/definition.go (88%) create mode 100644 job/enum.go rename {app/services/job => job}/executor.go (89%) rename {app/services/job => job}/job_overdue.go (89%) rename {app/services/job => job}/job_purge.go (89%) rename {app/services/job => job}/lock.go (100%) rename {app/services/job => job}/pubsub.go (78%) rename {app/services/job => job}/scheduler.go (91%) create mode 100644 job/store.go rename {app/services/job => job}/timer.go (100%) rename {app/services/job => job}/timer_test.go (100%) create mode 100644 job/types.go rename {app/services/job => job}/uid.go (100%) rename {app/services/job => job}/wire.go (91%) delete mode 100644 types/job.go diff --git a/app/api/controller/repo/import_progress.go b/app/api/controller/repo/import_progress.go index cf01026e5..ec999f617 100644 --- a/app/api/controller/repo/import_progress.go +++ b/app/api/controller/repo/import_progress.go @@ -23,7 +23,7 @@ import ( "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" "github.com/harness/gitness/app/services/importer" - "github.com/harness/gitness/types" + "github.com/harness/gitness/job" "github.com/harness/gitness/types/enum" ) @@ -31,23 +31,23 @@ import ( func (c *Controller) ImportProgress(ctx context.Context, session *auth.Session, repoRef string, -) (types.JobProgress, error) { +) (job.Progress, error) { // note: can't use c.getRepoCheckAccess because this needs to fetch a repo being imported. repo, err := c.repoStore.FindByRef(ctx, repoRef) if err != nil { - return types.JobProgress{}, err + return job.Progress{}, err } if err = apiauth.CheckRepo(ctx, c.authorizer, session, repo, enum.PermissionRepoView, false); err != nil { - return types.JobProgress{}, err + return job.Progress{}, err } progress, err := c.importer.GetProgress(ctx, repo) if errors.Is(err, importer.ErrNotFound) { - return types.JobProgress{}, usererror.NotFound("No recent or ongoing import found for repository.") + return job.Progress{}, usererror.NotFound("No recent or ongoing import found for repository.") } if err != nil { - return types.JobProgress{}, fmt.Errorf("failed to retrieve import progress: %w", err) + return job.Progress{}, fmt.Errorf("failed to retrieve import progress: %w", err) } return progress, err diff --git a/app/api/controller/space/export_progress.go b/app/api/controller/space/export_progress.go index 22f6e352c..92df8b0c9 100644 --- a/app/api/controller/space/export_progress.go +++ b/app/api/controller/space/export_progress.go @@ -22,14 +22,14 @@ import ( "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" "github.com/harness/gitness/app/services/exporter" - "github.com/harness/gitness/types" + "github.com/harness/gitness/job" "github.com/harness/gitness/types/enum" "github.com/pkg/errors" ) type ExportProgressOutput struct { - Repos []types.JobProgress `json:"repos"` + Repos []job.Progress `json:"repos"` } // ExportProgress returns progress of the export job. diff --git a/app/services/cleanup/service.go b/app/services/cleanup/service.go index e6afa135a..6f30b9786 100644 --- a/app/services/cleanup/service.go +++ b/app/services/cleanup/service.go @@ -20,8 +20,8 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" ) type Config struct { diff --git a/app/services/cleanup/tokens.go b/app/services/cleanup/tokens.go index 448d41f9d..7fb2f5bf2 100644 --- a/app/services/cleanup/tokens.go +++ b/app/services/cleanup/tokens.go @@ -19,8 +19,8 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" diff --git a/app/services/cleanup/webhook_executions.go b/app/services/cleanup/webhook_executions.go index a7de55847..e97c6107c 100644 --- a/app/services/cleanup/webhook_executions.go +++ b/app/services/cleanup/webhook_executions.go @@ -19,8 +19,8 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" "github.com/rs/zerolog/log" ) diff --git a/app/services/cleanup/wire.go b/app/services/cleanup/wire.go index c9481424a..605cbdada 100644 --- a/app/services/cleanup/wire.go +++ b/app/services/cleanup/wire.go @@ -15,8 +15,8 @@ package cleanup import ( - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" "github.com/google/wire" ) diff --git a/app/services/exporter/repository.go b/app/services/exporter/repository.go index 4fb20aa22..6eeff3288 100644 --- a/app/services/exporter/repository.go +++ b/app/services/exporter/repository.go @@ -25,12 +25,12 @@ import ( "time" "github.com/harness/gitness/app/api/controller/repo" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/sse" "github.com/harness/gitness/app/store" gitnessurl "github.com/harness/gitness/app/url" "github.com/harness/gitness/encrypt" "github.com/harness/gitness/git" + "github.com/harness/gitness/job" "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" @@ -146,7 +146,7 @@ func (r *Repository) RunManyForSpace( return r.scheduler.RunJobs(ctx, jobGroupID, jobDefinitions) } -func checkJobAlreadyRunning(jobs []types.JobProgress) error { +func checkJobAlreadyRunning(jobs []job.Progress) error { if jobs == nil { return nil } @@ -251,7 +251,7 @@ func (r *Repository) getJobInput(data string) (Input, error) { return input, nil } -func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]types.JobProgress, error) { +func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]job.Progress, error) { groupID := getJobGroupID(spaceID) progress, err := r.scheduler.GetJobProgressForGroup(ctx, groupID) if err != nil { diff --git a/app/services/exporter/wire.go b/app/services/exporter/wire.go index 17d9418de..7085dd470 100644 --- a/app/services/exporter/wire.go +++ b/app/services/exporter/wire.go @@ -15,12 +15,12 @@ package exporter import ( - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/sse" "github.com/harness/gitness/app/store" "github.com/harness/gitness/app/url" "github.com/harness/gitness/encrypt" "github.com/harness/gitness/git" + "github.com/harness/gitness/job" "github.com/google/wire" ) diff --git a/app/services/importer/repository.go b/app/services/importer/repository.go index 229e11dbb..a53ab3154 100644 --- a/app/services/importer/repository.go +++ b/app/services/importer/repository.go @@ -26,13 +26,13 @@ import ( "github.com/harness/gitness/app/bootstrap" "github.com/harness/gitness/app/githook" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/sse" "github.com/harness/gitness/app/store" gitnessurl "github.com/harness/gitness/app/url" "github.com/harness/gitness/encrypt" "github.com/harness/gitness/git" + "github.com/harness/gitness/job" gitness_store "github.com/harness/gitness/store" "github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/types" @@ -328,7 +328,7 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo return "", nil } -func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { +func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (job.Progress, error) { progress, err := r.scheduler.GetJobProgress(ctx, JobIDFromRepoID(repo.ID)) if errors.Is(err, gitness_store.ErrResourceNotFound) { if repo.Importing { @@ -337,10 +337,10 @@ func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (t } // otherwise there either was no import, or it completed a long time ago (job cleaned up by now) - return types.JobProgress{}, ErrNotFound + return job.Progress{}, ErrNotFound } if err != nil { - return types.JobProgress{}, fmt.Errorf("failed to get job progress: %w", err) + return job.Progress{}, fmt.Errorf("failed to get job progress: %w", err) } return progress, nil diff --git a/app/services/importer/wire.go b/app/services/importer/wire.go index e4debeb8d..c5dac7f77 100644 --- a/app/services/importer/wire.go +++ b/app/services/importer/wire.go @@ -15,13 +15,13 @@ package importer import ( - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/sse" "github.com/harness/gitness/app/store" "github.com/harness/gitness/app/url" "github.com/harness/gitness/encrypt" "github.com/harness/gitness/git" + "github.com/harness/gitness/job" "github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/types" diff --git a/app/services/metric/metrics.go b/app/services/metric/metrics.go index 0998b4bb4..c75105af9 100644 --- a/app/services/metric/metrics.go +++ b/app/services/metric/metrics.go @@ -22,8 +22,8 @@ import ( "net/http" "time" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" "github.com/harness/gitness/types" "github.com/harness/gitness/version" ) diff --git a/app/services/metric/wire.go b/app/services/metric/wire.go index f0831f438..8208f8550 100644 --- a/app/services/metric/wire.go +++ b/app/services/metric/wire.go @@ -15,8 +15,8 @@ package metric import ( - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" "github.com/harness/gitness/types" "github.com/google/wire" diff --git a/app/services/wire.go b/app/services/wire.go index 8c4166bca..938cf2aff 100644 --- a/app/services/wire.go +++ b/app/services/wire.go @@ -16,13 +16,13 @@ package services import ( "github.com/harness/gitness/app/services/cleanup" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/services/metric" "github.com/harness/gitness/app/services/notification" "github.com/harness/gitness/app/services/pullreq" "github.com/harness/gitness/app/services/trigger" "github.com/harness/gitness/app/services/webhook" + "github.com/harness/gitness/job" "github.com/google/wire" ) diff --git a/app/store/database.go b/app/store/database.go index 30fbd1400..1dffe6d6a 100644 --- a/app/store/database.go +++ b/app/store/database.go @@ -478,49 +478,6 @@ type ( // Delete removes a required status checks for a repo. Delete(ctx context.Context, repoID, reqCheckID int64) error } - - JobStore interface { - // Find fetches a job by its unique identifier. - Find(ctx context.Context, uid string) (*types.Job, error) - - // ListByGroupID fetches all jobs for a group id - ListByGroupID(ctx context.Context, groupID string) ([]*types.Job, error) - - // DeleteByGroupID deletes all jobs for a group id - DeleteByGroupID(ctx context.Context, groupID string) (int64, error) - - // Create is used to create a new job. - Create(ctx context.Context, job *types.Job) error - - // Upsert will insert the job in the database if the job didn't already exist, - // or it will update the existing one but only if its definition has changed. - Upsert(ctx context.Context, job *types.Job) error - - // UpdateDefinition is used to update a job definition. - UpdateDefinition(ctx context.Context, job *types.Job) error - - // UpdateExecution is used to update a job before and after execution. - UpdateExecution(ctx context.Context, job *types.Job) error - - // UpdateProgress is used to update a job progress data. - UpdateProgress(ctx context.Context, job *types.Job) error - - // CountRunning returns number of jobs that are currently being run. - CountRunning(ctx context.Context) (int, error) - - // ListReady returns a list of jobs that are ready for execution. - ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) - - // ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. - ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) - - // NextScheduledTime returns a scheduled time of the next ready job. - NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) - - // DeleteOld removes non-recurring jobs that have finished execution or have failed. - DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) - } - PipelineStore interface { // Find returns a pipeline given a pipeline ID from the datastore. Find(ctx context.Context, id int64) (*types.Pipeline, error) diff --git a/app/store/database/job.go b/app/store/database/job.go index b76ce139a..4a38cbb59 100644 --- a/app/store/database/job.go +++ b/app/store/database/job.go @@ -21,17 +21,16 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/store" + "github.com/harness/gitness/job" gitness_store "github.com/harness/gitness/store" "github.com/harness/gitness/store/database" "github.com/harness/gitness/store/database/dbtx" - "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" "github.com/jmoiron/sqlx" ) -var _ store.JobStore = (*JobStore)(nil) +var _ job.Store = (*JobStore)(nil) func NewJobStore(db *sqlx.DB) *JobStore { return &JobStore{ @@ -73,13 +72,13 @@ const ( ) // Find fetches a job by its unique identifier. -func (s *JobStore) Find(ctx context.Context, uid string) (*types.Job, error) { +func (s *JobStore) Find(ctx context.Context, uid string) (*job.Job, error) { const sqlQuery = jobSelectBase + ` WHERE job_uid = $1` db := dbtx.GetAccessor(ctx, s.db) - result := &types.Job{} + result := &job.Job{} if err := db.GetContext(ctx, result, sqlQuery, uid); err != nil { return nil, database.ProcessSQLErrorf(err, "Failed to find job by uid") } @@ -114,13 +113,13 @@ func (s *JobStore) DeleteByGroupID(ctx context.Context, groupID string) (int64, } // ListByGroupID fetches all jobs for a group id. -func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*types.Job, error) { +func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*job.Job, error) { const sqlQuery = jobSelectBase + ` WHERE job_group_id = $1` db := dbtx.GetAccessor(ctx, s.db) - dst := make([]*types.Job, 0) + dst := make([]*job.Job, 0) if err := db.SelectContext(ctx, &dst, sqlQuery, groupID); err != nil { return nil, database.ProcessSQLErrorf(err, "Failed to find job by group id") } @@ -129,7 +128,7 @@ func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*types. } // Create creates a new job. -func (s *JobStore) Create(ctx context.Context, job *types.Job) error { +func (s *JobStore) Create(ctx context.Context, job *job.Job) error { const sqlQuery = ` INSERT INTO jobs (` + jobColumns + ` ) VALUES ( @@ -172,7 +171,7 @@ func (s *JobStore) Create(ctx context.Context, job *types.Job) error { // Upsert creates or updates a job. If the job didn't exist it will insert it in the database, // otherwise it will update it but only if its definition has changed. -func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error { +func (s *JobStore) Upsert(ctx context.Context, job *job.Job) error { const sqlQuery = ` INSERT INTO jobs (` + jobColumns + ` ) VALUES ( @@ -235,7 +234,7 @@ func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error { } // UpdateDefinition is used to update a job definition. -func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error { +func (s *JobStore) UpdateDefinition(ctx context.Context, job *job.Job) error { const sqlQuery = ` UPDATE jobs SET @@ -278,7 +277,7 @@ func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error { } // UpdateExecution is used to update a job before and after execution. -func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error { +func (s *JobStore) UpdateExecution(ctx context.Context, job *job.Job) error { const sqlQuery = ` UPDATE jobs SET @@ -318,7 +317,7 @@ func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error { return nil } -func (s *JobStore) UpdateProgress(ctx context.Context, job *types.Job) error { +func (s *JobStore) UpdateProgress(ctx context.Context, job *job.Job) error { const sqlQuery = ` UPDATE jobs SET @@ -376,7 +375,7 @@ func (s *JobStore) CountRunning(ctx context.Context) (int, error) { // ListReady returns a list of jobs that are ready for execution: // The jobs with state="scheduled" and scheduled time in the past. -func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) { +func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*job.Job, error) { stmt := database.Builder. Select(jobColumns). From("jobs"). @@ -390,7 +389,7 @@ func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]* return nil, fmt.Errorf("failed to convert list scheduled jobs query to sql: %w", err) } - result := make([]*types.Job, 0) + result := make([]*job.Job, 0) db := dbtx.GetAccessor(ctx, s.db) @@ -402,7 +401,7 @@ func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]* } // ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. -func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) { +func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*job.Job, error) { stmt := database.Builder. Select(jobColumns). From("jobs"). @@ -415,7 +414,7 @@ func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]* return nil, fmt.Errorf("failed to convert list overdue jobs query to sql: %w", err) } - result := make([]*types.Job, 0) + result := make([]*job.Job, 0) db := dbtx.GetAccessor(ctx, s.db) diff --git a/app/store/database/wire.go b/app/store/database/wire.go index b16c6d8b4..72008f388 100644 --- a/app/store/database/wire.go +++ b/app/store/database/wire.go @@ -19,6 +19,7 @@ import ( "github.com/harness/gitness/app/store" "github.com/harness/gitness/app/store/database/migrate" + "github.com/harness/gitness/job" "github.com/harness/gitness/store/database" "github.com/google/wire" @@ -120,7 +121,7 @@ func ProvideRuleStore( } // ProvideJobStore provides a job store. -func ProvideJobStore(db *sqlx.DB) store.JobStore { +func ProvideJobStore(db *sqlx.DB) job.Store { return NewJobStore(db) } diff --git a/cmd/gitness/wire.go b/cmd/gitness/wire.go index be3602bff..d5ca8506e 100644 --- a/cmd/gitness/wire.go +++ b/cmd/gitness/wire.go @@ -52,7 +52,6 @@ import ( "github.com/harness/gitness/app/services/codeowners" "github.com/harness/gitness/app/services/exporter" "github.com/harness/gitness/app/services/importer" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/services/metric" "github.com/harness/gitness/app/services/notification" @@ -75,6 +74,7 @@ import ( "github.com/harness/gitness/git" "github.com/harness/gitness/git/adapter" "github.com/harness/gitness/git/storage" + "github.com/harness/gitness/job" "github.com/harness/gitness/livelog" "github.com/harness/gitness/lock" "github.com/harness/gitness/pubsub" diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 0d204876a..6967ca09d 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -52,7 +52,6 @@ import ( "github.com/harness/gitness/app/services/codeowners" "github.com/harness/gitness/app/services/exporter" "github.com/harness/gitness/app/services/importer" - "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/services/metric" "github.com/harness/gitness/app/services/notification" @@ -75,6 +74,7 @@ import ( "github.com/harness/gitness/git" "github.com/harness/gitness/git/adapter" "github.com/harness/gitness/git/storage" + "github.com/harness/gitness/job" "github.com/harness/gitness/livelog" "github.com/harness/gitness/lock" "github.com/harness/gitness/pubsub" diff --git a/app/services/job/definition.go b/job/definition.go similarity index 88% rename from app/services/job/definition.go rename to job/definition.go index 73823faca..30f682b0d 100644 --- a/app/services/job/definition.go +++ b/job/definition.go @@ -17,9 +17,6 @@ package job import ( "errors" "time" - - "github.com/harness/gitness/types" - "github.com/harness/gitness/types/enum" ) type Definition struct { @@ -50,19 +47,19 @@ func (def *Definition) Validate() error { return nil } -func (def *Definition) toNewJob() *types.Job { +func (def *Definition) toNewJob() *Job { nowMilli := time.Now().UnixMilli() - return &types.Job{ + return &Job{ UID: def.UID, Created: nowMilli, Updated: nowMilli, Type: def.Type, - Priority: enum.JobPriorityNormal, + Priority: JobPriorityNormal, Data: def.Data, Result: "", MaxDurationSeconds: int(def.Timeout / time.Second), MaxRetries: def.MaxRetries, - State: enum.JobStateScheduled, + State: JobStateScheduled, Scheduled: nowMilli, TotalExecutions: 0, RunBy: "", diff --git a/job/enum.go b/job/enum.go new file mode 100644 index 000000000..570f84204 --- /dev/null +++ b/job/enum.go @@ -0,0 +1,89 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package job + +import ( + "golang.org/x/exp/constraints" + "golang.org/x/exp/slices" +) + +// State represents state of a background job. +type State string + +// State enumeration. +const ( + JobStateScheduled State = "scheduled" + JobStateRunning State = "running" + JobStateFinished State = "finished" + JobStateFailed State = "failed" + JobStateCanceled State = "canceled" +) + +var jobStates = sortEnum([]State{ + JobStateScheduled, + JobStateRunning, + JobStateFinished, + JobStateFailed, + JobStateCanceled, +}) + +func (State) Enum() []interface{} { return toInterfaceSlice(jobStates) } + +func (s State) Sanitize() (State, bool) { + return Sanitize(s, GetAllJobStates) +} + +func GetAllJobStates() ([]State, State) { + return jobStates, "" +} + +// Priority represents priority of a background job. +type Priority int + +// JobPriority enumeration. +const ( + JobPriorityNormal Priority = 0 + JobPriorityElevated Priority = 1 +) + +func (s State) IsCompleted() bool { + return s == JobStateFinished || s == JobStateFailed || s == JobStateCanceled +} + +func sortEnum[T constraints.Ordered](slice []T) []T { + slices.Sort(slice) + return slice +} + +func toInterfaceSlice[T interface{}](vals []T) []interface{} { + res := make([]interface{}, len(vals)) + for i := range vals { + res[i] = vals[i] + } + return res +} + +func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool) { + allValues, defValue := all() + var empty E + if element == empty && defValue != empty { + return defValue, true + } + idx, exists := slices.BinarySearch(allValues, element) + if exists { + return allValues[idx], true + } + return defValue, false +} diff --git a/app/services/job/executor.go b/job/executor.go similarity index 89% rename from app/services/job/executor.go rename to job/executor.go index ea5bad887..adf8576b7 100644 --- a/app/services/job/executor.go +++ b/job/executor.go @@ -21,10 +21,7 @@ import ( "runtime/debug" "time" - "github.com/harness/gitness/app/store" "github.com/harness/gitness/pubsub" - "github.com/harness/gitness/types" - "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" ) @@ -34,7 +31,7 @@ import ( type Executor struct { handlerMap map[string]Handler handlerComplete bool - store store.JobStore + store Store publisher pubsub.Publisher } @@ -56,11 +53,11 @@ type Handler interface { var errNoHandlerDefined = errors.New("no handler registered for the job type") // NewExecutor creates new Executor. -func NewExecutor(jobStore store.JobStore, publisher pubsub.Publisher) *Executor { +func NewExecutor(store Store, publisher pubsub.Publisher) *Executor { return &Executor{ handlerMap: make(map[string]Handler), handlerComplete: false, - store: jobStore, + store: store, publisher: publisher, } } @@ -124,19 +121,19 @@ func (e *Executor) exec( return errors.New("progress must be between 0 and 100") } - jobDummy := &types.Job{ + jobDummy := &Job{ UID: jobUID, Type: jobType, Updated: time.Now().UnixMilli(), Result: result, - State: enum.JobStateRunning, + State: JobStateRunning, RunProgress: progress, } // This doesn't need to be behind the global lock because it only updates the single row. // While a job is running no other process should touch it. // Even this call will fail if the context deadline has been exceeded. - // The job parameter is a dummy types.Job object that just holds fields that should be updated. + // The job parameter is a dummy Job object that just holds fields that should be updated. if err := e.store.UpdateProgress(ctx, jobDummy); err != nil { return err } @@ -152,9 +149,9 @@ func (e *Executor) exec( return exec.Handle(ctx, input, progressReporter) // runs the job } -func FailProgress() types.JobProgress { - return types.JobProgress{ - State: enum.JobStateFailed, +func FailProgress() Progress { + return Progress{ + State: JobStateFailed, Progress: ProgressMax, Result: "", Failure: "", diff --git a/app/services/job/job_overdue.go b/job/job_overdue.go similarity index 89% rename from app/services/job/job_overdue.go rename to job/job_overdue.go index 4e0c163e5..565b5c2be 100644 --- a/app/services/job/job_overdue.go +++ b/job/job_overdue.go @@ -19,9 +19,7 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/store" "github.com/harness/gitness/lock" - "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" ) @@ -33,14 +31,14 @@ const ( ) type jobOverdue struct { - store store.JobStore + store Store mxManager lock.MutexManager scheduler *Scheduler } -func newJobOverdue(jobStore store.JobStore, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue { +func newJobOverdue(store Store, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue { return &jobOverdue{ - store: jobStore, + store: store, mxManager: mxManager, scheduler: scheduler, } @@ -81,7 +79,7 @@ func (j *jobOverdue) Handle(ctx context.Context, _ string, _ ProgressReporter) ( return "", fmt.Errorf("failed update overdue job") } - if job.State == enum.JobStateScheduled { + if job.State == JobStateScheduled { scheduled := time.UnixMilli(job.Scheduled) if minScheduled.IsZero() || minScheduled.After(scheduled) { minScheduled = scheduled diff --git a/app/services/job/job_purge.go b/job/job_purge.go similarity index 89% rename from app/services/job/job_purge.go rename to job/job_purge.go index f84fb9ef1..d62a659ce 100644 --- a/app/services/job/job_purge.go +++ b/job/job_purge.go @@ -19,7 +19,6 @@ import ( "fmt" "time" - "github.com/harness/gitness/app/store" "github.com/harness/gitness/lock" "github.com/rs/zerolog/log" @@ -32,18 +31,18 @@ const ( ) type jobPurge struct { - store store.JobStore + store Store mxManager lock.MutexManager minOldAge time.Duration } -func newJobPurge(jobStore store.JobStore, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge { +func newJobPurge(store Store, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge { if minOldAge < 0 { minOldAge = 0 } return &jobPurge{ - store: jobStore, + store: store, mxManager: mxManager, minOldAge: minOldAge, } diff --git a/app/services/job/lock.go b/job/lock.go similarity index 100% rename from app/services/job/lock.go rename to job/lock.go diff --git a/app/services/job/pubsub.go b/job/pubsub.go similarity index 78% rename from app/services/job/pubsub.go rename to job/pubsub.go index 6b36c07fe..38f566bdd 100644 --- a/app/services/job/pubsub.go +++ b/job/pubsub.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/harness/gitness/pubsub" - "github.com/harness/gitness/types" ) const ( @@ -29,8 +28,8 @@ const ( PubSubTopicStateChange = "gitness:job:state_change" ) -func encodeStateChange(job *types.Job) ([]byte, error) { - stateChange := &types.JobStateChange{ +func encodeStateChange(job *Job) ([]byte, error) { + stateChange := &StateChange{ UID: job.UID, Type: job.Type, State: job.State, @@ -47,8 +46,8 @@ func encodeStateChange(job *types.Job) ([]byte, error) { return buffer.Bytes(), nil } -func DecodeStateChange(payload []byte) (*types.JobStateChange, error) { - stateChange := &types.JobStateChange{} +func DecodeStateChange(payload []byte) (*StateChange, error) { + stateChange := &StateChange{} if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(stateChange); err != nil { return nil, err } @@ -56,15 +55,15 @@ func DecodeStateChange(payload []byte) (*types.JobStateChange, error) { return stateChange, nil } -func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *types.Job) error { +func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *Job) error { payload, err := encodeStateChange(job) if err != nil { - return fmt.Errorf("failed to gob encode JobStateChange: %w", err) + return fmt.Errorf("failed to gob encode StateChange: %w", err) } err = publisher.Publish(ctx, PubSubTopicStateChange, payload) if err != nil { - return fmt.Errorf("failed to publish JobStateChange: %w", err) + return fmt.Errorf("failed to publish StateChange: %w", err) } return nil diff --git a/app/services/job/scheduler.go b/job/scheduler.go similarity index 91% rename from app/services/job/scheduler.go rename to job/scheduler.go index 17927ad3f..2438c8d1c 100644 --- a/app/services/job/scheduler.go +++ b/job/scheduler.go @@ -22,11 +22,8 @@ import ( "sync" "time" - "github.com/harness/gitness/app/store" "github.com/harness/gitness/lock" "github.com/harness/gitness/pubsub" - "github.com/harness/gitness/types" - "github.com/harness/gitness/types/enum" "github.com/gorhill/cronexpr" "github.com/rs/zerolog/log" @@ -35,7 +32,7 @@ import ( // Scheduler controls execution of background jobs. type Scheduler struct { // dependencies - store store.JobStore + store Store executor *Executor mxManager lock.MutexManager pubsubService pubsub.PubSub @@ -54,7 +51,7 @@ type Scheduler struct { } func NewScheduler( - jobStore store.JobStore, + store Store, executor *Executor, mxManager lock.MutexManager, pubsubService pubsub.PubSub, @@ -66,7 +63,7 @@ func NewScheduler( maxRunning = 1 } return &Scheduler{ - store: jobStore, + store: store, executor: executor, mxManager: mxManager, pubsubService: pubsubService, @@ -210,14 +207,14 @@ func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error { return errors.New("can't cancel recurring jobs") } - if job.State != enum.JobStateScheduled && job.State != enum.JobStateRunning { + if job.State != JobStateScheduled && job.State != JobStateRunning { return nil // return no error if the job is already canceled or has finished or failed. } // first we update the job in the database... job.Updated = time.Now().UnixMilli() - job.State = enum.JobStateCanceled + job.State = JobStateCanceled err = s.store.UpdateExecution(ctx, job) if err != nil { @@ -299,7 +296,7 @@ func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definiti return nil } - jobs := make([]*types.Job, len(defs)) + jobs := make([]*Job, len(defs)) for i, def := range defs { if err := def.Validate(); err != nil { return err @@ -416,7 +413,7 @@ func (s *Scheduler) availableSlots(ctx context.Context) (int, error) { // runJob updates the job in the database and starts it in a separate goroutine. // The function will also log the execution. -func (s *Scheduler) runJob(ctx context.Context, j *types.Job) { +func (s *Scheduler) runJob(ctx context.Context, j *Job) { s.wgRunning.Add(1) go func(ctx context.Context, jobUID, jobType, jobData string, @@ -474,19 +471,19 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) { } switch job.State { - case enum.JobStateFinished: + case JobStateFinished: logInfo.Msg("job successfully finished") s.scheduleIfHaveMoreJobs() - case enum.JobStateFailed: + case JobStateFailed: logInfo.Msg("job failed") s.scheduleIfHaveMoreJobs() - case enum.JobStateCanceled: + case JobStateCanceled: log.Ctx(ctx).Error().Msg("job canceled") s.scheduleIfHaveMoreJobs() - case enum.JobStateScheduled: + case JobStateScheduled: scheduledTime := time.UnixMilli(job.Scheduled) logInfo. Str("job.Scheduled", scheduledTime.Format(time.RFC3339Nano)). @@ -494,7 +491,7 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) { s.scheduleProcessing(scheduledTime) - case enum.JobStateRunning: + case JobStateRunning: log.Ctx(ctx).Error().Msg("should not happen; job still has state=running after finishing") } @@ -505,8 +502,8 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) { }(ctx, j.UID, j.Type, j.Data, j.RunDeadline) } -// preExec updates the provided types.Job before execution. -func (s *Scheduler) preExec(job *types.Job) { +// preExec updates the provided Job before execution. +func (s *Scheduler) preExec(job *Job) { if job.MaxDurationSeconds < 1 { job.MaxDurationSeconds = 1 } @@ -519,7 +516,7 @@ func (s *Scheduler) preExec(job *types.Job) { job.Updated = nowMilli job.LastExecuted = nowMilli - job.State = enum.JobStateRunning + job.State = JobStateRunning job.RunDeadline = execDeadline.UnixMilli() job.RunBy = s.instanceID job.RunProgress = ProgressMin @@ -528,7 +525,7 @@ func (s *Scheduler) preExec(job *types.Job) { job.LastFailureError = "" } -// doExec executes the provided types.Job. +// doExec executes the provided Job. func (s *Scheduler) doExec(ctx context.Context, jobUID, jobType, jobData string, jobRunDeadline int64, @@ -561,14 +558,14 @@ func (s *Scheduler) doExec(ctx context.Context, return } -// postExec updates the provided types.Job after execution and reschedules it if necessary. +// postExec updates the provided Job after execution and reschedules it if necessary. // //nolint:gocognit // refactor if needed. -func postExec(job *types.Job, resultData, resultErr string) { +func postExec(job *Job, resultData, resultErr string) { // Proceed with the update of the job if it's in the running state or // if it's marked as canceled but has succeeded nonetheless. // Other states should not happen, but if they do, just leave the job as it is. - if job.State != enum.JobStateRunning && (job.State != enum.JobStateCanceled || resultErr != "") { + if job.State != JobStateRunning && (job.State != JobStateCanceled || resultErr != "") { return } @@ -581,10 +578,10 @@ func postExec(job *types.Job, resultData, resultErr string) { if resultErr != "" { job.ConsecutiveFailures++ - job.State = enum.JobStateFailed + job.State = JobStateFailed job.LastFailureError = resultErr } else { - job.State = enum.JobStateFinished + job.State = JobStateFinished job.RunProgress = ProgressMax } @@ -597,7 +594,7 @@ func postExec(job *types.Job, resultData, resultErr string) { exp, err := cronexpr.Parse(job.RecurringCron) if err != nil { - job.State = enum.JobStateFailed + job.State = JobStateFailed messages := fmt.Sprintf("failed to parse cron string: %s", err.Error()) if job.LastFailureError != "" { @@ -606,7 +603,7 @@ func postExec(job *types.Job, resultData, resultErr string) { job.LastFailureError = messages } else { - job.State = enum.JobStateScheduled + job.State = JobStateScheduled job.Scheduled = exp.Next(now).UnixMilli() } @@ -614,24 +611,24 @@ func postExec(job *types.Job, resultData, resultErr string) { } // Reschedule the failed job if retrying is allowed - if job.State == enum.JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries { + if job.State == JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries { const retryDelay = 15 * time.Second - job.State = enum.JobStateScheduled + job.State = JobStateScheduled job.Scheduled = now.Add(retryDelay).UnixMilli() job.RunProgress = ProgressMin } } -func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (types.JobProgress, error) { +func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error) { job, err := s.store.Find(ctx, jobUID) if err != nil { - return types.JobProgress{}, err + return Progress{}, err } return mapToProgress(job), nil } -func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]types.JobProgress, error) { +func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error) { job, err := s.store.ListByGroupID(ctx, jobGroupUID) if err != nil { return nil, err @@ -647,19 +644,19 @@ func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) ( return n, nil } -func mapToProgressMany(jobs []*types.Job) []types.JobProgress { +func mapToProgressMany(jobs []*Job) []Progress { if jobs == nil { return nil } - j := make([]types.JobProgress, len(jobs)) + j := make([]Progress, len(jobs)) for i, job := range jobs { j[i] = mapToProgress(job) } return j } -func mapToProgress(job *types.Job) types.JobProgress { - return types.JobProgress{ +func mapToProgress(job *Job) Progress { + return Progress{ State: job.State, Progress: job.RunProgress, Result: job.Result, @@ -684,17 +681,17 @@ func (s *Scheduler) AddRecurring( nextExec := cronExp.Next(now) - job := &types.Job{ + job := &Job{ UID: jobUID, Created: nowMilli, Updated: nowMilli, Type: jobType, - Priority: enum.JobPriorityElevated, + Priority: JobPriorityElevated, Data: "", Result: "", MaxDurationSeconds: int(maxDur / time.Second), MaxRetries: 0, - State: enum.JobStateScheduled, + State: JobStateScheduled, Scheduled: nextExec.UnixMilli(), TotalExecutions: 0, RunBy: "", diff --git a/job/store.go b/job/store.go new file mode 100644 index 000000000..4ba96f74b --- /dev/null +++ b/job/store.go @@ -0,0 +1,62 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package job + +import ( + "context" + "time" +) + +type Store interface { + // Find fetches a job by its unique identifier. + Find(ctx context.Context, uid string) (*Job, error) + + // ListByGroupID fetches all jobs for a group id + ListByGroupID(ctx context.Context, groupID string) ([]*Job, error) + + // DeleteByGroupID deletes all jobs for a group id + DeleteByGroupID(ctx context.Context, groupID string) (int64, error) + + // Create is used to create a new job. + Create(ctx context.Context, job *Job) error + + // Upsert will insert the job in the database if the job didn't already exist, + // or it will update the existing one but only if its definition has changed. + Upsert(ctx context.Context, job *Job) error + + // UpdateDefinition is used to update a job definition. + UpdateDefinition(ctx context.Context, job *Job) error + + // UpdateExecution is used to update a job before and after execution. + UpdateExecution(ctx context.Context, job *Job) error + + // UpdateProgress is used to update a job progress data. + UpdateProgress(ctx context.Context, job *Job) error + + // CountRunning returns number of jobs that are currently being run. + CountRunning(ctx context.Context) (int, error) + + // ListReady returns a list of jobs that are ready for execution. + ListReady(ctx context.Context, now time.Time, limit int) ([]*Job, error) + + // ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. + ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*Job, error) + + // NextScheduledTime returns a scheduled time of the next ready job. + NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) + + // DeleteOld removes non-recurring jobs that have finished execution or have failed. + DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) +} diff --git a/app/services/job/timer.go b/job/timer.go similarity index 100% rename from app/services/job/timer.go rename to job/timer.go diff --git a/app/services/job/timer_test.go b/job/timer_test.go similarity index 100% rename from app/services/job/timer_test.go rename to job/timer_test.go diff --git a/job/types.go b/job/types.go new file mode 100644 index 000000000..cdd90df4e --- /dev/null +++ b/job/types.go @@ -0,0 +1,55 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package job + +type Job struct { + UID string `db:"job_uid"` + Created int64 `db:"job_created"` + Updated int64 `db:"job_updated"` + Type string `db:"job_type"` + Priority Priority `db:"job_priority"` + Data string `db:"job_data"` + Result string `db:"job_result"` + MaxDurationSeconds int `db:"job_max_duration_seconds"` + MaxRetries int `db:"job_max_retries"` + State State `db:"job_state"` + Scheduled int64 `db:"job_scheduled"` + TotalExecutions int `db:"job_total_executions"` + RunBy string `db:"job_run_by"` + RunDeadline int64 `db:"job_run_deadline"` + RunProgress int `db:"job_run_progress"` + LastExecuted int64 `db:"job_last_executed"` + IsRecurring bool `db:"job_is_recurring"` + RecurringCron string `db:"job_recurring_cron"` + ConsecutiveFailures int `db:"job_consecutive_failures"` + LastFailureError string `db:"job_last_failure_error"` + GroupID string `db:"job_group_id"` +} + +type StateChange struct { + UID string `json:"uid"` + Type string `json:"type"` + State State `json:"state"` + Progress int `json:"progress"` + Result string `json:"result"` + Failure string `json:"failure"` +} + +type Progress struct { + State State `json:"state"` + Progress int `json:"progress"` + Result string `json:"result,omitempty"` + Failure string `json:"failure,omitempty"` +} diff --git a/app/services/job/uid.go b/job/uid.go similarity index 100% rename from app/services/job/uid.go rename to job/uid.go diff --git a/app/services/job/wire.go b/job/wire.go similarity index 91% rename from app/services/job/wire.go rename to job/wire.go index 48f0c6ade..b8c65ba70 100644 --- a/app/services/job/wire.go +++ b/job/wire.go @@ -15,7 +15,6 @@ package job import ( - "github.com/harness/gitness/app/store" "github.com/harness/gitness/lock" "github.com/harness/gitness/pubsub" "github.com/harness/gitness/types" @@ -29,24 +28,24 @@ var WireSet = wire.NewSet( ) func ProvideExecutor( - jobStore store.JobStore, + store Store, pubsubService pubsub.PubSub, ) *Executor { return NewExecutor( - jobStore, + store, pubsubService, ) } func ProvideScheduler( - jobStore store.JobStore, + store Store, executor *Executor, mutexManager lock.MutexManager, pubsubService pubsub.PubSub, config *types.Config, ) (*Scheduler, error) { return NewScheduler( - jobStore, + store, executor, mutexManager, pubsubService, diff --git a/types/job.go b/types/job.go deleted file mode 100644 index dec58fbd6..000000000 --- a/types/job.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2023 Harness, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package types - -import "github.com/harness/gitness/types/enum" - -type Job struct { - UID string `db:"job_uid"` - Created int64 `db:"job_created"` - Updated int64 `db:"job_updated"` - Type string `db:"job_type"` - Priority enum.JobPriority `db:"job_priority"` - Data string `db:"job_data"` - Result string `db:"job_result"` - MaxDurationSeconds int `db:"job_max_duration_seconds"` - MaxRetries int `db:"job_max_retries"` - State enum.JobState `db:"job_state"` - Scheduled int64 `db:"job_scheduled"` - TotalExecutions int `db:"job_total_executions"` - RunBy string `db:"job_run_by"` - RunDeadline int64 `db:"job_run_deadline"` - RunProgress int `db:"job_run_progress"` - LastExecuted int64 `db:"job_last_executed"` - IsRecurring bool `db:"job_is_recurring"` - RecurringCron string `db:"job_recurring_cron"` - ConsecutiveFailures int `db:"job_consecutive_failures"` - LastFailureError string `db:"job_last_failure_error"` - GroupID string `db:"job_group_id"` -} - -type JobStateChange struct { - UID string `json:"uid"` - Type string `json:"type"` - State enum.JobState `json:"state"` - Progress int `json:"progress"` - Result string `json:"result"` - Failure string `json:"failure"` -} - -type JobProgress struct { - State enum.JobState `json:"state"` - Progress int `json:"progress"` - Result string `json:"result,omitempty"` - Failure string `json:"failure,omitempty"` -}