505 lines
13 KiB
Go

// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/harness/gitness/job"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types/enum"
"github.com/jmoiron/sqlx"
)
// Compile-time assertion that *JobStore satisfies the job.Store interface.
var _ job.Store = (*JobStore)(nil)
// NewJobStore returns a JobStore whose queries use the given database handle.
func NewJobStore(db *sqlx.DB) *JobStore {
	store := new(JobStore)
	store.db = db

	return store
}
// JobStore is the database-backed implementation of job.Store.
type JobStore struct {
// db is the handle every method passes to dbtx.GetAccessor to obtain its query accessor.
db *sqlx.DB
}
const (
// jobColumns is the comma-separated list of jobs table columns read and written
// by this store. The INSERT statements in Create and Upsert list their named
// parameters in this same order.
jobColumns = `
job_uid
,job_created
,job_updated
,job_type
,job_priority
,job_data
,job_result
,job_max_duration_seconds
,job_max_retries
,job_state
,job_scheduled
,job_total_executions
,job_run_by
,job_run_deadline
,job_run_progress
,job_last_executed
,job_is_recurring
,job_recurring_cron
,job_consecutive_failures
,job_last_failure_error
,job_group_id`
// jobSelectBase is the shared "SELECT <jobColumns> FROM jobs" query prefix;
// Find and ListByGroupID append their WHERE clause to it.
jobSelectBase = `
SELECT` + jobColumns + `
FROM jobs`
)
// Find looks up a single job by its UID.
// A query failure is returned through database.ProcessSQLErrorf.
func (s *JobStore) Find(ctx context.Context, uid string) (*job.Job, error) {
	const sqlQuery = jobSelectBase + `
WHERE job_uid = $1`

	var found job.Job

	accessor := dbtx.GetAccessor(ctx, s.db)

	err := accessor.GetContext(ctx, &found, sqlQuery, uid)
	if err != nil {
		return nil, database.ProcessSQLErrorf(ctx, err, "Failed to find job by uid")
	}

	return &found, nil
}
// DeleteByGroupID removes every job whose job_group_id equals groupID
// and reports how many rows were deleted.
func (s *JobStore) DeleteByGroupID(ctx context.Context, groupID string) (int64, error) {
	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, params, err := database.Builder.
		Delete("jobs").
		Where("(job_group_id = ?)", groupID).
		ToSql()
	if err != nil {
		return 0, fmt.Errorf("failed to convert delete by group id jobs query to sql: %w", err)
	}

	accessor := dbtx.GetAccessor(ctx, s.db)

	res, err := accessor.ExecContext(ctx, query, params...)
	if err != nil {
		return 0, database.ProcessSQLErrorf(ctx, err, "failed to execute delete jobs by group id query")
	}

	deleted, err := res.RowsAffected()
	if err != nil {
		return 0, database.ProcessSQLErrorf(ctx, err, "failed to get number of deleted jobs in group")
	}

	return deleted, nil
}
// ListByGroupID returns all jobs whose job_group_id equals groupID.
// The returned slice is empty (not nil) when no job matches.
func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*job.Job, error) {
	const sqlQuery = jobSelectBase + `
WHERE job_group_id = $1`

	jobs := []*job.Job{}

	accessor := dbtx.GetAccessor(ctx, s.db)

	err := accessor.SelectContext(ctx, &jobs, sqlQuery, groupID)
	if err != nil {
		return nil, database.ProcessSQLErrorf(ctx, err, "Failed to find job by group id")
	}

	return jobs, nil
}
// Create inserts the given job as a new row of the jobs table.
// The named parameters are bound from the job struct; the row is written with
// every column listed in jobColumns.
func (s *JobStore) Create(ctx context.Context, job *job.Job) error {
	const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
:job_uid
,:job_created
,:job_updated
,:job_type
,:job_priority
,:job_data
,:job_result
,:job_max_duration_seconds
,:job_max_retries
,:job_state
,:job_scheduled
,:job_total_executions
,:job_run_by
,:job_run_deadline
,:job_run_progress
,:job_last_executed
,:job_is_recurring
,:job_recurring_cron
,:job_consecutive_failures
,:job_last_failure_error
,:job_group_id
)`

	accessor := dbtx.GetAccessor(ctx, s.db)

	// Translate the named parameters into a positional query plus argument list.
	stmt, params, err := accessor.BindNamed(sqlQuery, job)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to bind job object")
	}

	_, err = accessor.ExecContext(ctx, stmt, params...)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Insert query failed")
	}

	return nil
}
// Upsert inserts the job, or, when a row with the same job_uid already exists,
// updates that row. The update only happens if at least one definition field
// (type, priority, data, max duration, max retries, recurring flag, cron) differs
// from the stored value. On update the execution bookkeeping columns
// (job_created, job_total_executions, job_run_*, job_last_executed,
// job_consecutive_failures, job_last_failure_error) and job_group_id are left untouched.
func (s *JobStore) Upsert(ctx context.Context, job *job.Job) error {
	const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
:job_uid
,:job_created
,:job_updated
,:job_type
,:job_priority
,:job_data
,:job_result
,:job_max_duration_seconds
,:job_max_retries
,:job_state
,:job_scheduled
,:job_total_executions
,:job_run_by
,:job_run_deadline
,:job_run_progress
,:job_last_executed
,:job_is_recurring
,:job_recurring_cron
,:job_consecutive_failures
,:job_last_failure_error
,:job_group_id
)
ON CONFLICT (job_uid) DO
UPDATE SET
job_updated = :job_updated
,job_type = :job_type
,job_priority = :job_priority
,job_data = :job_data
,job_result = :job_result
,job_max_duration_seconds = :job_max_duration_seconds
,job_max_retries = :job_max_retries
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_is_recurring = :job_is_recurring
,job_recurring_cron = :job_recurring_cron
WHERE
jobs.job_type <> :job_type OR
jobs.job_priority <> :job_priority OR
jobs.job_data <> :job_data OR
jobs.job_max_duration_seconds <> :job_max_duration_seconds OR
jobs.job_max_retries <> :job_max_retries OR
jobs.job_is_recurring <> :job_is_recurring OR
jobs.job_recurring_cron <> :job_recurring_cron`

	accessor := dbtx.GetAccessor(ctx, s.db)

	// Translate the named parameters into a positional query plus argument list.
	stmt, params, err := accessor.BindNamed(sqlQuery, job)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to bind job object")
	}

	_, err = accessor.ExecContext(ctx, stmt, params...)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Upsert query failed")
	}

	return nil
}
// UpdateDefinition overwrites the definition columns (type, priority, data, result,
// limits, state, scheduled time, recurrence settings and group id) of the job
// identified by job_uid. It returns gitness_store.ErrResourceNotFound when the
// update affected no row.
func (s *JobStore) UpdateDefinition(ctx context.Context, job *job.Job) error {
	const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_type = :job_type
,job_priority = :job_priority
,job_data = :job_data
,job_result = :job_result
,job_max_duration_seconds = :job_max_duration_seconds
,job_max_retries = :job_max_retries
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_is_recurring = :job_is_recurring
,job_recurring_cron = :job_recurring_cron
,job_group_id = :job_group_id
WHERE job_uid = :job_uid`

	accessor := dbtx.GetAccessor(ctx, s.db)

	stmt, params, err := accessor.BindNamed(sqlQuery, job)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to bind job object for update")
	}

	res, err := accessor.ExecContext(ctx, stmt, params...)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to update job definition")
	}

	affected, err := res.RowsAffected()
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to get number of updated rows")
	}

	// No affected row means no job with this UID was updated.
	if affected != 0 {
		return nil
	}

	return gitness_store.ErrResourceNotFound
}
// UpdateExecution writes the execution-related columns (result, state, scheduled
// time, execution counter, runner, run deadline, last execution time and failure
// tracking) of the job identified by job_uid. It is meant to be called before and
// after a job runs. It returns gitness_store.ErrResourceNotFound when the update
// affected no row.
func (s *JobStore) UpdateExecution(ctx context.Context, job *job.Job) error {
	const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_result = :job_result
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_total_executions = :job_total_executions
,job_run_by = :job_run_by
,job_run_deadline = :job_run_deadline
,job_last_executed = :job_last_executed
,job_consecutive_failures = :job_consecutive_failures
,job_last_failure_error = :job_last_failure_error
WHERE job_uid = :job_uid`

	accessor := dbtx.GetAccessor(ctx, s.db)

	stmt, params, err := accessor.BindNamed(sqlQuery, job)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to bind job object for update")
	}

	res, err := accessor.ExecContext(ctx, stmt, params...)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to update job execution")
	}

	affected, err := res.RowsAffected()
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to get number of updated rows")
	}

	// No affected row means no job with this UID was updated.
	if affected != 0 {
		return nil
	}

	return gitness_store.ErrResourceNotFound
}
// UpdateProgress stores the job's updated timestamp, result and run progress.
// The row is only touched while its job_state is 'running'; if no row matches
// (unknown UID or job not running) it returns gitness_store.ErrResourceNotFound.
func (s *JobStore) UpdateProgress(ctx context.Context, job *job.Job) error {
	const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_result = :job_result
,job_run_progress = :job_run_progress
WHERE job_uid = :job_uid AND job_state = 'running'`

	accessor := dbtx.GetAccessor(ctx, s.db)

	stmt, params, err := accessor.BindNamed(sqlQuery, job)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to bind job object for update")
	}

	res, err := accessor.ExecContext(ctx, stmt, params...)
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to update job progress")
	}

	affected, err := res.RowsAffected()
	if err != nil {
		return database.ProcessSQLErrorf(ctx, err, "Failed to get number of updated rows")
	}

	// No affected row means there is no running job with this UID.
	if affected != 0 {
		return nil
	}

	return gitness_store.ErrResourceNotFound
}
// CountRunning reports how many jobs are currently in the running state.
func (s *JobStore) CountRunning(ctx context.Context) (int, error) {
	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, params, err := database.Builder.
		Select("count(*)").
		From("jobs").
		Where("job_state = ?", enum.JobStateRunning).
		ToSql()
	if err != nil {
		return 0, fmt.Errorf("failed to convert count running jobs query to sql: %w", err)
	}

	accessor := dbtx.GetAccessor(ctx, s.db)

	var running int64

	row := accessor.QueryRowContext(ctx, query, params...)
	if err = row.Scan(&running); err != nil {
		return 0, database.ProcessSQLErrorf(ctx, err, "failed executing count running jobs query")
	}

	return int(running), nil
}
// ListReady returns a list of jobs that are ready for execution:
// the jobs with state="scheduled" whose scheduled time (Unix milliseconds) is not after now.
// Results are ordered by priority (highest first), then by scheduled time and UID,
// and at most limit jobs are returned.
//
// A non-positive limit yields an empty (non-nil) slice without querying the database.
func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*job.Job, error) {
	result := make([]*job.Job, 0)

	// Converting a negative int to uint64 wraps around to a huge value, which would
	// turn "no capacity" into an effectively unbounded (or invalid) LIMIT clause.
	if limit <= 0 {
		return result, nil
	}

	stmt := database.Builder.
		Select(jobColumns).
		From("jobs").
		Where("job_state = ?", enum.JobStateScheduled).
		Where("job_scheduled <= ?", now.UnixMilli()).
		OrderBy("job_priority desc, job_scheduled asc, job_uid asc").
		Limit(uint64(limit))

	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, args, err := stmt.ToSql()
	if err != nil {
		return nil, fmt.Errorf("failed to convert list scheduled jobs query to sql: %w", err)
	}

	db := dbtx.GetAccessor(ctx, s.db)

	if err = db.SelectContext(ctx, &result, query, args...); err != nil {
		return nil, database.ProcessSQLErrorf(ctx, err, "failed to execute list scheduled jobs query")
	}

	return result, nil
}
// ListDeadlineExceeded returns the running jobs whose run deadline (Unix
// milliseconds) lies before now, ordered by deadline with the earliest first.
// The returned slice is empty (not nil) when no job matches.
func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*job.Job, error) {
	builder := database.Builder.Select(jobColumns).From("jobs")
	builder = builder.Where("job_state = ?", enum.JobStateRunning)
	builder = builder.Where("job_run_deadline < ?", now.UnixMilli())
	builder = builder.OrderBy("job_run_deadline asc")

	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, params, err := builder.ToSql()
	if err != nil {
		return nil, fmt.Errorf("failed to convert list overdue jobs query to sql: %w", err)
	}

	overdue := []*job.Job{}

	accessor := dbtx.GetAccessor(ctx, s.db)

	err = accessor.SelectContext(ctx, &overdue, query, params...)
	if err != nil {
		return nil, database.ProcessSQLErrorf(ctx, err, "failed to execute list overdue jobs query")
	}

	return overdue, nil
}
// NextScheduledTime returns the earliest scheduled time strictly after now among
// jobs in the scheduled state, or the zero time.Time if there is no such job.
func (s *JobStore) NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) {
	query, params, err := database.Builder.
		Select("job_scheduled").
		From("jobs").
		Where("job_state = ?", enum.JobStateScheduled).
		Where("job_scheduled > ?", now.UnixMilli()).
		OrderBy("job_scheduled asc").
		Limit(1).
		ToSql()
	if err != nil {
		return time.Time{}, fmt.Errorf("failed to convert next scheduled time query to sql: %w", err)
	}

	accessor := dbtx.GetAccessor(ctx, s.db)

	var scheduledMillis int64

	err = accessor.QueryRowContext(ctx, query, params...).Scan(&scheduledMillis)

	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No future scheduled job: report the zero time rather than an error.
		return time.Time{}, nil
	case err != nil:
		return time.Time{}, database.ProcessSQLErrorf(ctx, err, "failed to execute next scheduled time query")
	default:
		return time.UnixMilli(scheduledMillis), nil
	}
}
// DeleteOld removes non-recurring jobs in the finished, failed or canceled state
// whose last execution time (Unix milliseconds) is before olderThan.
// It reports how many rows were deleted.
func (s *JobStore) DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) {
	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, params, err := database.Builder.
		Delete("jobs").
		Where("(job_state = ? OR job_state = ? OR job_state = ?)",
			enum.JobStateFinished, enum.JobStateFailed, enum.JobStateCanceled).
		Where("job_is_recurring = false").
		Where("job_last_executed < ?", olderThan.UnixMilli()).
		ToSql()
	if err != nil {
		return 0, fmt.Errorf("failed to convert delete done jobs query to sql: %w", err)
	}

	accessor := dbtx.GetAccessor(ctx, s.db)

	res, err := accessor.ExecContext(ctx, query, params...)
	if err != nil {
		return 0, database.ProcessSQLErrorf(ctx, err, "failed to execute delete done jobs query")
	}

	deleted, err := res.RowsAffected()
	if err != nil {
		return 0, database.ProcessSQLErrorf(ctx, err, "failed to get number of deleted jobs")
	}

	return deleted, nil
}
// DeleteByUID deletes the job with the given UID.
// The number of affected rows is not inspected, so deleting a UID that does not
// exist is not reported as an error.
func (s *JobStore) DeleteByUID(ctx context.Context, jobUID string) error {
	// The local is named query (not sql) so it does not shadow the database/sql import.
	query, params, err := database.Builder.
		Delete("jobs").
		Where("job_uid = ?", jobUID).
		ToSql()
	if err != nil {
		return fmt.Errorf("failed to convert delete job query to sql: %w", err)
	}

	accessor := dbtx.GetAccessor(ctx, s.db)

	if _, err = accessor.ExecContext(ctx, query, params...); err != nil {
		return database.ProcessSQLErrorf(ctx, err, "failed to execute delete job query")
	}

	return nil
}