From ba806ecd878b01c76cd04983b9d2ef8538b93440 Mon Sep 17 00:00:00 2001 From: TP Honey Date: Wed, 5 Jul 2023 15:07:07 +0100 Subject: [PATCH] Revert "fix scheduler queue deadlock" --- .drone.yml | 2 +- scheduler/queue/queue.go | 12 +++++------ scheduler/queue/queue_test.go | 40 ++++++----------------------------- 3 files changed, 13 insertions(+), 41 deletions(-) diff --git a/.drone.yml b/.drone.yml index 50a28859b..ffb0fc816 100644 --- a/.drone.yml +++ b/.drone.yml @@ -11,7 +11,7 @@ steps: - name: test image: golang:1.14.15 commands: - - go test -race ./... + - go test ./... - go build -o /dev/null github.com/drone/drone/cmd/drone-server - go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server diff --git a/scheduler/queue/queue.go b/scheduler/queue/queue.go index 14704de58..1354d4f5e 100644 --- a/scheduler/queue/queue.go +++ b/scheduler/queue/queue.go @@ -96,7 +96,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e variant: params.Variant, labels: params.Labels, channel: make(chan *core.Stage), - done: ctx.Done(), } q.Lock() q.workers[w] = struct{}{} @@ -109,6 +108,9 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e select { case <-ctx.Done(): + q.Lock() + delete(q.workers, w) + q.Unlock() return nil, ctx.Err() case b := <-w.channel: return b, nil @@ -209,12 +211,9 @@ func (q *queue) signal(ctx context.Context) error { // } select { case w.channel <- item: - case <-w.done: - case <-time.After(q.interval): + delete(q.workers, w) + break loop } - - delete(q.workers, w) - break loop } } return nil @@ -242,7 +241,6 @@ type worker struct { variant string labels map[string]string channel chan *core.Stage - done <-chan struct{} } type counter struct { diff --git a/scheduler/queue/queue_test.go b/scheduler/queue/queue_test.go index 496a6f958..251e79941 100644 --- a/scheduler/queue/queue_test.go +++ b/scheduler/queue/queue_test.go @@ -50,15 +50,16 @@ func TestQueueCancel(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() + ctx, cancel := context.WithCancel(context.Background()) store := mock.NewMockStageStore(controller) - store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil) + store.EXPECT().ListIncomplete(ctx).Return(nil, nil) q := newQueue(store) + q.ctx = ctx var wg sync.WaitGroup wg.Add(1) - ctx, cancel := context.WithCancel(context.Background()) go func() { build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"}) if err != context.Canceled { @@ -101,7 +102,10 @@ func TestQueuePush(t *testing.T) { ctx := context.Background() store := mock.NewMockStageStore(controller) - q := newQueue(store) + q := &queue{ + store: store, + ready: make(chan struct{}, 1), + } q.Schedule(ctx, item1) q.Schedule(ctx, item2) select { @@ -352,33 +356,3 @@ func TestWithinLimits_Old(t *testing.T) { } } } - -func TestQueueContextCanceling(t *testing.T) { - listIncompleteResponse := []*core.Stage{ - {ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending}, - } - - controller := gomock.NewController(t) - defer controller.Finish() - - globCtx := context.Background() - - mockStageStore := mock.NewMockStageStore(controller) - mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes() - - q := newQueue(mockStageStore) - - for k := 0; k < 1000; k++ { - reqCtx, reqCanc := context.WithCancel(context.Background()) - go reqCanc() // asynchronously cancel the context - - stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"}) - if stage == nil && err == context.Canceled { - continue // we got the ctx canceled error - } - if stage == listIncompleteResponse[0] && err == nil { - continue // we got a stage before the context got canceled - } - t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err) - } -}