diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
index 7e5b39e480..cbf05b5349 100644
--- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md
+++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
@@ -252,6 +252,10 @@ relation to port exhaustion.
 - `BATCH_LENGTH`: **20**: Batch data before passing to the handler
 - `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
 - `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
+- `SET_NAME`: **_unique**: The suffix that will added to the default redis
+set name for unique queues. Individual queues will default to
+**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific
+`queue.name` section.
 - `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
 - `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
 - `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.
diff --git a/go.mod b/go.mod
index cb1dca4b5d..508024d39c 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.13
 
 require (
 	cloud.google.com/go v0.45.0 // indirect
-	gitea.com/lunny/levelqueue v0.1.0
+	gitea.com/lunny/levelqueue v0.2.0
 	gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
 	gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76
 	gitea.com/macaron/captcha v0.0.0-20190822015246-daa973478bae
diff --git a/go.sum b/go.sum
index 13ffa77502..27ad269429 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,8 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf
 cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
 gitea.com/lunny/levelqueue v0.1.0 h1:7wMk0VH6mvKN6vZEZCy9nUDgRmdPLgeNrm1NkW8EHNk=
 gitea.com/lunny/levelqueue v0.1.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
+gitea.com/lunny/levelqueue v0.2.0 h1:lR/5EAwQtFcn5YvPEkNMw0p9pAy2/O2nSP5ImECLA2E=
+gitea.com/lunny/levelqueue v0.2.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
 gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b h1:vXt85uYV17KURaUlhU7v4GbCShkqRZDSfo0TkC0YCjQ=
 gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b/go.mod h1:Cxadig6POWpPYYSfg23E7jo35Yf0yvsdC1lifoKWmPo=
 gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 h1:mMsMEg90c5KXQgRWsH8D6GHXfZIW1RAe5S9VYIb12lM=
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go
new file mode 100644
index 0000000000..2cd0ba0b95
--- /dev/null
+++ b/modules/queue/bytefifo.go
@@ -0,0 +1,61 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+// ByteFIFO defines a FIFO that takes a byte array
+type ByteFIFO interface {
+	// Len returns the length of the fifo
+	Len() int64
+	// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+	PushFunc(data []byte, fn func() error) error
+	// Pop pops data from the start of the fifo
+	Pop() ([]byte, error)
+	// Close this fifo
+	Close() error
+}
+
+// UniqueByteFIFO defines a FIFO that Uniques its contents
+type UniqueByteFIFO interface {
+	ByteFIFO
+	// Has returns whether the fifo contains this data
+	Has(data []byte) (bool, error)
+}
+
+var _ (ByteFIFO) = &DummyByteFIFO{}
+
+// DummyByteFIFO represents a dummy fifo
+type DummyByteFIFO struct{}
+
+// PushFunc returns nil
+func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
+	return nil
+}
+
+// Pop returns nil
+func (*DummyByteFIFO) Pop() ([]byte, error) {
+	return []byte{}, nil
+}
+
+// Close returns nil
+func (*DummyByteFIFO) Close() error {
+	return nil
+}
+
+// Len is always 0
+func (*DummyByteFIFO) Len() int64 {
+	return 0
+}
+
+var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{}
+
+// DummyUniqueByteFIFO represents a dummy unique fifo
+type DummyUniqueByteFIFO struct {
+	DummyByteFIFO
+}
+
+// Has always returns false
+func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
+	return false, nil
+}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index 094699d4af..e3c63310be 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -74,25 +74,35 @@ type DummyQueue struct {
 }
 
 // Run does nothing
-func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
+func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
 
 // Push fakes a push of data to the queue
-func (b *DummyQueue) Push(Data) error {
+func (*DummyQueue) Push(Data) error {
 	return nil
 }
 
+// PushFunc fakes a push of data to the queue with a function. The function is never run.
+func (*DummyQueue) PushFunc(Data, func() error) error {
+	return nil
+}
+
+// Has always returns false as this queue never does anything
+func (*DummyQueue) Has(Data) (bool, error) {
+	return false, nil
+}
+
 // Flush always returns nil
-func (b *DummyQueue) Flush(time.Duration) error {
+func (*DummyQueue) Flush(time.Duration) error {
 	return nil
 }
 
 // FlushWithContext always returns nil
-func (b *DummyQueue) FlushWithContext(context.Context) error {
+func (*DummyQueue) FlushWithContext(context.Context) error {
 	return nil
 }
 
 // IsEmpty asserts that the queue is empty
-func (b *DummyQueue) IsEmpty() bool {
+func (*DummyQueue) IsEmpty() bool {
 	return true
 }
 
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
new file mode 100644
index 0000000000..cad258bda8
--- /dev/null
+++ b/modules/queue/queue_bytefifo.go
@@ -0,0 +1,227 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+)
+
+// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
+type ByteFIFOQueueConfiguration struct {
+	WorkerPoolConfiguration
+	Workers int
+	Name    string
+}
+
+var _ (Queue) = &ByteFIFOQueue{}
+
+// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
+type ByteFIFOQueue struct {
+	*WorkerPool
+	byteFIFO   ByteFIFO
+	typ        Type
+	closed     chan struct{}
+	terminated chan struct{}
+	exemplar   interface{}
+	workers    int
+	name       string
+	lock       sync.Mutex
+}
+
+// NewByteFIFOQueue creates a new ByteFIFOQueue
+func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
+	configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(ByteFIFOQueueConfiguration)
+
+	return &ByteFIFOQueue{
+		WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+		byteFIFO:   byteFIFO,
+		typ:        typ,
+		closed:     make(chan struct{}),
+		terminated: make(chan struct{}),
+		exemplar:   exemplar,
+		workers:    config.Workers,
+		name:       config.Name,
+	}, nil
+}
+
+// Name returns the name of this queue
+func (q *ByteFIFOQueue) Name() string {
+	return q.name
+}
+
+// Push pushes data to the fifo
+func (q *ByteFIFOQueue) Push(data Data) error {
+	return q.PushFunc(data, nil)
+}
+
+// PushFunc pushes data to the fifo
+func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
+	if !assignableTo(data, q.exemplar) {
+		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+	}
+	bs, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	return q.byteFIFO.PushFunc(bs, fn)
+}
+
+// IsEmpty checks if the queue is empty
+func (q *ByteFIFOQueue) IsEmpty() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if !q.WorkerPool.IsEmpty() {
+		return false
+	}
+	return q.byteFIFO.Len() == 0
+}
+
+// Run runs the bytefifo queue
+func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+	atShutdown(context.Background(), q.Shutdown)
+	atTerminate(context.Background(), q.Terminate)
+	log.Debug("%s: %s Starting", q.typ, q.name)
+
+	go func() {
+		_ = q.AddWorkers(q.workers, 0)
+	}()
+
+	go q.readToChan()
+
+	log.Trace("%s: %s Waiting til closed", q.typ, q.name)
+	<-q.closed
+	log.Trace("%s: %s Waiting til done", q.typ, q.name)
+	q.Wait()
+
+	log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
+	ctx, cancel := context.WithCancel(context.Background())
+	atTerminate(ctx, cancel)
+	q.CleanUp(ctx)
+	cancel()
+}
+
+func (q *ByteFIFOQueue) readToChan() {
+	for {
+		select {
+		case <-q.closed:
+			// tell the pool to shutdown.
+			q.cancel()
+			return
+		default:
+			q.lock.Lock()
+			bs, err := q.byteFIFO.Pop()
+			if err != nil {
+				q.lock.Unlock()
+				log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+				time.Sleep(time.Millisecond * 100)
+				continue
+			}
+
+			if len(bs) == 0 {
+				q.lock.Unlock()
+				time.Sleep(time.Millisecond * 100)
+				continue
+			}
+
+			data, err := unmarshalAs(bs, q.exemplar)
+			if err != nil {
+				log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+				q.lock.Unlock()
+				time.Sleep(time.Millisecond * 100)
+				continue
+			}
+
+			log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+			q.WorkerPool.Push(data)
+			q.lock.Unlock()
+		}
+	}
+}
+
+// Shutdown processing from this queue
+func (q *ByteFIFOQueue) Shutdown() {
+	log.Trace("%s: %s Shutting down", q.typ, q.name)
+	q.lock.Lock()
+	select {
+	case <-q.closed:
+	default:
+		close(q.closed)
+	}
+	q.lock.Unlock()
+	log.Debug("%s: %s Shutdown", q.typ, q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ByteFIFOQueue) Terminate() {
+	log.Trace("%s: %s Terminating", q.typ, q.name)
+	q.Shutdown()
+	q.lock.Lock()
+	select {
+	case <-q.terminated:
+		q.lock.Unlock()
+		return
+	default:
+	}
+	close(q.terminated)
+	q.lock.Unlock()
+	if log.IsDebug() {
+		log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
+	}
+	if err := q.byteFIFO.Close(); err != nil {
+		log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
+	}
+	log.Debug("%s: %s Terminated", q.typ, q.name)
+}
+
+var _ (UniqueQueue) = &ByteFIFOUniqueQueue{}
+
+// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
+type ByteFIFOUniqueQueue struct {
+	ByteFIFOQueue
+}
+
+// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
+func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
+	configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(ByteFIFOQueueConfiguration)
+
+	return &ByteFIFOUniqueQueue{
+		ByteFIFOQueue: ByteFIFOQueue{
+			WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+			byteFIFO:   byteFIFO,
+			typ:        typ,
+			closed:     make(chan struct{}),
+			terminated: make(chan struct{}),
+			exemplar:   exemplar,
+			workers:    config.Workers,
+			name:       config.Name,
+		},
+	}, nil
+}
+
+// Has checks if the provided data is in the queue
+func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
+	if !assignableTo(data, q.exemplar) {
+		return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+	}
+	bs, err := json.Marshal(data)
+	if err != nil {
+		return false, err
+	}
+	return q.byteFIFO.(UniqueByteFIFO).Has(bs)
+}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 45df8a443e..d7a11e79f5 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -53,31 +53,31 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
 }
 
 // Run starts to run the queue
-func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
 	atShutdown(context.Background(), func() {
-		log.Warn("ChannelQueue: %s is not shutdownable!", c.name)
+		log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
 	})
 	atTerminate(context.Background(), func() {
-		log.Warn("ChannelQueue: %s is not terminatable!", c.name)
+		log.Warn("ChannelQueue: %s is not terminatable!", q.name)
 	})
-	log.Debug("ChannelQueue: %s Starting", c.name)
+	log.Debug("ChannelQueue: %s Starting", q.name)
 	go func() {
-		_ = c.AddWorkers(c.workers, 0)
+		_ = q.AddWorkers(q.workers, 0)
 	}()
 }
 
 // Push will push data into the queue
-func (c *ChannelQueue) Push(data Data) error {
-	if !assignableTo(data, c.exemplar) {
-		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
+func (q *ChannelQueue) Push(data Data) error {
+	if !assignableTo(data, q.exemplar) {
+		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
 	}
-	c.WorkerPool.Push(data)
+	q.WorkerPool.Push(data)
 	return nil
 }
 
 // Name returns the name of this queue
-func (c *ChannelQueue) Name() string {
-	return c.name
+func (q *ChannelQueue) Name() string {
+	return q.name
 }
 
 func init() {
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index ca3e230e3d..ff0876488b 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,15 +5,6 @@
 package queue
 
 import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"code.gitea.io/gitea/modules/log"
-
 	"gitea.com/lunny/levelqueue"
 )
 
@@ -22,22 +13,13 @@ const LevelQueueType Type = "level"
 
 // LevelQueueConfiguration is the configuration for a LevelQueue
 type LevelQueueConfiguration struct {
-	WorkerPoolConfiguration
+	ByteFIFOQueueConfiguration
 	DataDir string
-	Workers int
-	Name    string
 }
 
 // LevelQueue implements a disk library queue
 type LevelQueue struct {
-	*WorkerPool
-	queue      *levelqueue.Queue
-	closed     chan struct{}
-	terminated chan struct{}
-	lock       sync.Mutex
-	exemplar   interface{}
-	workers    int
-	name       string
+	*ByteFIFOQueue
 }
 
 // NewLevelQueue creates a ledis local queue
@@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 	}
 	config := configInterface.(LevelQueueConfiguration)
 
-	internal, err := levelqueue.Open(config.DataDir)
+	byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
+	if err != nil {
+		return nil, err
+	}
+
+	byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
 	if err != nil {
 		return nil, err
 	}
 
 	queue := &LevelQueue{
-		WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
-		queue:      internal,
-		exemplar:   exemplar,
-		closed:     make(chan struct{}),
-		terminated: make(chan struct{}),
-		workers:    config.Workers,
-		name:       config.Name,
+		ByteFIFOQueue: byteFIFOQueue,
 	}
 	queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
 	return queue, nil
 }
 
-// Run starts to run the queue
-func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-	atShutdown(context.Background(), l.Shutdown)
-	atTerminate(context.Background(), l.Terminate)
-	log.Debug("LevelQueue: %s Starting", l.name)
-
-	go func() {
-		_ = l.AddWorkers(l.workers, 0)
-	}()
-
-	go l.readToChan()
-
-	log.Trace("LevelQueue: %s Waiting til closed", l.name)
-	<-l.closed
-
-	log.Trace("LevelQueue: %s Waiting til done", l.name)
-	l.Wait()
-
-	log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
-	ctx, cancel := context.WithCancel(context.Background())
-	atTerminate(ctx, cancel)
-	l.CleanUp(ctx)
-	cancel()
-	log.Trace("LevelQueue: %s Cleaned", l.name)
+var _ (ByteFIFO) = &LevelQueueByteFIFO{}
 
+// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
+type LevelQueueByteFIFO struct {
+	internal *levelqueue.Queue
 }
 
-func (l *LevelQueue) readToChan() {
-	for {
-		select {
-		case <-l.closed:
-			// tell the pool to shutdown.
-			l.cancel()
-			return
-		default:
-			atomic.AddInt64(&l.numInQueue, 1)
-			bs, err := l.queue.RPop()
-			if err != nil {
-				if err != levelqueue.ErrNotFound {
-					log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
-				}
-				atomic.AddInt64(&l.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			if len(bs) == 0 {
-				atomic.AddInt64(&l.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			data, err := unmarshalAs(bs, l.exemplar)
-			if err != nil {
-				log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
-				atomic.AddInt64(&l.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
-			l.WorkerPool.Push(data)
-			atomic.AddInt64(&l.numInQueue, -1)
-		}
-	}
-}
-
-// Push will push the indexer data to queue
-func (l *LevelQueue) Push(data Data) error {
-	if !assignableTo(data, l.exemplar) {
-		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
-	}
-	bs, err := json.Marshal(data)
+// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
+func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
+	internal, err := levelqueue.Open(dataDir)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	return l.queue.LPush(bs)
+
+	return &LevelQueueByteFIFO{
+		internal: internal,
+	}, nil
 }
 
-// IsEmpty checks whether the queue is empty
-func (l *LevelQueue) IsEmpty() bool {
-	if !l.WorkerPool.IsEmpty() {
-		return false
-	}
-	return l.queue.Len() == 0
-}
-
-// Shutdown this queue and stop processing
-func (l *LevelQueue) Shutdown() {
-	l.lock.Lock()
-	defer l.lock.Unlock()
-	log.Trace("LevelQueue: %s Shutting down", l.name)
-	select {
-	case <-l.closed:
-	default:
-		close(l.closed)
-	}
-	log.Debug("LevelQueue: %s Shutdown", l.name)
-}
-
-// Terminate this queue and close the queue
-func (l *LevelQueue) Terminate() {
-	log.Trace("LevelQueue: %s Terminating", l.name)
-	l.Shutdown()
-	l.lock.Lock()
-	select {
-	case <-l.terminated:
-		l.lock.Unlock()
-	default:
-		close(l.terminated)
-		l.lock.Unlock()
-		if log.IsDebug() {
-			log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
+// PushFunc will push data into the fifo
+func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+	if fn != nil {
+		if err := fn(); err != nil {
+			return err
 		}
-		if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
-			log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
-		}
-
 	}
-	log.Debug("LevelQueue: %s Terminated", l.name)
+	return fifo.internal.LPush(data)
 }
 
-// Name returns the name of this queue
-func (l *LevelQueue) Name() string {
-	return l.name
+// Pop pops data from the start of the fifo
+func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
+	data, err := fifo.internal.RPop()
+	if err != nil && err != levelqueue.ErrNotFound {
+		return nil, err
+	}
+	return data, nil
+}
+
+// Close this fifo
+func (fifo *LevelQueueByteFIFO) Close() error {
+	return fifo.internal.Close()
+}
+
+// Len returns the length of the fifo
+func (fifo *LevelQueueByteFIFO) Len() int64 {
+	return fifo.internal.Len()
 }
 
 func init() {
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 961187ab0d..433435c301 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -69,17 +69,19 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 
 	// the level backend only needs temporary workers to catch up with the previously dropped work
 	levelCfg := LevelQueueConfiguration{
-		WorkerPoolConfiguration: WorkerPoolConfiguration{
-			QueueLength:  config.QueueLength,
-			BatchLength:  config.BatchLength,
-			BlockTimeout: 1 * time.Second,
-			BoostTimeout: 5 * time.Minute,
-			BoostWorkers: 5,
-			MaxWorkers:   6,
+		ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  config.QueueLength,
+				BatchLength:  config.BatchLength,
+				BlockTimeout: 1 * time.Second,
+				BoostTimeout: 5 * time.Minute,
+				BoostWorkers: 5,
+				MaxWorkers:   6,
+			},
+			Workers: 1,
+			Name:    config.Name + "-level",
 		},
 		DataDir: config.DataDir,
-		Workers: 1,
-		Name:    config.Name + "-level",
 	}
 
 	levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
@@ -116,67 +118,67 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 }
 
 // Name returns the name of this queue
-func (p *PersistableChannelQueue) Name() string {
-	return p.delayedStarter.name
+func (q *PersistableChannelQueue) Name() string {
+	return q.delayedStarter.name
 }
 
 // Push will push the indexer data to queue
-func (p *PersistableChannelQueue) Push(data Data) error {
+func (q *PersistableChannelQueue) Push(data Data) error {
 	select {
-	case <-p.closed:
-		return p.internal.Push(data)
+	case <-q.closed:
+		return q.internal.Push(data)
 	default:
-		return p.channelQueue.Push(data)
+		return q.channelQueue.Push(data)
 	}
 }
 
 // Run starts to run the queue
-func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-	log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
+func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+	log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
 
-	p.lock.Lock()
-	if p.internal == nil {
-		err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
-		p.lock.Unlock()
+	q.lock.Lock()
+	if q.internal == nil {
+		err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar)
+		q.lock.Unlock()
 		if err != nil {
-			log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
+			log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
 			return
 		}
 	} else {
-		p.lock.Unlock()
+		q.lock.Unlock()
 	}
-	atShutdown(context.Background(), p.Shutdown)
-	atTerminate(context.Background(), p.Terminate)
+	atShutdown(context.Background(), q.Shutdown)
+	atTerminate(context.Background(), q.Terminate)
 
 	// Just run the level queue - we shut it down later
-	go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+	go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
 
 	go func() {
-		_ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
+		_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
 	}()
 
-	log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
-	<-p.closed
-	log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
-	p.channelQueue.cancel()
-	p.internal.(*LevelQueue).cancel()
-	log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
-	p.channelQueue.Wait()
-	p.internal.(*LevelQueue).Wait()
+	log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
+	<-q.closed
+	log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
+	q.channelQueue.cancel()
+	q.internal.(*LevelQueue).cancel()
+	log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
+	q.channelQueue.Wait()
+	q.internal.(*LevelQueue).Wait()
 	// Redirect all remaining data in the chan to the internal channel
 	go func() {
-		log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
-		for data := range p.channelQueue.dataChan {
-			_ = p.internal.Push(data)
-			atomic.AddInt64(&p.channelQueue.numInQueue, -1)
+		log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+		for data := range q.channelQueue.dataChan {
+			_ = q.internal.Push(data)
+			atomic.AddInt64(&q.channelQueue.numInQueue, -1)
 		}
-		log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
+		log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 	}()
-	log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
+	log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
 }
 
 // Flush flushes the queue and blocks till the queue is empty
-func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
+func (q *PersistableChannelQueue) Flush(timeout time.Duration) error {
 	var ctx context.Context
 	var cancel context.CancelFunc
 	if timeout > 0 {
@@ -185,24 +187,24 @@ func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
 		ctx, cancel = context.WithCancel(context.Background())
 	}
 	defer cancel()
-	return p.FlushWithContext(ctx)
+	return q.FlushWithContext(ctx)
 }
 
 // FlushWithContext flushes the queue and blocks till the queue is empty
-func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
+func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
 	errChan := make(chan error, 1)
 	go func() {
-		errChan <- p.channelQueue.FlushWithContext(ctx)
+		errChan <- q.channelQueue.FlushWithContext(ctx)
 	}()
 	go func() {
-		p.lock.Lock()
-		if p.internal == nil {
-			p.lock.Unlock()
-			errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
+		q.lock.Lock()
+		if q.internal == nil {
+			q.lock.Unlock()
+			errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name())
 			return
 		}
-		p.lock.Unlock()
-		errChan <- p.internal.FlushWithContext(ctx)
+		q.lock.Unlock()
+		errChan <- q.internal.FlushWithContext(ctx)
 	}()
 	err1 := <-errChan
 	err2 := <-errChan
@@ -214,44 +216,44 @@ func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
 }
 
 // IsEmpty checks if a queue is empty
-func (p *PersistableChannelQueue) IsEmpty() bool {
-	if !p.channelQueue.IsEmpty() {
+func (q *PersistableChannelQueue) IsEmpty() bool {
+	if !q.channelQueue.IsEmpty() {
 		return false
 	}
-	p.lock.Lock()
-	defer p.lock.Unlock()
-	if p.internal == nil {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal == nil {
 		return false
 	}
-	return p.internal.IsEmpty()
+	return q.internal.IsEmpty()
 }
 
 // Shutdown processing this queue
-func (p *PersistableChannelQueue) Shutdown() {
-	log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
+func (q *PersistableChannelQueue) Shutdown() {
+	log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	select {
-	case <-p.closed:
+	case <-q.closed:
 	default:
-		p.lock.Lock()
-		defer p.lock.Unlock()
-		if p.internal != nil {
-			p.internal.(*LevelQueue).Shutdown()
+		if q.internal != nil {
+			q.internal.(*LevelQueue).Shutdown()
 		}
-		close(p.closed)
+		close(q.closed)
+		log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
 	}
-	log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
 }
 
 // Terminate this queue and close the queue
-func (p *PersistableChannelQueue) Terminate() {
-	log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
-	p.Shutdown()
-	p.lock.Lock()
-	defer p.lock.Unlock()
-	if p.internal != nil {
-		p.internal.(*LevelQueue).Terminate()
+func (q *PersistableChannelQueue) Terminate() {
+	log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name)
+	q.Shutdown()
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal != nil {
+		q.internal.(*LevelQueue).Terminate()
 	}
-	log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
+	log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name)
 }
 
 func init() {
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index 038d7d8223..c7d3eb160b 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -34,16 +34,18 @@ func TestLevelQueue(t *testing.T) {
 	defer os.RemoveAll(tmpDir)
 
 	queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
-		WorkerPoolConfiguration: WorkerPoolConfiguration{
-			QueueLength:  20,
-			BatchLength:  2,
-			BlockTimeout: 1 * time.Second,
-			BoostTimeout: 5 * time.Minute,
-			BoostWorkers: 5,
-			MaxWorkers:   10,
+		ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  20,
+				BatchLength:  2,
+				BlockTimeout: 1 * time.Second,
+				BoostTimeout: 5 * time.Minute,
+				BoostWorkers: 5,
+				MaxWorkers:   10,
+			},
+			Workers: 1,
 		},
 		DataDir: tmpDir,
-		Workers: 1,
 	}, &testData{})
 	assert.NoError(t, err)
 
@@ -105,16 +107,18 @@ func TestLevelQueue(t *testing.T) {
 		WrappedQueueConfiguration{
 			Underlying: LevelQueueType,
 			Config: LevelQueueConfiguration{
-				WorkerPoolConfiguration: WorkerPoolConfiguration{
-					QueueLength:  20,
-					BatchLength:  2,
-					BlockTimeout: 1 * time.Second,
-					BoostTimeout: 5 * time.Minute,
-					BoostWorkers: 5,
-					MaxWorkers:   10,
+				ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+					WorkerPoolConfiguration: WorkerPoolConfiguration{
+						QueueLength:  20,
+						BatchLength:  2,
+						BlockTimeout: 1 * time.Second,
+						BoostTimeout: 5 * time.Minute,
+						BoostWorkers: 5,
+						MaxWorkers:   10,
+					},
+					Workers: 1,
 				},
 				DataDir: tmpDir,
-				Workers: 1,
 			},
 		}, &testData{})
 	assert.NoError(t, err)
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index 0167c1ec49..8a395cd5aa 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -5,14 +5,8 @@
 package queue
 
 import (
-	"context"
-	"encoding/json"
 	"errors"
-	"fmt"
 	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
 
 	"code.gitea.io/gitea/modules/log"
 
@@ -22,37 +16,15 @@ import (
 // RedisQueueType is the type for redis queue
 const RedisQueueType Type = "redis"
 
-type redisClient interface {
-	RPush(key string, args ...interface{}) *redis.IntCmd
-	LPop(key string) *redis.StringCmd
-	LLen(key string) *redis.IntCmd
-	Ping() *redis.StatusCmd
-	Close() error
+// RedisQueueConfiguration is the configuration for the redis queue
+type RedisQueueConfiguration struct {
+	ByteFIFOQueueConfiguration
+	RedisByteFIFOConfiguration
 }
 
 // RedisQueue redis queue
 type RedisQueue struct {
-	*WorkerPool
-	client     redisClient
-	queueName  string
-	closed     chan struct{}
-	terminated chan struct{}
-	exemplar   interface{}
-	workers    int
-	name       string
-	lock       sync.Mutex
-}
-
-// RedisQueueConfiguration is the configuration for the redis queue
-type RedisQueueConfiguration struct {
-	WorkerPoolConfiguration
-	Network   string
-	Addresses string
-	Password  string
-	DBIndex   int
-	QueueName string
-	Workers   int
-	Name      string
+	*ByteFIFOQueue
 }
 
 // NewRedisQueue creates single redis or cluster redis queue
@@ -63,163 +35,111 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 	}
 	config := configInterface.(RedisQueueConfiguration)
 
-	dbs := strings.Split(config.Addresses, ",")
-
-	var queue = &RedisQueue{
-		WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
-		queueName:  config.QueueName,
-		exemplar:   exemplar,
-		closed:     make(chan struct{}),
-		terminated: make(chan struct{}),
-		workers:    config.Workers,
-		name:       config.Name,
+	byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
+	if err != nil {
+		return nil, err
 	}
+
+	byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+	if err != nil {
+		return nil, err
+	}
+
+	queue := &RedisQueue{
+		ByteFIFOQueue: byteFIFOQueue,
+	}
+
+	queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
+
+	return queue, nil
+}
+
+type redisClient interface {
+	RPush(key string, args ...interface{}) *redis.IntCmd
+	LPop(key string) *redis.StringCmd
+	LLen(key string) *redis.IntCmd
+	SAdd(key string, members ...interface{}) *redis.IntCmd
+	SRem(key string, members ...interface{}) *redis.IntCmd
+	SIsMember(key string, member interface{}) *redis.BoolCmd
+	Ping() *redis.StatusCmd
+	Close() error
+}
+
+var _ (ByteFIFO) = &RedisByteFIFO{}
+
+// RedisByteFIFO represents a ByteFIFO formed from a redisClient
+type RedisByteFIFO struct {
+	client    redisClient
+	queueName string
+}
+
+// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
+type RedisByteFIFOConfiguration struct {
+	Network   string
+	Addresses string
+	Password  string
+	DBIndex   int
+	QueueName string
+}
+
+// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
+func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) {
+	fifo := &RedisByteFIFO{
+		queueName: config.QueueName,
+	}
+	dbs := strings.Split(config.Addresses, ",")
 	if len(dbs) == 0 {
 		return nil, errors.New("no redis host specified")
 	} else if len(dbs) == 1 {
-		queue.client = redis.NewClient(&redis.Options{
+		fifo.client = redis.NewClient(&redis.Options{
 			Network:  config.Network,
 			Addr:     strings.TrimSpace(dbs[0]), // use default Addr
 			Password: config.Password,           // no password set
 			DB:       config.DBIndex,            // use default DB
 		})
 	} else {
-		queue.client = redis.NewClusterClient(&redis.ClusterOptions{
+		fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
 			Addrs: dbs,
 		})
 	}
-	if err := queue.client.Ping().Err(); err != nil {
+	if err := fifo.client.Ping().Err(); err != nil {
 		return nil, err
 	}
-	queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
-
-	return queue, nil
+	return fifo, nil
 }
 
-// Run runs the redis queue
-func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-	atShutdown(context.Background(), r.Shutdown)
-	atTerminate(context.Background(), r.Terminate)
-	log.Debug("RedisQueue: %s Starting", r.name)
-
-	go func() {
-		_ = r.AddWorkers(r.workers, 0)
-	}()
-
-	go r.readToChan()
-
-	log.Trace("RedisQueue: %s Waiting til closed", r.name)
-	<-r.closed
-	log.Trace("RedisQueue: %s Waiting til done", r.name)
-	r.Wait()
-
-	log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
-	ctx, cancel := context.WithCancel(context.Background())
-	atTerminate(ctx, cancel)
-	r.CleanUp(ctx)
-	cancel()
-}
-
-func (r *RedisQueue) readToChan() {
-	for {
-		select {
-		case <-r.closed:
-			// tell the pool to shutdown
-			r.cancel()
-			return
-		default:
-			atomic.AddInt64(&r.numInQueue, 1)
-			bs, err := r.client.LPop(r.queueName).Bytes()
-			if err != nil && err != redis.Nil {
-				log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
-				atomic.AddInt64(&r.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			if len(bs) == 0 {
-				atomic.AddInt64(&r.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			data, err := unmarshalAs(bs, r.exemplar)
-			if err != nil {
-				log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
-				atomic.AddInt64(&r.numInQueue, -1)
-				time.Sleep(time.Millisecond * 100)
-				continue
-			}
-
-			log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
-			r.WorkerPool.Push(data)
-			atomic.AddInt64(&r.numInQueue, -1)
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
+	if fn != nil {
+		if err := fn(); err != nil {
+			return err
 		}
 	}
+	return fifo.client.RPush(fifo.queueName, data).Err()
 }
 
-// Push implements Queue
-func (r *RedisQueue) Push(data Data) error {
-	if !assignableTo(data, r.exemplar) {
-		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
+// Pop pops data from the start of the fifo
+func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
+	data, err := fifo.client.LPop(fifo.queueName).Bytes()
+	if err != nil && err == redis.Nil {
+		return data, nil
 	}
-	bs, err := json.Marshal(data)
+	return data, err
+}
+
+// Close this fifo
+func (fifo *RedisByteFIFO) Close() error {
+	return fifo.client.Close()
+}
+
+// Len returns the length of the fifo
+func (fifo *RedisByteFIFO) Len() int64 {
+	val, err := fifo.client.LLen(fifo.queueName).Result()
 	if err != nil {
-		return err
+		log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
+		return -1
 	}
-	return r.client.RPush(r.queueName, bs).Err()
-}
-
-// IsEmpty checks if the queue is empty
-func (r *RedisQueue) IsEmpty() bool {
-	if !r.WorkerPool.IsEmpty() {
-		return false
-	}
-	length, err := r.client.LLen(r.queueName).Result()
-	if err != nil {
-		log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
-		return false
-	}
-	return length == 0
-}
-
-// Shutdown processing from this queue
-func (r *RedisQueue) Shutdown() {
-	log.Trace("RedisQueue: %s Shutting down", r.name)
-	r.lock.Lock()
-	select {
-	case <-r.closed:
-	default:
-		close(r.closed)
-	}
-	r.lock.Unlock()
-	log.Debug("RedisQueue: %s Shutdown", r.name)
-}
-
-// Terminate this queue and close the queue
-func (r *RedisQueue) Terminate() {
-	log.Trace("RedisQueue: %s Terminating", r.name)
-	r.Shutdown()
-	r.lock.Lock()
-	select {
-	case <-r.terminated:
-		r.lock.Unlock()
-	default:
-		close(r.terminated)
-		r.lock.Unlock()
-		if log.IsDebug() {
-			log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName))
-		}
-		if err := r.client.Close(); err != nil {
-			log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
-		}
-	}
-	log.Debug("RedisQueue: %s Terminated", r.name)
-}
-
-// Name returns the name of this queue
-func (r *RedisQueue) Name() string {
-	return r.name
+	return val
 }
 
 func init() {
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
index 8760c09ae8..c47e85f756 100644
--- a/modules/queue/setting.go
+++ b/modules/queue/setting.go
@@ -7,6 +7,7 @@ package queue
 import (
 	"encoding/json"
 	"fmt"
+	"strings"
 
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/setting"
@@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
 	opts["Password"] = q.Password
 	opts["DBIndex"] = q.DBIndex
 	opts["QueueName"] = q.QueueName
+	opts["SetName"] = q.SetName
 	opts["Workers"] = q.Workers
 	opts["MaxWorkers"] = q.MaxWorkers
 	opts["BlockTimeout"] = q.BlockTimeout
@@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
 	}
 	return returnable
 }
+
+// CreateUniqueQueue for name with provided handler and exemplar
+func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
+	q, cfg := getQueueSettings(name)
+	if len(cfg) == 0 {
+		return nil
+	}
+
+	if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") {
+		q.Type = "unique-" + q.Type
+	}
+
+	typ, err := validType(q.Type)
+	if err != nil || typ == PersistableChannelQueueType {
+		typ = PersistableChannelUniqueQueueType
+		if err != nil {
+			log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+		}
+	}
+
+	returnable, err := NewQueue(typ, handle, cfg, exemplar)
+	if q.WrapIfNecessary && err != nil {
+		log.Warn("Unable to create unique queue for %s: %v", name, err)
+		log.Warn("Attempting to create wrapped queue")
+		returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
+			Underlying:  typ,
+			Timeout:     q.Timeout,
+			MaxAttempts: q.MaxAttempts,
+			Config:      cfg,
+			QueueLength: q.Length,
+		}, exemplar)
+	}
+	if err != nil {
+		log.Error("Unable to create unique queue for %s: %v", name, err)
+		return nil
+	}
+	return returnable.(UniqueQueue)
+}
diff --git a/modules/queue/unique_queue.go b/modules/queue/unique_queue.go
new file mode 100644
index 0000000000..87e0594ecf
--- /dev/null
+++ b/modules/queue/unique_queue.go
@@ -0,0 +1,29 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"fmt"
+)
+
+// UniqueQueue defines a queue which guarantees only one instance of same
+// data is in the queue. Instances with same identity will be
+// discarded if there is already one in the line.
+//
+// This queue is particularly useful for preventing duplicated task
+// of same purpose - please note that this does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+//
+// Users of this queue should be careful to push only the identifier of the
+// data
+type UniqueQueue interface {
+	Queue
+	PushFunc(Data, func() error) error
+	Has(Data) (bool, error)
+}
+
+// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
+var ErrAlreadyInQueue = fmt.Errorf("already in queue")
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
new file mode 100644
index 0000000000..dec1cfc5c0
--- /dev/null
+++ b/modules/queue/unique_queue_channel.go
@@ -0,0 +1,132 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"code.gitea.io/gitea/modules/log"
+)
+
+// ChannelUniqueQueueType is the type for channel queue
+const ChannelUniqueQueueType Type = "unique-channel"
+
+// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
+type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
+
+// ChannelUniqueQueue implements UniqueQueue
+//
+// It is basically a thin wrapper around a WorkerPool but keeps a store of
+// what has been pushed within a table.
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+type ChannelUniqueQueue struct {
+	*WorkerPool
+	lock     sync.Mutex
+	table    map[Data]bool
+	exemplar interface{}
+	workers  int
+	name     string
+}
+
+// NewChannelUniqueQueue create a memory channel queue
+func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+	configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(ChannelUniqueQueueConfiguration)
+	if config.BatchLength == 0 {
+		config.BatchLength = 1
+	}
+	queue := &ChannelUniqueQueue{
+		table:    map[Data]bool{},
+		exemplar: exemplar,
+		workers:  config.Workers,
+		name:     config.Name,
+	}
+	queue.WorkerPool = NewWorkerPool(func(data ...Data) {
+		for _, datum := range data {
+			queue.lock.Lock()
+			delete(queue.table, datum)
+			queue.lock.Unlock()
+			handle(datum)
+		}
+	}, config.WorkerPoolConfiguration)
+
+	queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
+	return queue, nil
+}
+
+// Run starts to run the queue
+func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+	atShutdown(context.Background(), func() {
+		log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
+	})
+	atTerminate(context.Background(), func() {
+		log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
+	})
+	log.Debug("ChannelUniqueQueue: %s Starting", q.name)
+	go func() {
+		_ = q.AddWorkers(q.workers, 0)
+	}()
+}
+
+// Push will push data into the queue if the data is not already in the queue
+func (q *ChannelUniqueQueue) Push(data Data) error {
+	return q.PushFunc(data, nil)
+}
+
+// PushFunc will push data into the queue
+func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
+	if !assignableTo(data, q.exemplar) {
+		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+	}
+	q.lock.Lock()
+	locked := true
+	defer func() {
+		if locked {
+			q.lock.Unlock()
+		}
+	}()
+	if _, ok := q.table[data]; ok {
+		return ErrAlreadyInQueue
+	}
+	// FIXME: We probably need to implement some sort of limit here
+	// If the downstream queue blocks this table will grow without limit
+	q.table[data] = true
+	if fn != nil {
+		err := fn()
+		if err != nil {
+			delete(q.table, data)
+			return err
+		}
+	}
+	locked = false
+	q.lock.Unlock()
+	q.WorkerPool.Push(data)
+	return nil
+}
+
+// Has checks if the data is in the queue
+func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	_, has := q.table[data]
+	return has, nil
+}
+
+// Name returns the name of this queue
+func (q *ChannelUniqueQueue) Name() string {
+	return q.name
+}
+
+func init() {
+	queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
new file mode 100644
index 0000000000..bfe7aeed83
--- /dev/null
+++ b/modules/queue/unique_queue_disk.go
@@ -0,0 +1,104 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"gitea.com/lunny/levelqueue"
+)
+
+// LevelUniqueQueueType is the type for level queue
+const LevelUniqueQueueType Type = "unique-level"
+
+// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
+type LevelUniqueQueueConfiguration struct {
+	ByteFIFOQueueConfiguration
+	DataDir string
+}
+
+// LevelUniqueQueue implements a disk library queue
+type LevelUniqueQueue struct {
+	*ByteFIFOUniqueQueue
+}
+
+// NewLevelUniqueQueue creates a ledis local queue
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+	configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(LevelUniqueQueueConfiguration)
+
+	byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
+	if err != nil {
+		return nil, err
+	}
+
+	byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+	if err != nil {
+		return nil, err
+	}
+
+	queue := &LevelUniqueQueue{
+		ByteFIFOUniqueQueue: byteFIFOQueue,
+	}
+	queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
+	return queue, nil
+}
+
+var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
+
+// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
+type LevelUniqueQueueByteFIFO struct {
+	internal *levelqueue.UniqueQueue
+}
+
+// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
+func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
+	internal, err := levelqueue.OpenUnique(dataDir)
+	if err != nil {
+		return nil, err
+	}
+
+	return &LevelUniqueQueueByteFIFO{
+		internal: internal,
+	}, nil
+}
+
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+	return fifo.internal.LPushFunc(data, fn)
+}
+
+// Pop pops data from the start of the fifo
+func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
+	data, err := fifo.internal.RPop()
+	if err != nil && err != levelqueue.ErrNotFound {
+		return nil, err
+	}
+	return data, nil
+}
+
+// Len returns the length of the fifo
+func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
+	return fifo.internal.Len()
+}
+
+// Has returns whether the fifo contains this data
+func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
+	return fifo.internal.Has(data)
+}
+
+// Close this fifo
+func (fifo *LevelUniqueQueueByteFIFO) Close() error {
+	return fifo.internal.Close()
+}
+
+func init() {
+	queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
new file mode 100644
index 0000000000..71049f3259
--- /dev/null
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -0,0 +1,241 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+)
+
+// PersistableChannelUniqueQueueType is the type for persistable queue
+const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
+
+// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
+type PersistableChannelUniqueQueueConfiguration struct {
+	Name         string
+	DataDir      string
+	BatchLength  int
+	QueueLength  int
+	Timeout      time.Duration
+	MaxAttempts  int
+	Workers      int
+	MaxWorkers   int
+	BlockTimeout time.Duration
+	BoostTimeout time.Duration
+	BoostWorkers int
+}
+
+// PersistableChannelUniqueQueue wraps a channel queue and level queue together
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+type PersistableChannelUniqueQueue struct {
+	*ChannelUniqueQueue
+	delayedStarter
+	lock   sync.Mutex
+	closed chan struct{}
+}
+
+// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
+// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
+func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+	configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(PersistableChannelUniqueQueueConfiguration)
+
+	channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
+		WorkerPoolConfiguration: WorkerPoolConfiguration{
+			QueueLength:  config.QueueLength,
+			BatchLength:  config.BatchLength,
+			BlockTimeout: config.BlockTimeout,
+			BoostTimeout: config.BoostTimeout,
+			BoostWorkers: config.BoostWorkers,
+			MaxWorkers:   config.MaxWorkers,
+		},
+		Workers: config.Workers,
+		Name:    config.Name + "-channel",
+	}, exemplar)
+	if err != nil {
+		return nil, err
+	}
+
+	// the level backend only needs temporary workers to catch up with the previously dropped work
+	levelCfg := LevelUniqueQueueConfiguration{
+		ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  config.QueueLength,
+				BatchLength:  config.BatchLength,
+				BlockTimeout: 0,
+				BoostTimeout: 0,
+				BoostWorkers: 0,
+				MaxWorkers:   1,
+			},
+			Workers: 1,
+			Name:    config.Name + "-level",
+		},
+		DataDir: config.DataDir,
+	}
+
+	queue := &PersistableChannelUniqueQueue{
+		ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
+		closed:             make(chan struct{}),
+	}
+
+	levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
+		for _, datum := range data {
+			err := queue.Push(datum)
+			if err != nil && err != ErrAlreadyInQueue {
+				log.Error("Unable push to channelled queue: %v", err)
+			}
+		}
+	}, levelCfg, exemplar)
+	if err == nil {
+		queue.delayedStarter = delayedStarter{
+			internal: levelQueue.(*LevelUniqueQueue),
+			name:     config.Name,
+		}
+
+		_ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
+		return queue, nil
+	}
+	if IsErrInvalidConfiguration(err) {
+		// Retrying ain't gonna make this any better...
+		return nil, ErrInvalidConfiguration{cfg: cfg}
+	}
+
+	queue.delayedStarter = delayedStarter{
+		cfg:         levelCfg,
+		underlying:  LevelUniqueQueueType,
+		timeout:     config.Timeout,
+		maxAttempts: config.MaxAttempts,
+		name:        config.Name,
+	}
+	_ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
+	return queue, nil
+}
+
+// Name returns the name of this queue
+func (q *PersistableChannelUniqueQueue) Name() string {
+	return q.delayedStarter.name
+}
+
+// Push will push the indexer data to queue
+func (q *PersistableChannelUniqueQueue) Push(data Data) error {
+	return q.PushFunc(data, nil)
+}
+
+// PushFunc will push the indexer data to queue
+func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
+	select {
+	case <-q.closed:
+		return q.internal.(UniqueQueue).PushFunc(data, fn)
+	default:
+		return q.ChannelUniqueQueue.PushFunc(data, fn)
+	}
+}
+
+// Has will test if the queue has the data
+func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
+	// This is more difficult...
+	has, err := q.ChannelUniqueQueue.Has(data)
+	if err != nil || has {
+		return has, err
+	}
+	return q.internal.(UniqueQueue).Has(data)
+}
+
+// Run starts to run the queue
+func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+	log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
+
+	q.lock.Lock()
+	if q.internal == nil {
+		err := q.setInternal(atShutdown, func(data ...Data) {
+			for _, datum := range data {
+				err := q.Push(datum)
+				if err != nil && err != ErrAlreadyInQueue {
+					log.Error("Unable push to channelled queue: %v", err)
+				}
+			}
+		}, q.exemplar)
+		q.lock.Unlock()
+		if err != nil {
+			log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
+			return
+		}
+	} else {
+		q.lock.Unlock()
+	}
+	atShutdown(context.Background(), q.Shutdown)
+	atTerminate(context.Background(), q.Terminate)
+
+	// Just run the level queue - we shut it down later
+	go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+
+	go func() {
+		_ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
+	}()
+
+	log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
+	<-q.closed
+	log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
+	q.internal.(*LevelUniqueQueue).cancel()
+	q.ChannelUniqueQueue.cancel()
+	log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
+	q.ChannelUniqueQueue.Wait()
+	q.internal.(*LevelUniqueQueue).Wait()
+	// Redirect all remaining data in the chan to the internal channel
+	go func() {
+		log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+		for data := range q.ChannelUniqueQueue.dataChan {
+			_ = q.internal.Push(data)
+		}
+		log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+	}()
+	log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
+}
+
+// Flush flushes the queue
+func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
+	return q.ChannelUniqueQueue.Flush(timeout)
+}
+
+// Shutdown processing this queue
+func (q *PersistableChannelUniqueQueue) Shutdown() {
+	log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	select {
+	case <-q.closed:
+	default:
+		if q.internal != nil {
+			q.internal.(*LevelUniqueQueue).Shutdown()
+		}
+		close(q.closed)
+	}
+	log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
+}
+
+// Terminate this queue and close the queue
+func (q *PersistableChannelUniqueQueue) Terminate() {
+	log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name)
+	q.Shutdown()
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal != nil {
+		q.internal.(*LevelUniqueQueue).Terminate()
+	}
+	log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
+}
+
+func init() {
+	queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
new file mode 100644
index 0000000000..e5b2c48dbb
--- /dev/null
+++ b/modules/queue/unique_queue_redis.go
@@ -0,0 +1,124 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+// RedisUniqueQueueType is the type for redis queue
+const RedisUniqueQueueType Type = "unique-redis"
+
+// RedisUniqueQueue redis queue
+type RedisUniqueQueue struct {
+	*ByteFIFOUniqueQueue
+}
+
+// RedisUniqueQueueConfiguration is the configuration for the redis queue
+type RedisUniqueQueueConfiguration struct {
+	ByteFIFOQueueConfiguration
+	RedisUniqueByteFIFOConfiguration
+}
+
+// NewRedisUniqueQueue creates single redis or cluster redis queue.
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+	configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(RedisUniqueQueueConfiguration)
+
+	byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(byteFIFO.setName) == 0 {
+		byteFIFO.setName = byteFIFO.queueName + "_unique"
+	}
+
+	byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+	if err != nil {
+		return nil, err
+	}
+
+	queue := &RedisUniqueQueue{
+		ByteFIFOUniqueQueue: byteFIFOQueue,
+	}
+
+	queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
+
+	return queue, nil
+}
+
+var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{}
+
+// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
+type RedisUniqueByteFIFO struct {
+	RedisByteFIFO
+	setName string
+}
+
+// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
+type RedisUniqueByteFIFOConfiguration struct {
+	RedisByteFIFOConfiguration
+	SetName string
+}
+
+// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
+func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
+	internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
+	if err != nil {
+		return nil, err
+	}
+
+	fifo := &RedisUniqueByteFIFO{
+		RedisByteFIFO: *internal,
+		setName:       config.SetName,
+	}
+
+	return fifo, nil
+}
+
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
+	added, err := fifo.client.SAdd(fifo.setName, data).Result()
+	if err != nil {
+		return err
+	}
+	if added == 0 {
+		return ErrAlreadyInQueue
+	}
+	if fn != nil {
+		if err := fn(); err != nil {
+			return err
+		}
+	}
+	return fifo.client.RPush(fifo.queueName, data).Err()
+}
+
+// Pop pops data from the start of the fifo
+func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
+	data, err := fifo.client.LPop(fifo.queueName).Bytes()
+	if err != nil {
+		return data, err
+	}
+
+	if len(data) == 0 {
+		return data, nil
+	}
+
+	err = fifo.client.SRem(fifo.setName, data).Err()
+	return data, err
+}
+
+// Has returns whether the fifo contains this data
+func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
+	return fifo.client.SIsMember(fifo.setName, data).Result()
+}
+
+func init() {
+	queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
+}
diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go
new file mode 100644
index 0000000000..8c815218dd
--- /dev/null
+++ b/modules/queue/unique_queue_wrapped.go
@@ -0,0 +1,172 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// WrappedUniqueQueueType is the type for a wrapped delayed starting queue
+const WrappedUniqueQueueType Type = "unique-wrapped"
+
+// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue
+type WrappedUniqueQueueConfiguration struct {
+	Underlying  Type
+	Timeout     time.Duration
+	MaxAttempts int
+	Config      interface{}
+	QueueLength int
+	Name        string
+}
+
+// WrappedUniqueQueue wraps a delayed starting unique queue
+type WrappedUniqueQueue struct {
+	*WrappedQueue
+	table map[Data]bool
+	tlock sync.Mutex
+	ready bool
+}
+
+// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type,
+// but if there is a problem creating this queue it will instead create
+// a WrappedUniqueQueue with delayed startup of the queue instead and a
+// channel which will be redirected to the queue
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+	configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg)
+	if err != nil {
+		return nil, err
+	}
+	config := configInterface.(WrappedUniqueQueueConfiguration)
+
+	queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
+	if err == nil {
+		// Just return the queue there is no need to wrap
+		return queue, nil
+	}
+	if IsErrInvalidConfiguration(err) {
+		// Retrying ain't gonna make this any better...
+		return nil, ErrInvalidConfiguration{cfg: cfg}
+	}
+
+	wrapped := &WrappedUniqueQueue{
+		WrappedQueue: &WrappedQueue{
+			channel:  make(chan Data, config.QueueLength),
+			exemplar: exemplar,
+			delayedStarter: delayedStarter{
+				cfg:         config.Config,
+				underlying:  config.Underlying,
+				timeout:     config.Timeout,
+				maxAttempts: config.MaxAttempts,
+				name:        config.Name,
+			},
+		},
+		table: map[Data]bool{},
+	}
+
+	// wrapped.handle is passed to the delayedStarting internal queue and is run to handle
+	// data passed to
+	wrapped.handle = func(data ...Data) {
+		for _, datum := range data {
+			wrapped.tlock.Lock()
+			if !wrapped.ready {
+				delete(wrapped.table, data)
+				// If our table is empty all of the requests we have buffered between the
+				// wrapper queue starting and the internal queue starting have been handled.
+				// We can stop buffering requests in our local table and just pass Push
+				// direct to the internal queue
+				if len(wrapped.table) == 0 {
+					wrapped.ready = true
+				}
+			}
+			wrapped.tlock.Unlock()
+			handle(datum)
+		}
+	}
+	_ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
+	return wrapped, nil
+}
+
+// Push will push the data to the internal channel checking it against the exemplar
+func (q *WrappedUniqueQueue) Push(data Data) error {
+	return q.PushFunc(data, nil)
+}
+
+// PushFunc will push the data to the internal channel checking it against the exemplar
+func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
+	if !assignableTo(data, q.exemplar) {
+		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+	}
+
+	q.tlock.Lock()
+	if q.ready {
+		// ready means our table is empty and all of the requests we have buffered between the
+		// wrapper queue starting and the internal queue starting have been handled.
+		// We can stop buffering requests in our local table and just pass Push
+		// direct to the internal queue
+		q.tlock.Unlock()
+		return q.internal.(UniqueQueue).PushFunc(data, fn)
+	}
+
+	locked := true
+	defer func() {
+		if locked {
+			q.tlock.Unlock()
+		}
+	}()
+	if _, ok := q.table[data]; ok {
+		return ErrAlreadyInQueue
+	}
+	// FIXME: We probably need to implement some sort of limit here
+	// If the downstream queue blocks this table will grow without limit
+	q.table[data] = true
+	if fn != nil {
+		err := fn()
+		if err != nil {
+			delete(q.table, data)
+			return err
+		}
+	}
+	locked = false
+	q.tlock.Unlock()
+
+	q.channel <- data
+	return nil
+}
+
+// Has checks if the data is in the queue
+func (q *WrappedUniqueQueue) Has(data Data) (bool, error) {
+	q.tlock.Lock()
+	defer q.tlock.Unlock()
+	if q.ready {
+		return q.internal.(UniqueQueue).Has(data)
+	}
+	_, has := q.table[data]
+	return has, nil
+}
+
+// IsEmpty checks whether the queue is empty
+func (q *WrappedUniqueQueue) IsEmpty() bool {
+	q.tlock.Lock()
+	if len(q.table) > 0 {
+		q.tlock.Unlock()
+		return false
+	}
+	if q.ready {
+		q.tlock.Unlock()
+		return q.internal.IsEmpty()
+	}
+	q.tlock.Unlock()
+	return false
+}
+
+func init() {
+	queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue
+}
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 934c5a8108..8bdca1017f 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -26,6 +26,7 @@ type QueueSettings struct {
 	Addresses        string
 	Password         string
 	QueueName        string
+	SetName          string
 	DBIndex          int
 	WrapIfNecessary  bool
 	MaxAttempts      int
@@ -54,8 +55,13 @@ func GetQueueSettings(name string) QueueSettings {
 			q.DataDir = key.MustString(q.DataDir)
 		case "QUEUE_NAME":
 			q.QueueName = key.MustString(q.QueueName)
+		case "SET_NAME":
+			q.SetName = key.MustString(q.SetName)
 		}
 	}
+	if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
+		q.SetName = q.QueueName + Queue.SetName
+	}
 	if !filepath.IsAbs(q.DataDir) {
 		q.DataDir = filepath.Join(AppDataPath, q.DataDir)
 	}
@@ -100,6 +106,7 @@ func NewQueueService() {
 	Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
 	Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
 	Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
+	Queue.SetName = sec.Key("SET_NAME").MustString("")
 
 	// Now handle the old issue_indexer configuration
 	section := Cfg.Section("queue.issue_indexer")
@@ -142,6 +149,17 @@ func NewQueueService() {
 	if _, ok := sectionMap["LENGTH"]; !ok {
 		_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
 	}
+
+	// Handle the old test pull requests configuration
+	// Please note this will be a unique queue
+	section = Cfg.Section("queue.pr_patch_checker")
+	sectionMap = map[string]bool{}
+	for _, key := range section.Keys() {
+		sectionMap[key.Name()] = true
+	}
+	if _, ok := sectionMap["LENGTH"]; !ok {
+		_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
+	}
 }
 
 // ParseQueueConnStr parses a queue connection string
diff --git a/routers/init.go b/routers/init.go
index 1d7cf78438..f86a7ad4b2 100644
--- a/routers/init.go
+++ b/routers/init.go
@@ -113,7 +113,9 @@ func GlobalInit(ctx context.Context) {
 		code_indexer.Init()
 		mirror_service.InitSyncMirrors()
 		webhook.InitDeliverHooks()
-		pull_service.Init()
+		if err := pull_service.Init(); err != nil {
+			log.Fatal("Failed to initialize test pull requests queue: %v", err)
+		}
 		if err := task.Init(); err != nil {
 			log.Fatal("Failed to initialize task scheduler: %v", err)
 		}
diff --git a/services/pull/check.go b/services/pull/check.go
index 5d380b4609..d64f49de3b 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"strconv"
 	"strings"
 
 	"code.gitea.io/gitea/models"
@@ -17,24 +18,32 @@ import (
 	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/notification"
-	"code.gitea.io/gitea/modules/setting"
-	"code.gitea.io/gitea/modules/sync"
+	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/timeutil"
 
 	"github.com/unknwon/com"
 )
 
-// pullRequestQueue represents a queue to handle update pull request tests
-var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength)
+// prQueue represents a queue to handle update pull request tests
+var prQueue queue.UniqueQueue
 
 // AddToTaskQueue adds itself to pull request test task queue.
 func AddToTaskQueue(pr *models.PullRequest) {
-	go pullRequestQueue.AddFunc(pr.ID, func() {
-		pr.Status = models.PullRequestStatusChecking
-		if err := pr.UpdateCols("status"); err != nil {
-			log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+	go func() {
+		err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
+			pr.Status = models.PullRequestStatusChecking
+			err := pr.UpdateCols("status")
+			if err != nil {
+				log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+			} else {
+				log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID)
+			}
+			return err
+		})
+		if err != nil && err != queue.ErrAlreadyInQueue {
+			log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err)
 		}
-	})
+	}()
 }
 
 // checkAndUpdateStatus checks if pull request is possible to leaving checking status,
@@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
 	}
 
 	// Make sure there is no waiting test to process before leaving the checking status.
-	if !pullRequestQueue.Exist(pr.ID) {
+	has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+	if err != nil {
+		log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err)
+	}
+
+	if !has {
 		if err := pr.UpdateCols("status, conflicted_files"); err != nil {
 			log.Error("Update[%d]: %v", pr.ID, err)
 		}
@@ -73,7 +87,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
 	headFile := pr.GetGitRefName()
 
 	// Check if a pull request is merged into BaseBranch
-	_, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+	_, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).
+		RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
 	if err != nil {
 		// Errors are signaled by a non-zero status that is not 1
 		if strings.Contains(err.Error(), "exit status 1") {
@@ -93,7 +108,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
 	cmd := commitID[:40] + ".." + pr.BaseBranch
 
 	// Get the commit from BaseBranch where the pull request got merged
-	mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+	mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).
+		RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
 	if err != nil {
 		return nil, fmt.Errorf("git rev-list --ancestry-path --merges --reverse: %v", err)
 	} else if len(mergeCommit) < 40 {
@@ -155,61 +171,65 @@ func manuallyMerged(pr *models.PullRequest) bool {
 	return false
 }
 
-// TestPullRequests checks and tests untested patches of pull requests.
-// TODO: test more pull requests at same time.
-func TestPullRequests(ctx context.Context) {
-
-	go func() {
-		prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
-		if err != nil {
-			log.Error("Find Checking PRs: %v", err)
-			return
-		}
-		for _, prID := range prs {
-			select {
-			case <-ctx.Done():
-				return
-			default:
-				pullRequestQueue.Add(prID)
-			}
-		}
-	}()
-
-	// Start listening on new test requests.
-	for {
+// InitializePullRequests checks and tests untested patches of pull requests.
+func InitializePullRequests(ctx context.Context) {
+	prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
+	if err != nil {
+		log.Error("Find Checking PRs: %v", err)
+		return
+	}
+	for _, prID := range prs {
 		select {
-		case prID := <-pullRequestQueue.Queue():
-			log.Trace("TestPullRequests[%v]: processing test task", prID)
-			pullRequestQueue.Remove(prID)
-
-			id := com.StrTo(prID).MustInt64()
-
-			pr, err := models.GetPullRequestByID(id)
-			if err != nil {
-				log.Error("GetPullRequestByID[%s]: %v", prID, err)
-				continue
-			} else if pr.Status != models.PullRequestStatusChecking {
-				continue
-			} else if manuallyMerged(pr) {
-				continue
-			} else if err = TestPatch(pr); err != nil {
-				log.Error("testPatch[%d]: %v", pr.ID, err)
-				pr.Status = models.PullRequestStatusError
-				if err := pr.UpdateCols("status"); err != nil {
-					log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
-				}
-				continue
-			}
-			checkAndUpdateStatus(pr)
 		case <-ctx.Done():
-			pullRequestQueue.Close()
-			log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
 			return
+		default:
+			if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
+				log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID)
+				return nil
+			}); err != nil {
+				log.Error("Error adding prID: %s to the pull requests patch checking queue %v", prID, err)
+			}
 		}
 	}
 }
 
-// Init runs the task queue to test all the checking status pull requests
-func Init() {
-	go graceful.GetManager().RunWithShutdownContext(TestPullRequests)
+// handle passed PR IDs and test the PRs
+func handle(data ...queue.Data) {
+	for _, datum := range data {
+		prID := datum.(string)
+		id := com.StrTo(prID).MustInt64()
+
+		log.Trace("Testing PR ID %d from the pull requests patch checking queue", id)
+
+		pr, err := models.GetPullRequestByID(id)
+		if err != nil {
+			log.Error("GetPullRequestByID[%s]: %v", prID, err)
+			continue
+		} else if pr.Status != models.PullRequestStatusChecking {
+			continue
+		} else if manuallyMerged(pr) {
+			continue
+		} else if err = TestPatch(pr); err != nil {
+			log.Error("testPatch[%d]: %v", pr.ID, err)
+			pr.Status = models.PullRequestStatusError
+			if err := pr.UpdateCols("status"); err != nil {
+				log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
+			}
+			continue
+		}
+		checkAndUpdateStatus(pr)
+	}
+}
+
+// Init runs the task queue to test all the checking status pull requests
+func Init() error {
+	prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "").(queue.UniqueQueue)
+
+	if prQueue == nil {
+		return fmt.Errorf("Unable to create pr_patch_checker Queue")
+	}
+
+	go graceful.GetManager().RunWithShutdownFns(prQueue.Run)
+	go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
+	return nil
 }
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index 48a7774a61..4591edd7aa 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -6,29 +6,82 @@
 package pull
 
 import (
+	"context"
 	"strconv"
 	"testing"
 	"time"
 
 	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/queue"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/unknwon/com"
 )
 
 func TestPullRequest_AddToTaskQueue(t *testing.T) {
 	assert.NoError(t, models.PrepareTestDatabase())
 
+	idChan := make(chan int64, 10)
+
+	q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
+		for _, datum := range data {
+			prID := datum.(string)
+			id := com.StrTo(prID).MustInt64()
+			idChan <- id
+		}
+	}, queue.ChannelUniqueQueueConfiguration{
+		WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
+			QueueLength: 10,
+			BatchLength: 1,
+		},
+		Workers: 1,
+		Name:    "temporary-queue",
+	}, "")
+	assert.NoError(t, err)
+
+	queueShutdown := []func(){}
+	queueTerminate := []func(){}
+
+	prQueue = q.(queue.UniqueQueue)
+
 	pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
 	AddToTaskQueue(pr)
 
+	assert.Eventually(t, func() bool {
+		pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
+		return pr.Status == models.PullRequestStatusChecking
+	}, 1*time.Second, 100*time.Millisecond)
+
+	has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+	assert.True(t, has)
+	assert.NoError(t, err)
+
+	prQueue.Run(func(_ context.Context, shutdown func()) {
+		queueShutdown = append(queueShutdown, shutdown)
+	}, func(_ context.Context, terminate func()) {
+		queueTerminate = append(queueTerminate, terminate)
+	})
+
 	select {
-	case id := <-pullRequestQueue.Queue():
-		assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id)
+	case id := <-idChan:
+		assert.EqualValues(t, pr.ID, id)
 	case <-time.After(time.Second):
 		assert.Fail(t, "Timeout: nothing was added to pullRequestQueue")
 	}
 
-	assert.True(t, pullRequestQueue.Exist(pr.ID))
+	has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10))
+	assert.False(t, has)
+	assert.NoError(t, err)
+
 	pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
 	assert.Equal(t, models.PullRequestStatusChecking, pr.Status)
+
+	for _, callback := range queueShutdown {
+		callback()
+	}
+	for _, callback := range queueTerminate {
+		callback()
+	}
+
+	prQueue = nil
 }
diff --git a/vendor/gitea.com/lunny/levelqueue/.gitignore b/vendor/gitea.com/lunny/levelqueue/.gitignore
index 59a8bdee30..ab1fe76029 100644
--- a/vendor/gitea.com/lunny/levelqueue/.gitignore
+++ b/vendor/gitea.com/lunny/levelqueue/.gitignore
@@ -1,3 +1,7 @@
 queue/
 queue_pop/
-queue_push/
\ No newline at end of file
+queue_push/
+uniquequeue/
+uniquequeue_pop/
+uniquequeue_push/
+set/
diff --git a/vendor/gitea.com/lunny/levelqueue/README.md b/vendor/gitea.com/lunny/levelqueue/README.md
index 80a0853cf6..21db280839 100644
--- a/vendor/gitea.com/lunny/levelqueue/README.md
+++ b/vendor/gitea.com/lunny/levelqueue/README.md
@@ -25,4 +25,36 @@ data, err = queue.LPop()
 queue.LHandle(func(dt []byte) error{
     return nil
 })
-```
\ No newline at end of file
+```
+
+You can now create a Set from a leveldb:
+
+```Go
+set, err := levelqueue.OpenSet("./set")
+
+added, err:= set.Add([]byte("member1"))
+
+has, err := set.Has([]byte("member1"))
+
+members, err := set.Members()
+
+removed, err := set.Remove([]byte("member1"))
+```
+
+And you can create a UniqueQueue from a leveldb:
+
+```Go
+queue, err := levelqueue.OpenUnique("./queue")
+
+err := queue.RPush([]byte("member1"))
+
+err = queue.LPush([]byte("member1"))
+// Will return ErrAlreadyInQueue
+
+// and so on.
+```
+
+## Creating Queues, UniqueQueues and Sets from already open DB
+
+If you have an already open DB you can create these from this using the
+`NewQueue`, `NewUniqueQueue` and `NewSet` functions.
\ No newline at end of file
diff --git a/vendor/gitea.com/lunny/levelqueue/error.go b/vendor/gitea.com/lunny/levelqueue/error.go
index d639c5d496..648c185655 100644
--- a/vendor/gitea.com/lunny/levelqueue/error.go
+++ b/vendor/gitea.com/lunny/levelqueue/error.go
@@ -7,6 +7,8 @@ package levelqueue
 import "errors"
 
 var (
-	// ErrNotFound means no element in queue
+	// ErrNotFound means no elements in queue
 	ErrNotFound = errors.New("no key found")
+
+	ErrAlreadyInQueue = errors.New("value already in queue")
 )
diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go
index af624db8e4..20ed90100c 100644
--- a/vendor/gitea.com/lunny/levelqueue/queue.go
+++ b/vendor/gitea.com/lunny/levelqueue/queue.go
@@ -12,37 +12,62 @@ import (
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
+const (
+	lowKeyStr  = "low"
+	highKeyStr = "high"
+)
+
 // Queue defines a queue struct
 type Queue struct {
-	db       *leveldb.DB
-	highLock sync.Mutex
-	lowLock  sync.Mutex
-	low      int64
-	high     int64
+	db                *leveldb.DB
+	highLock          sync.Mutex
+	lowLock           sync.Mutex
+	low               int64
+	high              int64
+	lowKey            []byte
+	highKey           []byte
+	prefix            []byte
+	closeUnderlyingDB bool
 }
 
-// Open opens a queue object or create it if not exist
+// Open opens a queue from the db path or creates a
+// queue if it doesn't exist.
+// The keys will not be prefixed by default
 func Open(dataDir string) (*Queue, error) {
 	db, err := leveldb.OpenFile(dataDir, nil)
 	if err != nil {
 		return nil, err
 	}
+	return NewQueue(db, []byte{}, true)
+}
+
+// NewQueue creates a queue from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
+	var err error
 
 	var queue = &Queue{
-		db: db,
+		db:                db,
+		closeUnderlyingDB: closeUnderlyingDB,
 	}
-	queue.low, err = queue.readID(lowKey)
+
+	queue.prefix = make([]byte, len(prefix))
+	copy(queue.prefix, prefix)
+	queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
+	queue.highKey = withPrefix(prefix, []byte(highKeyStr))
+
+	queue.low, err = queue.readID(queue.lowKey)
 	if err == leveldb.ErrNotFound {
 		queue.low = 1
-		err = db.Put(lowKey, id2bytes(1), nil)
+		err = db.Put(queue.lowKey, id2bytes(1), nil)
 	}
 	if err != nil {
 		return nil, err
 	}
 
-	queue.high, err = queue.readID(highKey)
+	queue.high, err = queue.readID(queue.highKey)
 	if err == leveldb.ErrNotFound {
-		err = db.Put(highKey, id2bytes(0), nil)
+		err = db.Put(queue.highKey, id2bytes(0), nil)
 	}
 	if err != nil {
 		return nil, err
@@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) {
 	return bytes2id(bs)
 }
 
-var (
-	lowKey  = []byte("low")
-	highKey = []byte("high")
-)
-
 func (queue *Queue) highincrement() (int64, error) {
 	id := queue.high + 1
 	queue.high = id
-	err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+	err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
 	if err != nil {
 		queue.high = queue.high - 1
 		return 0, err
@@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) {
 
 func (queue *Queue) highdecrement() (int64, error) {
 	queue.high = queue.high - 1
-	err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+	err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
 	if err != nil {
 		queue.high = queue.high + 1
 		return 0, err
@@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) {
 
 func (queue *Queue) lowincrement() (int64, error) {
 	queue.low = queue.low + 1
-	err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+	err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
 	if err != nil {
 		queue.low = queue.low - 1
 		return 0, err
@@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) {
 
 func (queue *Queue) lowdecrement() (int64, error) {
 	queue.low = queue.low - 1
-	err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+	err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
 	if err != nil {
 		queue.low = queue.low + 1
 		return 0, err
@@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) {
 	return binary.ReadVarint(bytes.NewReader(b))
 }
 
+func withPrefix(prefix []byte, value []byte) []byte {
+	if len(prefix) == 0 {
+		return value
+	}
+	prefixed := make([]byte, len(prefix)+1+len(value))
+	copy(prefixed[0:len(prefix)], prefix)
+	prefixed[len(prefix)] = '-'
+	copy(prefixed[len(prefix)+1:], value)
+	return prefixed
+}
+
 // RPush pushes a data from right of queue
 func (queue *Queue) RPush(data []byte) error {
 	queue.highLock.Lock()
@@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error {
 		queue.highLock.Unlock()
 		return err
 	}
-	err = queue.db.Put(id2bytes(id), data, nil)
+	err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
 	queue.highLock.Unlock()
 	return err
 }
@@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error {
 		queue.lowLock.Unlock()
 		return err
 	}
-	err = queue.db.Put(id2bytes(id), data, nil)
+	err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
 	queue.lowLock.Unlock()
 	return err
 }
@@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) {
 	defer queue.highLock.Unlock()
 	currentID := queue.high
 
-	res, err := queue.db.Get(id2bytes(currentID), nil)
+	res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		if err == leveldb.ErrNotFound {
 			return nil, ErrNotFound
@@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) {
 		return nil, err
 	}
 
-	err = queue.db.Delete(id2bytes(currentID), nil)
+	err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		return nil, err
 	}
@@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
 	defer queue.highLock.Unlock()
 	currentID := queue.high
 
-	res, err := queue.db.Get(id2bytes(currentID), nil)
+	res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		if err == leveldb.ErrNotFound {
 			return ErrNotFound
@@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
 		return err
 	}
 
-	return queue.db.Delete(id2bytes(currentID), nil)
+	return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 }
 
 // LPop pop a data from left of queue
@@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) {
 	defer queue.lowLock.Unlock()
 	currentID := queue.low
 
-	res, err := queue.db.Get(id2bytes(currentID), nil)
+	res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		if err == leveldb.ErrNotFound {
 			return nil, ErrNotFound
@@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) {
 		return nil, err
 	}
 
-	err = queue.db.Delete(id2bytes(currentID), nil)
+	err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		return nil, err
 	}
@@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
 	defer queue.lowLock.Unlock()
 	currentID := queue.low
 
-	res, err := queue.db.Get(id2bytes(currentID), nil)
+	res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 	if err != nil {
 		if err == leveldb.ErrNotFound {
 			return ErrNotFound
@@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
 		return err
 	}
 
-	return queue.db.Delete(id2bytes(currentID), nil)
+	return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
 }
 
-// Close closes the queue
+// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
 func (queue *Queue) Close() error {
+	if !queue.closeUnderlyingDB {
+		queue.db = nil
+		return nil
+	}
 	err := queue.db.Close()
 	queue.db = nil
 	return err
diff --git a/vendor/gitea.com/lunny/levelqueue/set.go b/vendor/gitea.com/lunny/levelqueue/set.go
new file mode 100644
index 0000000000..88f4e9b1d1
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/set.go
@@ -0,0 +1,110 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+	"sync"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/util"
+)
+
+const (
+	setPrefixStr = "set"
+)
+
+// Set defines a set struct
+type Set struct {
+	db                *leveldb.DB
+	closeUnderlyingDB bool
+	lock              sync.Mutex
+	prefix            []byte
+}
+
+// OpenSet opens a set from the db path or creates a set if it doesn't exist.
+// The keys will be prefixed with "set-" by default
+func OpenSet(dataDir string) (*Set, error) {
+	db, err := leveldb.OpenFile(dataDir, nil)
+	if err != nil {
+		return nil, err
+	}
+	return NewSet(db, []byte(setPrefixStr), true)
+}
+
+// NewSet creates a set from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) {
+	set := &Set{
+		db:                db,
+		closeUnderlyingDB: closeUnderlyingDB,
+	}
+	set.prefix = make([]byte, len(prefix))
+	copy(set.prefix, prefix)
+
+	return set, nil
+}
+
+// Add adds a member string to a key set, returns true if the member was not already present
+func (set *Set) Add(value []byte) (bool, error) {
+	set.lock.Lock()
+	defer set.lock.Unlock()
+	setKey := withPrefix(set.prefix, value)
+	has, err := set.db.Has(setKey, nil)
+	if err != nil || has {
+		return !has, err
+	}
+	return !has, set.db.Put(setKey, []byte(""), nil)
+}
+
+// Members returns the current members of the set
+func (set *Set) Members() ([][]byte, error) {
+	set.lock.Lock()
+	defer set.lock.Unlock()
+	var members [][]byte
+	prefix := withPrefix(set.prefix, []byte{})
+	iter := set.db.NewIterator(util.BytesPrefix(prefix), nil)
+	for iter.Next() {
+		slice := iter.Key()[len(prefix):]
+		value := make([]byte, len(slice))
+		copy(value, slice)
+		members = append(members, value)
+	}
+	iter.Release()
+	return members, iter.Error()
+}
+
+// Has returns if the member is in the set
+func (set *Set) Has(value []byte) (bool, error) {
+	set.lock.Lock()
+	defer set.lock.Unlock()
+	setKey := withPrefix(set.prefix, value)
+
+	return set.db.Has(setKey, nil)
+}
+
+// Remove removes a member from the set, returns true if the member was present
+func (set *Set) Remove(value []byte) (bool, error) {
+	set.lock.Lock()
+	defer set.lock.Unlock()
+	setKey := withPrefix(set.prefix, value)
+
+	has, err := set.db.Has(setKey, nil)
+	if err != nil || !has {
+		return has, err
+	}
+
+	return has, set.db.Delete(setKey, nil)
+}
+
+// Close closes the set (and the underlying db if set to closeUnderlyingDB)
+func (set *Set) Close() error {
+	if !set.closeUnderlyingDB {
+		set.db = nil
+		return nil
+	}
+	err := set.db.Close()
+	set.db = nil
+	return err
+}
diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
new file mode 100644
index 0000000000..8d2676b0d2
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
@@ -0,0 +1,184 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+	"fmt"
+
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+const (
+	uniqueQueuePrefixStr = "unique"
+)
+
+// UniqueQueue defines an unique queue struct
+type UniqueQueue struct {
+	q                 *Queue
+	set               *Set
+	db                *leveldb.DB
+	closeUnderlyingDB bool
+}
+
+// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist.
+// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-"
+func OpenUnique(dataDir string) (*UniqueQueue, error) {
+	db, err := leveldb.OpenFile(dataDir, nil)
+	if err != nil {
+		return nil, err
+	}
+	return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
+}
+
+// NewUniqueQueue creates a new unique queue from a db.
+// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
+	internal, err := NewQueue(db, queuePrefix, false)
+	if err != nil {
+		return nil, err
+	}
+	set, err := NewSet(db, setPrefix, false)
+	if err != nil {
+		return nil, err
+	}
+	queue := &UniqueQueue{
+		q:                 internal,
+		set:               set,
+		db:                db,
+		closeUnderlyingDB: closeUnderlyingDB,
+	}
+
+	return queue, err
+}
+
+// LPush pushes data to the left of the queue
+func (queue *UniqueQueue) LPush(data []byte) error {
+	return queue.LPushFunc(data, nil)
+}
+
+// LPushFunc pushes data to the left of the queue and calls the callback if it is added
+func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
+	added, err := queue.set.Add(data)
+	if err != nil {
+		return err
+	}
+	if !added {
+		return ErrAlreadyInQueue
+	}
+
+	if fn != nil {
+		err = fn()
+		if err != nil {
+			_, remErr := queue.set.Remove(data)
+			if remErr != nil {
+				return fmt.Errorf("%v & %v", err, remErr)
+			}
+			return err
+		}
+	}
+
+	return queue.q.LPush(data)
+}
+
+// RPush pushes data to the right of the queue
+func (queue *UniqueQueue) RPush(data []byte) error {
+	return queue.RPushFunc(data, nil)
+}
+
+// RPushFunc pushes data to the right of the queue and calls the callback if is added
+func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
+	added, err := queue.set.Add(data)
+	if err != nil {
+		return err
+	}
+	if !added {
+		return ErrAlreadyInQueue
+	}
+
+	if fn != nil {
+		err = fn()
+		if err != nil {
+			_, remErr := queue.set.Remove(data)
+			if remErr != nil {
+				return fmt.Errorf("%v & %v", err, remErr)
+			}
+			return err
+		}
+	}
+
+	return queue.q.RPush(data)
+}
+
+// RPop pop data from the right of the queue
+func (queue *UniqueQueue) RPop() ([]byte, error) {
+	popped, err := queue.q.RPop()
+	if err != nil {
+		return popped, err
+	}
+	_, err = queue.set.Remove(popped)
+
+	return popped, err
+}
+
+// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
+	return queue.q.RHandle(func(data []byte) error {
+		err := h(data)
+		if err != nil {
+			return err
+		}
+		_, err = queue.set.Remove(data)
+		return err
+	})
+}
+
+// LPop pops data from left of the queue
+func (queue *UniqueQueue) LPop() ([]byte, error) {
+	popped, err := queue.q.LPop()
+	if err != nil {
+		return popped, err
+	}
+	_, err = queue.set.Remove(popped)
+
+	return popped, err
+}
+
+// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
+	return queue.q.LHandle(func(data []byte) error {
+		err := h(data)
+		if err != nil {
+			return err
+		}
+		_, err = queue.set.Remove(data)
+		return err
+	})
+}
+
+// Has checks whether the data is already in the queue
+func (queue *UniqueQueue) Has(data []byte) (bool, error) {
+	return queue.set.Has(data)
+}
+
+// Len returns the length of the queue
+func (queue *UniqueQueue) Len() int64 {
+	queue.set.lock.Lock()
+	defer queue.set.lock.Unlock()
+	return queue.q.Len()
+}
+
+// Close closes the queue (and the underlying DB if set to closeUnderlyingDB)
+func (queue *UniqueQueue) Close() error {
+	_ = queue.q.Close()
+	_ = queue.set.Close()
+	if !queue.closeUnderlyingDB {
+		queue.db = nil
+		return nil
+	}
+	err := queue.db.Close()
+	queue.db = nil
+	return err
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5203c24e4a..947008d63c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1,6 +1,6 @@
 # cloud.google.com/go v0.45.0
 cloud.google.com/go/compute/metadata
-# gitea.com/lunny/levelqueue v0.1.0
+# gitea.com/lunny/levelqueue v0.2.0
 gitea.com/lunny/levelqueue
 # gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
 gitea.com/macaron/binding