diff --git a/.drone.yml b/.drone.yml index ffb0fc816..50a28859b 100644 --- a/.drone.yml +++ b/.drone.yml @@ -11,7 +11,7 @@ steps: - name: test image: golang:1.14.15 commands: - - go test ./... + - go test -race ./... - go build -o /dev/null github.com/drone/drone/cmd/drone-server - go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server diff --git a/scheduler/queue/queue.go b/scheduler/queue/queue.go index 1354d4f5e..255130c17 100644 --- a/scheduler/queue/queue.go +++ b/scheduler/queue/queue.go @@ -39,14 +39,14 @@ type queue struct { } // newQueue returns a new Queue backed by the build datastore. -func newQueue(store core.StageStore) *queue { +func newQueue(ctx context.Context, store core.StageStore) *queue { q := &queue{ store: store, globMx: redisdb.LockErrNoOp{}, ready: make(chan struct{}, 1), workers: map[*worker]struct{}{}, interval: time.Minute, - ctx: context.Background(), + ctx: ctx, } go q.start() return q @@ -87,6 +87,8 @@ func (q *queue) Resume(ctx context.Context) error { } func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() w := &worker{ kind: params.Kind, typ: params.Type, @@ -96,6 +98,7 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e variant: params.Variant, labels: params.Labels, channel: make(chan *core.Stage), + done: ctx.Done(), } q.Lock() q.workers[w] = struct{}{} @@ -209,8 +212,20 @@ func (q *queue) signal(ctx context.Context) error { // Msg("cannot update queue item") // continue // } - select { - case w.channel <- item: + + // TODO: refactor to its own unexported method + sendWork := func() bool { + select { + case w.channel <- item: + return true + case <-w.done: + // Worker will exit when we call the deferred q.Unlock() + case <-time.After(q.interval): + // Worker failed to ack before timeout + } + return false + } + if sendWork() { delete(q.workers, w) break loop } @@ -241,6 +256,7 @@ type worker struct { variant string labels map[string]string channel chan *core.Stage + done <-chan struct{} } type counter struct { diff --git a/scheduler/queue/queue_test.go b/scheduler/queue/queue_test.go index 251e79941..892434607 100644 --- a/scheduler/queue/queue_test.go +++ b/scheduler/queue/queue_test.go @@ -6,6 +6,7 @@ package queue import ( "context" + "math/rand" "sync" "testing" "time" @@ -33,7 +34,7 @@ func TestQueue(t *testing.T) { store.EXPECT().ListIncomplete(ctx).Return(items[1:], nil).Times(1) store.EXPECT().ListIncomplete(ctx).Return(items[2:], nil).Times(1) - q := newQueue(store) + q := newQueue(ctx, store) for _, item := range items { next, err := q.Request(ctx, core.Filter{OS: "linux", Arch: "amd64"}) if err != nil { @@ -54,8 +55,7 @@ func TestQueueCancel(t *testing.T) { store := mock.NewMockStageStore(controller) store.EXPECT().ListIncomplete(ctx).Return(nil, nil) - q := newQueue(store) - q.ctx = ctx + q := newQueue(ctx, store) var wg sync.WaitGroup wg.Add(1) @@ -356,3 +356,63 @@ func TestWithinLimits_Old(t *testing.T) { } } } + +func incomplete(n int) ([]*core.Stage, error) { + ret := make([]*core.Stage, n) + for i := range ret { + ret[i] = &core.Stage{ + OS: "linux/amd64", + Arch: "amd64", + } + } + return ret, nil +} + +func TestQueueDeadlock(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + n := 10 + donechan := make(chan struct{}, n) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := mock.NewMockStageStore(controller) + store.EXPECT().ListIncomplete(ctx).Return(incomplete(n)).AnyTimes() + + q := newQueue(ctx, store) + doWork := func(i int) bool { + select { + case <-ctx.Done(): + return false + default: + } + ctx, cancel := context.WithTimeout(ctx, + time.Duration(i+rand.Intn(1000/n))*time.Millisecond) + defer cancel() + if i%3 == 0 { + // Randomly cancel some contexts to simulate timeouts + cancel() + } + _, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"}) + if err != nil && err != context.Canceled && err != + context.DeadlineExceeded { + t.Errorf("Expected context.Canceled or context.DeadlineExceeded error, got %s", err) + } + select { + case donechan <- struct{}{}: + case <-ctx.Done(): + } + return true + } + for i := 0; i < n; i++ { + go func(i int) { + // Spawn n workers, doing work until the parent context is canceled + for doWork(i) { + } + }(i) + } + // Wait for n * 10 tasks to complete, then exit and cancel all the workers. + for seen := 0; seen < n*10; seen++ { + <-donechan + } +} diff --git a/scheduler/queue/scheduler_non_oss.go b/scheduler/queue/scheduler_non_oss.go index 1781fa892..54830729e 100644 --- a/scheduler/queue/scheduler_non_oss.go +++ b/scheduler/queue/scheduler_non_oss.go @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !oss // +build !oss package queue import ( + "context" "time" "github.com/drone/drone/core" @@ -27,13 +29,13 @@ import ( func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler { if r == nil { return scheduler{ - queue: newQueue(store), + queue: newQueue(context.Background(), store), canceller: newCanceller(), } } sched := schedulerRedis{ - queue: newQueue(store), + queue: newQueue(context.Background(), store), cancellerRedis: newCancellerRedis(r), } diff --git a/scheduler/queue/scheduler_oss.go b/scheduler/queue/scheduler_oss.go index 5cee23bb1..88d7d01af 100644 --- a/scheduler/queue/scheduler_oss.go +++ b/scheduler/queue/scheduler_oss.go @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build oss // +build oss package queue import ( + "context" + "github.com/drone/drone/core" "github.com/drone/drone/service/redisdb" ) @@ -24,7 +27,7 @@ import ( // New creates a new scheduler. func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler { return scheduler{ - queue: newQueue(store), + queue: newQueue(context.Background(), store), canceller: newCanceller(), } }