// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
	"context"
	"sync"
	"time"

	"github.com/harness/gitness/app/store"
	"github.com/harness/gitness/lock"
	"github.com/harness/gitness/types"
	"github.com/harness/gitness/types/enum"

	"github.com/rs/zerolog/log"
)
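
// queue dispatches incomplete stages from the stage store to waiting
// workers. Matching is serialized with a global mutex.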
type queue struct {
	sync.Mutex
	globMx lock.Mutex

	ready    chan struct{}
	paused   bool
	interval time.Duration
	store    store.StageStore
	workers  map[*worker]struct{}
	ctx      context.Context
}

// newQueue returns a new Queue backed by the build datastore.
func newQueue(store store.StageStore, lock lock.MutexManager) (*queue, error) {
	const lockKey = "build_queue"
	mx, err := lock.NewMutex(lockKey)
	if err != nil {
		return nil, err
	}
	q := &queue{
		store:    store,
		globMx:   mx,
		ready:    make(chan struct{}, 1),
		workers:  map[*worker]struct{}{},
		interval: time.Minute,
		ctx:      context.Background(),
	}
	go func() {
		if err := q.start(); err != nil {
			log.Err(err).Msg("queue start failed")
		}
	}()

	return q, nil
}
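
// Schedule signals the queue that a stage has been scheduled. The send is
// non-blocking; the stage itself is loaded from the store on the next signal.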
func (q *queue) Schedule(_ context.Context, _ *types.Stage) error {
	select {
	case q.ready <- struct{}{}:
	default:
	}
	return nil
}
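
// Pause pauses the distribution of stages to waiting workers.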
func (q *queue) Pause(_ context.Context) error {
	q.Lock()
	q.paused = true
	q.Unlock()
	return nil
}
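
// Request registers a worker with the given filter criteria and blocks until
// a matching stage is dispatched or the context is canceled.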
func (q *queue) Request(ctx context.Context, params Filter) (*types.Stage, error) {
	w := &worker{
		kind:    params.Kind,
		typ:     params.Type,
		os:      params.OS,
		arch:    params.Arch,
		kernel:  params.Kernel,
		variant: params.Variant,
		labels:  params.Labels,
		channel: make(chan *types.Stage),
		done:    ctx.Done(),
	}
	q.Lock()
	q.workers[w] = struct{}{}
	q.Unlock()

	select {
	case q.ready <- struct{}{}:
	default:
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case b := <-w.channel:
		return b, nil
	}
}
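
// signal matches incomplete stages from the store against the waiting
// workers and hands each matched stage to exactly one worker. It is a
// no-op while the queue is paused or when no workers are waiting.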
//nolint:gocognit // refactor if needed.
func (q *queue) signal(ctx context.Context) error {
	if err := q.globMx.Lock(ctx); err != nil {
		return err
	}
	defer func() {
		if err := q.globMx.Unlock(ctx); err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to release global lock after signaling")
		}
	}()

	q.Lock()
	count := len(q.workers)
	pause := q.paused
	q.Unlock()
	if pause {
		return nil
	}
	if count == 0 {
		return nil
	}
	items, err := q.store.ListIncomplete(ctx)
	if err != nil {
		return err
	}

	q.Lock()
	defer q.Unlock()
	for _, item := range items {
		if item.Status == enum.CIStatusRunning {
			continue
		}
		if item.Machine != "" {
			continue
		}

		// if the stage defines concurrency limits we
		// need to make sure those limits are not exceeded
		// before proceeding.
		if !withinLimits(item, items) {
			continue
		}

		// if the system defines concurrency limits
		// per repository we need to make sure those limits
		// are not exceeded before proceeding.
		if shouldThrottle(item, items, item.LimitRepo) {
			continue
		}

	loop:
		for w := range q.workers {
			// the worker must match the resource kind and type
			if !matchResource(w.kind, w.typ, item.Kind, item.Type) {
				continue
			}

			if w.os != "" || w.arch != "" || w.variant != "" || w.kernel != "" {
				// the worker is platform-specific. check to ensure
				// the queue item matches the worker platform.
				if w.os != item.OS {
					continue
				}
				if w.arch != item.Arch {
					continue
				}
				// if the pipeline defines a variant it must match
				// the worker variant (e.g. arm6, arm7, etc).
				if item.Variant != "" && item.Variant != w.variant {
					continue
				}
				// if the pipeline defines a kernel version it must match
				// the worker kernel version (e.g. 1709, 1803).
				if item.Kernel != "" && item.Kernel != w.kernel {
					continue
				}
			}

			if len(item.Labels) > 0 || len(w.labels) > 0 {
				if !checkLabels(item.Labels, w.labels) {
					continue
				}
			}

			select {
			case w.channel <- item:
			case <-w.done:
			}

			delete(q.workers, w)
			break loop
		}
	}
	return nil
}
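
// start runs the queue loop, signaling workers whenever the ready channel
// fires or the polling interval elapses, until the queue context is done.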
func (q *queue) start() error {
	for {
		select {
		case <-q.ctx.Done():
			return q.ctx.Err()
		case <-q.ready:
			if err := q.signal(q.ctx); err != nil {
				// don't return, only log error
				log.Ctx(q.ctx).Err(err).Msg("failed to signal on ready")
			}
		case <-time.After(q.interval):
			if err := q.signal(q.ctx); err != nil {
				// don't return, only log error
				log.Ctx(q.ctx).Err(err).Msg("failed to signal on interval")
			}
		}
	}
}
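
// worker represents a pending Request call: the filter criteria supplied by
// the caller and the channel on which a matching stage is delivered.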
type worker struct {
	kind    string
	typ     string
	os      string
	arch    string
	kernel  string
	variant string
	labels  map[string]string
	channel chan *types.Stage
	done    <-chan struct{}
}
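
// checkLabels returns true if the two label sets contain exactly the same
// key/value pairs.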
func checkLabels(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if w, ok := b[k]; !ok || v != w {
			return false
		}
	}
	return true
}
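
// withinLimits returns true if the stage can be scheduled without exceeding
// its own concurrency limit, counting sibling stages of the same name and
// repository that are running or were scheduled earlier.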
func withinLimits(stage *types.Stage, siblings []*types.Stage) bool {
	if stage.Limit == 0 {
		return true
	}
	count := 0
	for _, sibling := range siblings {
		if sibling.RepoID != stage.RepoID {
			continue
		}
		if sibling.ID == stage.ID {
			continue
		}
		if sibling.Name != stage.Name {
			continue
		}
		if sibling.ID < stage.ID ||
			sibling.Status == enum.CIStatusRunning {
			count++
		}
	}
	return count < stage.Limit
}
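
// shouldThrottle returns true if scheduling the stage would exceed the
// per-repository concurrency limit.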
func shouldThrottle(stage *types.Stage, siblings []*types.Stage, limit int) bool {
	// if no throttle limit is defined (default) then
	// return false to indicate no throttling is needed.
	if limit == 0 {
		return false
	}
	// if the stage is already running it is too late
	// to skip, so exit early.
	if stage.Status == enum.CIStatusRunning {
		return false
	}

	count := 0
	// loop through the incomplete stages and count those
	// for the parent repository that were scheduled before
	// this stage.
	for _, sibling := range siblings {
		// ignore stages from other repositories.
		if sibling.RepoID != stage.RepoID {
			continue
		}
		// ignore this stage and stages that were
		// scheduled after this stage.
		if sibling.ID >= stage.ID {
			continue
		}
		count++
	}
	// if the count meets or exceeds the throttle
	// limit return true.
	return count >= limit
}
// matchResource is a helper function that returns true if the kind and type
// of two resources match, applying the defaults "pipeline" and "docker" to
// empty values.
func matchResource(kinda, typea, kindb, typeb string) bool {
	if kinda == "" {
		kinda = "pipeline"
	}
	if kindb == "" {
		kindb = "pipeline"
	}
	if typea == "" {
		typea = "docker"
	}
	if typeb == "" {
		typeb = "docker"
	}
	return kinda == kindb && typea == typeb
}