Revert "fix scheduler queue deadlock"

revert-3330-queue-deadlock-fix
TP Honey 2023-07-05 15:07:07 +01:00 committed by GitHub
parent 9b7f1d5190
commit ba806ecd87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 41 deletions

View File

@ -11,7 +11,7 @@ steps:
- name: test
image: golang:1.14.15
commands:
- go test -race ./...
- go test ./...
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server

View File

@ -96,7 +96,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
variant: params.Variant,
labels: params.Labels,
channel: make(chan *core.Stage),
done: ctx.Done(),
}
q.Lock()
q.workers[w] = struct{}{}
@ -109,6 +108,9 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
select {
case <-ctx.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case b := <-w.channel:
return b, nil
@ -209,12 +211,9 @@ func (q *queue) signal(ctx context.Context) error {
// }
select {
case w.channel <- item:
case <-w.done:
case <-time.After(q.interval):
delete(q.workers, w)
break loop
}
delete(q.workers, w)
break loop
}
}
return nil
@ -242,7 +241,6 @@ type worker struct {
variant string
labels map[string]string
channel chan *core.Stage
done <-chan struct{}
}
type counter struct {

View File

@ -50,15 +50,16 @@ func TestQueueCancel(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
ctx, cancel := context.WithCancel(context.Background())
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil)
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)
q := newQueue(store)
q.ctx = ctx
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if err != context.Canceled {
@ -101,7 +102,10 @@ func TestQueuePush(t *testing.T) {
ctx := context.Background()
store := mock.NewMockStageStore(controller)
q := newQueue(store)
q := &queue{
store: store,
ready: make(chan struct{}, 1),
}
q.Schedule(ctx, item1)
q.Schedule(ctx, item2)
select {
@ -352,33 +356,3 @@ func TestWithinLimits_Old(t *testing.T) {
}
}
}
func TestQueueContextCanceling(t *testing.T) {
listIncompleteResponse := []*core.Stage{
{ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending},
}
controller := gomock.NewController(t)
defer controller.Finish()
globCtx := context.Background()
mockStageStore := mock.NewMockStageStore(controller)
mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes()
q := newQueue(mockStageStore)
for k := 0; k < 1000; k++ {
reqCtx, reqCanc := context.WithCancel(context.Background())
go reqCanc() // asynchronously cancel the context
stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if stage == nil && err == context.Canceled {
continue // we got the ctx canceled error
}
if stage == listIncompleteResponse[0] && err == nil {
continue // we got a stage before the context got canceled
}
t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err)
}
}