mirror of https://github.com/harness/drone.git
parent
b69bd618cc
commit
bc8a37de58
|
@ -11,7 +11,7 @@ steps:
|
|||
- name: test
|
||||
image: golang:1.14.15
|
||||
commands:
|
||||
- go test ./...
|
||||
- go test -race ./...
|
||||
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
|
||||
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server
|
||||
|
||||
|
|
|
@ -39,14 +39,14 @@ type queue struct {
|
|||
}
|
||||
|
||||
// newQueue returns a new Queue backed by the build datastore.
|
||||
func newQueue(store core.StageStore) *queue {
|
||||
func newQueue(ctx context.Context, store core.StageStore) *queue {
|
||||
q := &queue{
|
||||
store: store,
|
||||
globMx: redisdb.LockErrNoOp{},
|
||||
ready: make(chan struct{}, 1),
|
||||
workers: map[*worker]struct{}{},
|
||||
interval: time.Minute,
|
||||
ctx: context.Background(),
|
||||
ctx: ctx,
|
||||
}
|
||||
go q.start()
|
||||
return q
|
||||
|
@ -87,6 +87,8 @@ func (q *queue) Resume(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
w := &worker{
|
||||
kind: params.Kind,
|
||||
typ: params.Type,
|
||||
|
@ -96,6 +98,7 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
|
|||
variant: params.Variant,
|
||||
labels: params.Labels,
|
||||
channel: make(chan *core.Stage),
|
||||
done: ctx.Done(),
|
||||
}
|
||||
q.Lock()
|
||||
q.workers[w] = struct{}{}
|
||||
|
@ -209,8 +212,20 @@ func (q *queue) signal(ctx context.Context) error {
|
|||
// Msg("cannot update queue item")
|
||||
// continue
|
||||
// }
|
||||
select {
|
||||
case w.channel <- item:
|
||||
|
||||
// TODO: refactor to its own unexported method
|
||||
sendWork := func() bool {
|
||||
select {
|
||||
case w.channel <- item:
|
||||
return true
|
||||
case <-w.done:
|
||||
// Worker will exit when we call the deferred q.Unlock()
|
||||
case <-time.After(q.interval):
|
||||
// Worker failed to ack before timeout
|
||||
}
|
||||
return false
|
||||
}
|
||||
if sendWork() {
|
||||
delete(q.workers, w)
|
||||
break loop
|
||||
}
|
||||
|
@ -241,6 +256,7 @@ type worker struct {
|
|||
variant string
|
||||
labels map[string]string
|
||||
channel chan *core.Stage
|
||||
done <-chan struct{}
|
||||
}
|
||||
|
||||
type counter struct {
|
||||
|
|
|
@ -6,6 +6,7 @@ package queue
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -33,7 +34,7 @@ func TestQueue(t *testing.T) {
|
|||
store.EXPECT().ListIncomplete(ctx).Return(items[1:], nil).Times(1)
|
||||
store.EXPECT().ListIncomplete(ctx).Return(items[2:], nil).Times(1)
|
||||
|
||||
q := newQueue(store)
|
||||
q := newQueue(ctx, store)
|
||||
for _, item := range items {
|
||||
next, err := q.Request(ctx, core.Filter{OS: "linux", Arch: "amd64"})
|
||||
if err != nil {
|
||||
|
@ -54,8 +55,7 @@ func TestQueueCancel(t *testing.T) {
|
|||
store := mock.NewMockStageStore(controller)
|
||||
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)
|
||||
|
||||
q := newQueue(store)
|
||||
q.ctx = ctx
|
||||
q := newQueue(ctx, store)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
@ -356,3 +356,63 @@ func TestWithinLimits_Old(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func incomplete(n int) ([]*core.Stage, error) {
|
||||
ret := make([]*core.Stage, n)
|
||||
for i := range ret {
|
||||
ret[i] = &core.Stage{
|
||||
OS: "linux/amd64",
|
||||
Arch: "amd64",
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func TestQueueDeadlock(t *testing.T) {
|
||||
controller := gomock.NewController(t)
|
||||
defer controller.Finish()
|
||||
|
||||
n := 10
|
||||
donechan := make(chan struct{}, n)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
store := mock.NewMockStageStore(controller)
|
||||
store.EXPECT().ListIncomplete(ctx).Return(incomplete(n)).AnyTimes()
|
||||
|
||||
q := newQueue(ctx, store)
|
||||
doWork := func(i int) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
default:
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx,
|
||||
time.Duration(i+rand.Intn(1000/n))*time.Millisecond)
|
||||
defer cancel()
|
||||
if i%3 == 0 {
|
||||
// Randomly cancel some contexts to simulate timeouts
|
||||
cancel()
|
||||
}
|
||||
_, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
|
||||
if err != nil && err != context.Canceled && err !=
|
||||
context.DeadlineExceeded {
|
||||
t.Errorf("Expected context.Canceled or context.DeadlineExceeded error, got %s", err)
|
||||
}
|
||||
select {
|
||||
case donechan <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return true
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
go func(i int) {
|
||||
// Spawn n workers, doing work until the parent context is canceled
|
||||
for doWork(i) {
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
// Wait for n * 10 tasks to complete, then exit and cancel all the workers.
|
||||
for seen := 0; seen < n*10; seen++ {
|
||||
<-donechan
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,11 +12,13 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !oss
|
||||
// +build !oss
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/drone/drone/core"
|
||||
|
@ -27,13 +29,13 @@ import (
|
|||
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
|
||||
if r == nil {
|
||||
return scheduler{
|
||||
queue: newQueue(store),
|
||||
queue: newQueue(context.Background(), store),
|
||||
canceller: newCanceller(),
|
||||
}
|
||||
}
|
||||
|
||||
sched := schedulerRedis{
|
||||
queue: newQueue(store),
|
||||
queue: newQueue(context.Background(), store),
|
||||
cancellerRedis: newCancellerRedis(r),
|
||||
}
|
||||
|
||||
|
|
|
@ -12,11 +12,14 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build oss
|
||||
// +build oss
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/drone/drone/core"
|
||||
"github.com/drone/drone/service/redisdb"
|
||||
)
|
||||
|
@ -24,7 +27,7 @@ import (
|
|||
// New creates a new scheduler.
|
||||
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
|
||||
return scheduler{
|
||||
queue: newQueue(store),
|
||||
queue: newQueue(context.Background(), store),
|
||||
canceller: newCanceller(),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue