diff --git a/modules/graceful/context.go b/modules/graceful/context.go
index 6b5b207ff3..4fcbcb04b6 100644
--- a/modules/graceful/context.go
+++ b/modules/graceful/context.go
@@ -5,51 +5,8 @@ package graceful
 
 import (
 	"context"
-	"time"
 )
 
-// ChannelContext is a context that wraps a channel and error as a context
-type ChannelContext struct {
-	done <-chan struct{}
-	err  error
-}
-
-// NewChannelContext creates a ChannelContext from a channel and error
-func NewChannelContext(done <-chan struct{}, err error) *ChannelContext {
-	return &ChannelContext{
-		done: done,
-		err:  err,
-	}
-}
-
-// Deadline returns the time when work done on behalf of this context
-// should be canceled. There is no Deadline for a ChannelContext
-func (ctx *ChannelContext) Deadline() (deadline time.Time, ok bool) {
-	return deadline, ok
-}
-
-// Done returns the channel provided at the creation of this context.
-// When closed, work done on behalf of this context should be canceled.
-func (ctx *ChannelContext) Done() <-chan struct{} {
-	return ctx.done
-}
-
-// Err returns nil, if Done is not closed. If Done is closed,
-// Err returns the error provided at the creation of this context
-func (ctx *ChannelContext) Err() error {
-	select {
-	case <-ctx.done:
-		return ctx.err
-	default:
-		return nil
-	}
-}
-
-// Value returns nil for all calls as no values are or can be associated with this context
-func (ctx *ChannelContext) Value(key interface{}) interface{} {
-	return nil
-}
-
 // ShutdownContext returns a context.Context that is Done at shutdown
 // Callers using this context should ensure that they are registered as a running server
 // in order that they are waited for.
diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go
index d32788092d..3604c0a3f5 100644
--- a/modules/graceful/manager.go
+++ b/modules/graceful/manager.go
@@ -23,6 +23,11 @@ const (
 	stateTerminate
 )
 
+type RunCanceler interface {
+	Run()
+	Cancel()
+}
+
 // There are some places that could inherit sockets:
 //
 // * HTTP or HTTPS main listener
@@ -55,46 +60,19 @@ func InitManager(ctx context.Context) {
 	})
 }
 
-// WithCallback is a runnable to call when the caller has finished
-type WithCallback func(callback func())
-
-// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
-// After the callback to atShutdown is called and is complete, the main function must return.
-// Similarly the callback function provided to atTerminate must return once termination is complete.
-// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
-// - users must therefore be careful to only call these as necessary.
-type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
-
-// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
-// After the callback to atShutdown is called and is complete, the main function must return.
-// Similarly the callback function provided to atTerminate must return once termination is complete.
-// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
-// - users must therefore be careful to only call these as necessary.
-func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
+// RunWithCancel helps to run a function with a custom context, the Cancel function will be called at shutdown
+// The Cancel function should stop the Run function in predictable time.
+func (g *Manager) RunWithCancel(rc RunCanceler) {
+	g.RunAtShutdown(context.Background(), rc.Cancel)
 	g.runningServerWaitGroup.Add(1)
 	defer g.runningServerWaitGroup.Done()
 	defer func() {
 		if err := recover(); err != nil {
-			log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
+			log.Critical("PANIC during RunWithCancel: %v\nStacktrace: %s", err, log.Stack(2))
 			g.doShutdown()
 		}
 	}()
-	run(func(atShutdown func()) {
-		g.lock.Lock()
-		defer g.lock.Unlock()
-		g.toRunAtShutdown = append(g.toRunAtShutdown,
-			func() {
-				defer func() {
-					if err := recover(); err != nil {
-						log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
-						g.doShutdown()
-					}
-				}()
-				atShutdown()
-			})
-	}, func(atTerminate func()) {
-		g.RunAtTerminate(atTerminate)
-	})
+	rc.Run()
 }
 
 // RunWithShutdownContext takes a function that has a context to watch for shutdown.
@@ -151,21 +129,6 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
 		})
 }
 
-// RunAtHammer creates a go-routine to run the provided function at shutdown
-func (g *Manager) RunAtHammer(hammer func()) {
-	g.lock.Lock()
-	defer g.lock.Unlock()
-	g.toRunAtHammer = append(g.toRunAtHammer,
-		func() {
-			defer func() {
-				if err := recover(); err != nil {
-					log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
-				}
-			}()
-			hammer()
-		})
-}
-
 func (g *Manager) doShutdown() {
 	if !g.setStateTransition(stateRunning, stateShuttingDown) {
 		g.DoImmediateHammer()
@@ -206,9 +169,6 @@ func (g *Manager) doHammerTime(d time.Duration) {
 		g.hammerCtxCancel()
 		atHammerCtx := pprof.WithLabels(g.terminateCtx, pprof.Labels("graceful-lifecycle", "post-hammer"))
 		pprof.SetGoroutineLabels(atHammerCtx)
-		for _, fn := range g.toRunAtHammer {
-			go fn()
-		}
 	}
 	g.lock.Unlock()
 }
diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go
index f949abfd21..b1fd6da76d 100644
--- a/modules/graceful/manager_unix.go
+++ b/modules/graceful/manager_unix.go
@@ -41,7 +41,6 @@ type Manager struct {
 	terminateWaitGroup     sync.WaitGroup
 
 	toRunAtShutdown  []func()
-	toRunAtHammer    []func()
 	toRunAtTerminate []func()
 }
 
diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go
index f3606084aa..f676f86d04 100644
--- a/modules/graceful/manager_windows.go
+++ b/modules/graceful/manager_windows.go
@@ -50,7 +50,6 @@ type Manager struct {
 	shutdownRequested      chan struct{}
 
 	toRunAtShutdown  []func()
-	toRunAtHammer    []func()
 	toRunAtTerminate []func()
 }
 
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index e9b8e76500..f38fd6000c 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -166,7 +166,7 @@ func Init() {
 		handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
 			idx, err := indexer.get()
 			if idx == nil || err != nil {
-				log.Error("Codes indexer handler: unable to get indexer!")
+				log.Warn("Codes indexer handler: indexer is not ready, retry later.")
 				return items
 			}
 
@@ -201,7 +201,7 @@ func Init() {
 			return unhandled
 		}
 
-		indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
+		indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler)
 		if indexerQueue == nil {
 			log.Fatal("Unable to create codes indexer queue")
 		}
@@ -259,7 +259,7 @@ func Init() {
 		indexer.set(rIndexer)
 
 		// Start processing the queue
-		go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
+		go graceful.GetManager().RunWithCancel(indexerQueue)
 
 		if populate {
 			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 76ff80ffca..f36ea10935 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -102,7 +102,7 @@ var (
 func InitIssueIndexer(syncReindex bool) {
 	ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
 
-	waitChannel := make(chan time.Duration, 1)
+	indexerInitWaitChannel := make(chan time.Duration, 1)
 
 	// Create the Queue
 	switch setting.Indexer.IssueType {
@@ -110,7 +110,7 @@ func InitIssueIndexer(syncReindex bool) {
 		handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
 			indexer := holder.get()
 			if indexer == nil {
-				log.Error("Issue indexer handler: unable to get indexer.")
+				log.Warn("Issue indexer handler: indexer is not ready, retry later.")
 				return items
 			}
 			toIndex := make([]*IndexerData, 0, len(items))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
 			return unhandled
 		}
 
-		issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
+		issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler)
 
 		if issueIndexerQueue == nil {
 			log.Fatal("Unable to create issue indexer queue")
 		}
 	default:
-		issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
+		issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
 	}
 
+	graceful.GetManager().RunAtTerminate(finished)
+
 	// Create the Indexer
 	go func() {
 		pprof.SetGoroutineLabels(ctx)
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
 				if issueIndexer != nil {
 					issueIndexer.Close()
 				}
-				finished()
 				log.Info("PID: %d Issue Indexer closed", os.Getpid())
 			})
 			log.Debug("Created Bleve Indexer")
 		case "elasticsearch":
-			graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
-				pprof.SetGoroutineLabels(ctx)
-				issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
-				if err != nil {
-					log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
-				}
-				exist, err := issueIndexer.Init()
-				if err != nil {
-					log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
-				}
-				populate = !exist
-				holder.set(issueIndexer)
-				atTerminate(finished)
-			})
+			issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+			if err != nil {
+				log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+			}
+			exist, err := issueIndexer.Init()
+			if err != nil {
+				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+			}
+			populate = !exist
+			holder.set(issueIndexer)
 		case "db":
 			issueIndexer := &DBIndexer{}
 			holder.set(issueIndexer)
-			graceful.GetManager().RunAtTerminate(finished)
 		case "meilisearch":
-			graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
-				pprof.SetGoroutineLabels(ctx)
-				issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
-				if err != nil {
-					log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
-				}
-				exist, err := issueIndexer.Init()
-				if err != nil {
-					log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
-				}
-				populate = !exist
-				holder.set(issueIndexer)
-				atTerminate(finished)
-			})
+			issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
+			if err != nil {
+				log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+			}
+			exist, err := issueIndexer.Init()
+			if err != nil {
+				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+			}
+			populate = !exist
+			holder.set(issueIndexer)
 		default:
 			holder.cancel()
 			log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
 		}
 
 		// Start processing the queue
-		go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+		go graceful.GetManager().RunWithCancel(issueIndexerQueue)
 
 		// Populate the index
 		if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
 				go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
 			}
 		}
-		waitChannel <- time.Since(start)
-		close(waitChannel)
+
+		indexerInitWaitChannel <- time.Since(start)
+		close(indexerInitWaitChannel)
 	}()
 
 	if syncReindex {
 		select {
-		case <-waitChannel:
+		case <-indexerInitWaitChannel:
 		case <-graceful.GetManager().IsShutdown():
 		}
 	} else if setting.Indexer.StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
 				timeout += setting.GracefulHammerTime
 			}
 			select {
-			case duration := <-waitChannel:
+			case duration := <-indexerInitWaitChannel:
 				log.Info("Issue Indexer Initialization took %v", duration)
 			case <-graceful.GetManager().IsShutdown():
 				log.Warn("Shutdown occurred before issue index initialisation was complete")
diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go
index 46438925e4..d002bd57cf 100644
--- a/modules/indexer/stats/queue.go
+++ b/modules/indexer/stats/queue.go
@@ -29,13 +29,11 @@ func handler(items ...int64) []int64 {
 }
 
 func initStatsQueue() error {
-	statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler)
+	statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler)
 	if statsQueue == nil {
-		return fmt.Errorf("Unable to create repo_stats_update Queue")
+		return fmt.Errorf("unable to create repo_stats_update queue")
 	}
-
-	go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)
-
+	go graceful.GetManager().RunWithCancel(statsQueue)
 	return nil
 }
 
diff --git a/modules/log/event_writer_base.go b/modules/log/event_writer_base.go
index f61d9a7b9d..1d45d579c0 100644
--- a/modules/log/event_writer_base.go
+++ b/modules/log/event_writer_base.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"regexp"
+	"runtime/pprof"
 	"time"
 )
 
@@ -143,9 +144,17 @@ func eventWriterStartGo(ctx context.Context, w EventWriter, shared bool) {
 	}
 	w.Base().shared = shared
 	w.Base().stopped = make(chan struct{})
+
+	ctxDesc := "Logger: EventWriter: " + w.GetWriterName()
+	if shared {
+		ctxDesc = "Logger: EventWriter (shared): " + w.GetWriterName()
+	}
+	writerCtx, writerCancel := newContext(ctx, ctxDesc)
 	go func() {
+		defer writerCancel()
 		defer close(w.Base().stopped)
-		w.Run(ctx)
+		pprof.SetGoroutineLabels(writerCtx)
+		w.Run(writerCtx)
 	}()
 }
 
diff --git a/modules/log/event_writer_conn_test.go b/modules/log/event_writer_conn_test.go
index e08ec025a3..69e87aa8c4 100644
--- a/modules/log/event_writer_conn_test.go
+++ b/modules/log/event_writer_conn_test.go
@@ -40,7 +40,7 @@ func TestConnLogger(t *testing.T) {
 	level := INFO
 	flags := LstdFlags | LUTC | Lfuncname
 
-	logger := NewLoggerWithWriters(context.Background(), NewEventWriterConn("test-conn", WriterMode{
+	logger := NewLoggerWithWriters(context.Background(), "test", NewEventWriterConn("test-conn", WriterMode{
 		Level:        level,
 		Prefix:       prefix,
 		Flags:        FlagsFromBits(flags),
diff --git a/modules/log/init.go b/modules/log/init.go
index 798ba86410..38a3ad60a5 100644
--- a/modules/log/init.go
+++ b/modules/log/init.go
@@ -4,14 +4,19 @@
 package log
 
 import (
+	"context"
 	"runtime"
 	"strings"
+	"sync/atomic"
 
 	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/util/rotatingfilewriter"
 )
 
-var projectPackagePrefix string
+var (
+	projectPackagePrefix string
+	processTraceDisabled atomic.Int64
+)
 
 func init() {
 	_, filename, _, _ := runtime.Caller(0)
@@ -24,6 +29,10 @@ func init() {
 	rotatingfilewriter.ErrorPrintf = FallbackErrorf
 
 	process.Trace = func(start bool, pid process.IDType, description string, parentPID process.IDType, typ string) {
+		// the logger manager has its own mutex lock, so it's safe to use "Load" here
+		if processTraceDisabled.Load() != 0 {
+			return
+		}
 		if start && parentPID != "" {
 			Log(1, TRACE, "Start %s: %s (from %s) (%s)", NewColoredValue(pid, FgHiYellow), description, NewColoredValue(parentPID, FgYellow), NewColoredValue(typ, Reset))
 		} else if start {
@@ -33,3 +42,11 @@ func init() {
 		}
 	}
 }
+
+func newContext(parent context.Context, desc string) (ctx context.Context, cancel context.CancelFunc) {
+	// the "process manager" also calls "log.Trace()" to output logs, so if we want to create new contexts by the manager, we need to disable the trace temporarily
+	processTraceDisabled.Add(1)
+	defer processTraceDisabled.Add(-1)
+	ctx, _, cancel = process.GetManager().AddTypedContext(parent, desc, process.SystemProcessType, false)
+	return ctx, cancel
+}
diff --git a/modules/log/logger_impl.go b/modules/log/logger_impl.go
index c7e8fde3c0..abd72d326f 100644
--- a/modules/log/logger_impl.go
+++ b/modules/log/logger_impl.go
@@ -228,9 +228,9 @@ func (l *LoggerImpl) GetLevel() Level {
 	return Level(l.level.Load())
 }
 
-func NewLoggerWithWriters(ctx context.Context, writer ...EventWriter) *LoggerImpl {
+func NewLoggerWithWriters(ctx context.Context, name string, writer ...EventWriter) *LoggerImpl {
 	l := &LoggerImpl{}
-	l.ctx, l.ctxCancel = context.WithCancel(ctx)
+	l.ctx, l.ctxCancel = newContext(ctx, "Logger: "+name)
 	l.LevelLogger = BaseLoggerToGeneralLogger(l)
 	l.eventWriters = map[string]EventWriter{}
 	l.syncLevelInternal()
diff --git a/modules/log/logger_test.go b/modules/log/logger_test.go
index a91b4d23af..70222f64f5 100644
--- a/modules/log/logger_test.go
+++ b/modules/log/logger_test.go
@@ -53,7 +53,7 @@ func newDummyWriter(name string, level Level, delay time.Duration) *dummyWriter
 }
 
 func TestLogger(t *testing.T) {
-	logger := NewLoggerWithWriters(context.Background())
+	logger := NewLoggerWithWriters(context.Background(), "test")
 
 	dump := logger.DumpWriters()
 	assert.EqualValues(t, 0, len(dump))
@@ -88,7 +88,7 @@ func TestLogger(t *testing.T) {
 }
 
 func TestLoggerPause(t *testing.T) {
-	logger := NewLoggerWithWriters(context.Background())
+	logger := NewLoggerWithWriters(context.Background(), "test")
 
 	w1 := newDummyWriter("dummy-1", DEBUG, 0)
 	logger.AddWriters(w1)
@@ -117,7 +117,7 @@ func (t testLogString) LogString() string {
 }
 
 func TestLoggerLogString(t *testing.T) {
-	logger := NewLoggerWithWriters(context.Background())
+	logger := NewLoggerWithWriters(context.Background(), "test")
 
 	w1 := newDummyWriter("dummy-1", DEBUG, 0)
 	w1.Mode.Colorize = true
@@ -130,7 +130,7 @@ func TestLoggerLogString(t *testing.T) {
 }
 
 func TestLoggerExpressionFilter(t *testing.T) {
-	logger := NewLoggerWithWriters(context.Background())
+	logger := NewLoggerWithWriters(context.Background(), "test")
 
 	w1 := newDummyWriter("dummy-1", DEBUG, 0)
 	w1.Mode.Expression = "foo.*"
diff --git a/modules/log/manager.go b/modules/log/manager.go
index bbaef7eb20..b5d6cbf8e1 100644
--- a/modules/log/manager.go
+++ b/modules/log/manager.go
@@ -39,7 +39,7 @@ func (m *LoggerManager) GetLogger(name string) *LoggerImpl {
 
 	logger := m.loggers[name]
 	if logger == nil {
-		logger = NewLoggerWithWriters(m.ctx)
+		logger = NewLoggerWithWriters(m.ctx, name)
 		m.loggers[name] = logger
 		if name == DEFAULT {
 			m.defaultLogger.Store(logger)
@@ -137,6 +137,6 @@ func GetManager() *LoggerManager {
 
 func NewManager() *LoggerManager {
 	m := &LoggerManager{writers: map[string]EventWriter{}, loggers: map[string]*LoggerImpl{}}
-	m.ctx, m.ctxCancel = context.WithCancel(context.Background())
+	m.ctx, m.ctxCancel = newContext(context.Background(), "LoggerManager")
 	return m
 }
diff --git a/modules/mirror/mirror.go b/modules/mirror/mirror.go
index 73e591adba..0d9a624730 100644
--- a/modules/mirror/mirror.go
+++ b/modules/mirror/mirror.go
@@ -33,9 +33,11 @@ func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) {
 	if !setting.Mirror.Enabled {
 		return
 	}
-	mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle)
-
-	go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
+	mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle)
+	if mirrorQueue == nil {
+		log.Fatal("Unable to create mirror queue")
+	}
+	go graceful.GetManager().RunWithCancel(mirrorQueue)
 }
 
 // AddPullMirrorToQueue adds repoID to mirror queue
diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go
index a4576b6791..6c5f22c122 100644
--- a/modules/notification/ui/ui.go
+++ b/modules/notification/ui/ui.go
@@ -37,7 +37,10 @@ var _ base.Notifier = &notificationService{}
 // NewNotifier create a new notificationService notifier
 func NewNotifier() base.Notifier {
 	ns := &notificationService{}
-	ns.issueQueue = queue.CreateSimpleQueue("notification-service", handler)
+	ns.issueQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "notification-service", handler)
+	if ns.issueQueue == nil {
+		log.Fatal("Unable to create notification-service queue")
+	}
 	return ns
 }
 
@@ -51,7 +54,7 @@ func handler(items ...issueNotificationOpts) []issueNotificationOpts {
 }
 
 func (ns *notificationService) Run() {
-	go graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run)
+	go graceful.GetManager().RunWithCancel(ns.issueQueue) // TODO: using "go" here doesn't seem right, just leave it as old code
 }
 
 func (ns *notificationService) NotifyCreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
diff --git a/modules/process/manager.go b/modules/process/manager.go
index fdfca3db7a..25d503c594 100644
--- a/modules/process/manager.go
+++ b/modules/process/manager.go
@@ -167,6 +167,7 @@ func (pm *Manager) Add(ctx context.Context, description string, cancel context.C
 
 	pm.processMap[pid] = process
 	pm.mutex.Unlock()
+
 	Trace(true, pid, description, parentPID, processType)
 
 	pprofCtx := pprof.WithLabels(ctx, pprof.Labels(DescriptionPProfLabel, description, PPIDPProfLabel, string(parentPID), PIDPProfLabel, string(pid), ProcessTypePProfLabel, processType))
@@ -200,10 +201,16 @@ func (pm *Manager) nextPID() (start time.Time, pid IDType) {
 }
 
 func (pm *Manager) remove(process *process) {
+	deleted := false
+
 	pm.mutex.Lock()
-	defer pm.mutex.Unlock()
-	if p := pm.processMap[process.PID]; p == process {
+	if pm.processMap[process.PID] == process {
 		delete(pm.processMap, process.PID)
+		deleted = true
+	}
+	pm.mutex.Unlock()
+
+	if deleted {
 		Trace(false, process.PID, process.Description, process.ParentPID, process.Type)
 	}
 }
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 95b3bad57b..8b964c0c28 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -88,22 +88,22 @@ func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
 }
 
 // CreateSimpleQueue creates a simple queue from global setting config provider by name
-func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
-	return createWorkerPoolQueue(name, setting.CfgProvider, handler, false)
+func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
 }
 
 // CreateUniqueQueue creates a unique queue from global setting config provider by name
-func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
-	return createWorkerPoolQueue(name, setting.CfgProvider, handler, true)
+func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
 }
 
-func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
+func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
 	queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
 	if err != nil {
 		log.Error("Failed to get queue settings for %q: %v", name, err)
 		return nil
 	}
-	w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
+	w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
 	if err != nil {
 		log.Error("Failed to create queue %q: %v", name, err)
 		return nil
diff --git a/modules/queue/manager_test.go b/modules/queue/manager_test.go
index 50265e27b6..1fd29f813f 100644
--- a/modules/queue/manager_test.go
+++ b/modules/queue/manager_test.go
@@ -29,7 +29,7 @@ func TestManager(t *testing.T) {
 		if err != nil {
 			return nil, err
 		}
-		return NewWorkerPoolQueueBySetting(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
+		return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
 	}
 
 	// test invalid CONN_STR
@@ -80,7 +80,7 @@ MAX_WORKERS = 2
 
 	assert.NoError(t, err)
 
-	q1 := createWorkerPoolQueue[string]("no-such", cfgProvider, nil, false)
+	q1 := createWorkerPoolQueue[string](context.Background(), "no-such", cfgProvider, nil, false)
 	assert.Equal(t, "no-such", q1.GetName())
 	assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
 	assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
@@ -96,7 +96,7 @@ MAX_WORKERS = 2
 	assert.Equal(t, "string", q1.GetItemTypeName())
 	qid1 := GetManager().qidCounter
 
-	q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
+	q2 := createWorkerPoolQueue(context.Background(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
 	assert.Equal(t, "sub", q2.GetName())
 	assert.Equal(t, "level", q2.GetType())
 	assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 7127ea1117..147a4f335e 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -5,6 +5,7 @@ package queue
 
 import (
 	"context"
+	"runtime/pprof"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -13,9 +14,10 @@ import (
 )
 
 var (
-	infiniteTimerC        = make(chan time.Time)
-	batchDebounceDuration = 100 * time.Millisecond
-	workerIdleDuration    = 1 * time.Second
+	infiniteTimerC         = make(chan time.Time)
+	batchDebounceDuration  = 100 * time.Millisecond
+	workerIdleDuration     = 1 * time.Second
+	shutdownDefaultTimeout = 2 * time.Second
 
 	unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
 )
@@ -116,13 +118,15 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
 // If the queue is shutting down, it returns true and try to push the items
 // Otherwise it does nothing and returns false
 func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
-	ctxShutdown := q.ctxShutdown.Load()
-	if ctxShutdown == nil {
+	shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+	if shutdownTimeout == 0 {
 		return false
 	}
+	ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
+	defer ctxShutdownCancel()
 	for _, item := range items {
 		// if there is still any error, the queue can do nothing instead of losing the items
-		if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil {
+		if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
 			log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
 		}
 	}
@@ -246,6 +250,8 @@ var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip re
 
 // doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
 func (q *WorkerPoolQueue[T]) doRun() {
+	pprof.SetGoroutineLabels(q.ctxRun)
+
 	log.Debug("Queue %q starts running", q.GetName())
 	defer log.Debug("Queue %q stops running", q.GetName())
 
@@ -271,8 +277,8 @@ func (q *WorkerPoolQueue[T]) doRun() {
 			}
 		}
 
-		ctxShutdownPtr := q.ctxShutdown.Load()
-		if ctxShutdownPtr != nil {
+		shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+		if shutdownTimeout != 0 {
 			// if there is a shutdown context, try to push the items back to the base queue
 			q.basePushForShutdown(unhandled...)
 			workerDone := make(chan struct{})
@@ -280,7 +286,7 @@ func (q *WorkerPoolQueue[T]) doRun() {
 			go func() { wg.wg.Wait(); close(workerDone) }()
 			select {
 			case <-workerDone:
-			case <-(*ctxShutdownPtr).Done():
+			case <-time.After(shutdownTimeout):
 				log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
 			}
 		} else {
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index 5695c6cc23..e0d5183bd9 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -10,9 +10,9 @@ import (
 	"sync/atomic"
 	"time"
 
-	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/json"
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/setting"
 )
 
@@ -21,8 +21,9 @@ import (
 type WorkerPoolQueue[T any] struct {
 	ctxRun       context.Context
 	ctxRunCancel context.CancelFunc
-	ctxShutdown  atomic.Pointer[context.Context]
-	shutdownDone chan struct{}
+
+	shutdownDone    chan struct{}
+	shutdownTimeout atomic.Int64 // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time
 
 	origHandler HandlerFuncT[T]
 	safeHandler HandlerFuncT[T]
@@ -175,22 +176,19 @@ func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
 	return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
 }
 
-func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func())) {
-	atShutdown(func() {
-		// in case some queue handlers are slow or have hanging bugs, at most wait for a short time
-		q.ShutdownWait(1 * time.Second)
-	})
+func (q *WorkerPoolQueue[T]) Run() {
 	q.doRun()
 }
 
+func (q *WorkerPoolQueue[T]) Cancel() {
+	q.ctxRunCancel()
+}
+
 // ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue
 // It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed
 func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
-	shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), timeout)
-	defer shutdownCtxCancel()
-	if q.ctxShutdown.CompareAndSwap(nil, &shutdownCtx) {
-		q.ctxRunCancel()
-	}
+	q.shutdownTimeout.Store(int64(timeout))
+	q.ctxRunCancel()
 	<-q.shutdownDone
 }
 
@@ -207,7 +205,11 @@ func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQu
 	}
 }
 
-func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
+func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
+	return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique)
+}
+
+func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
 	if handler == nil {
 		log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
 		queueSetting.Type = "dummy"
@@ -224,10 +226,11 @@ func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueS
 	}
 	log.Trace("Created queue %q of type %q", name, queueType)
 
-	w.ctxRun, w.ctxRunCancel = context.WithCancel(graceful.GetManager().ShutdownContext())
+	w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
 	w.batchChan = make(chan []T)
 	w.flushChan = make(chan flushType)
 	w.shutdownDone = make(chan struct{})
+	w.shutdownTimeout.Store(int64(shutdownDefaultTimeout))
 	w.workerMaxNum = queueSetting.MaxWorkers
 	w.batchLength = queueSetting.BatchLength
 
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index da9451cd77..e60120162a 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -16,17 +16,9 @@ import (
 )
 
 func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
-	var stop func()
-	started := make(chan struct{})
-	stopped := make(chan struct{})
-	go func() {
-		q.Run(func(f func()) { stop = f; close(started) }, nil)
-		close(stopped)
-	}()
-	<-started
+	go q.Run()
 	return func() {
-		stop()
-		<-stopped
+		q.ShutdownWait(1 * time.Second)
 	}
 }
 
@@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
 			return unhandled
 		}
 
-		q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false)
+		q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false)
 		stop := runWorkerPoolQueue(q)
 		for i := 0; i < queueSetting.Length; i++ {
 			testRecorder.Record("push:%v", i)
@@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
 			return nil
 		}
 
-		q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+		q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
 		stop := runWorkerPoolQueue(q)
 		for i := 0; i < testCount; i++ {
 			_ = q.Push("task-" + strconv.Itoa(i))
@@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
 			return nil
 		}
 
-		q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+		q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
 		stop := runWorkerPoolQueue(q)
 		assert.NoError(t, q.FlushWithContext(context.Background(), 0))
 		stop()
@@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
 		return nil
 	}
 
-	q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
+	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
 	stop := runWorkerPoolQueue(q)
 	for i := 0; i < 5; i++ {
 		assert.NoError(t, q.Push(i))
@@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
 	assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
 	stop()
 
-	q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
+	q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
 	stop = runWorkerPoolQueue(q)
 	for i := 0; i < 15; i++ {
 		assert.NoError(t, q.Push(i))
@@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
 		if items[0] == 0 {
 			close(handlerCalled)
 		}
-		time.Sleep(100 * time.Millisecond)
+		time.Sleep(400 * time.Millisecond)
 		return items
 	}
 
 	qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
-	q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
 	stop := runWorkerPoolQueue(q)
 	for i := 0; i < qs.Length; i++ {
 		assert.NoError(t, q.Push(i))
 	}
 	<-handlerCalled
-	time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active
+	time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active
 	assert.EqualValues(t, 4, q.GetWorkerActiveNumber())
 	stop() // stop triggers shutdown
 	assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
 
 	// no item was ever handled, so we still get all of them again
-	q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+	q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
 	assert.EqualValues(t, 20, q.GetQueueItemNumber())
 }
diff --git a/routers/web/admin/queue_tester.go b/routers/web/admin/queue_tester.go
index 96373c4d5f..8f713b3bb1 100644
--- a/routers/web/admin/queue_tester.go
+++ b/routers/web/admin/queue_tester.go
@@ -4,12 +4,13 @@
 package admin
 
 import (
-	gocontext "context"
+	"runtime/pprof"
 	"sync"
 	"time"
 
 	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/setting"
 )
@@ -21,6 +22,7 @@ var testQueueOnce sync.Once
 // developers could see the queue length / worker number / items number on the admin page and try to remove the items
 func initTestQueueOnce() {
 	testQueueOnce.Do(func() {
+		ctx, _, finished := process.GetManager().AddTypedContext(graceful.GetManager().ShutdownContext(), "TestQueue", process.SystemProcessType, false)
 		qs := setting.QueueSettings{
 			Name:        "test-queue",
 			Type:        "channel",
@@ -28,7 +30,7 @@ func initTestQueueOnce() {
 			BatchLength: 2,
 			MaxWorkers:  3,
 		}
-		testQueue, err := queue.NewWorkerPoolQueueBySetting("test-queue", qs, func(t ...int64) (unhandled []int64) {
+		testQueue, err := queue.NewWorkerPoolQueueWithContext(ctx, "test-queue", qs, func(t ...int64) (unhandled []int64) {
 			for range t {
 				select {
 				case <-graceful.GetManager().ShutdownContext().Done():
@@ -44,8 +46,11 @@ func initTestQueueOnce() {
 
 		queue.GetManager().AddManagedQueue(testQueue)
 		testQueue.SetWorkerMaxNumber(5)
-		go graceful.GetManager().RunWithShutdownFns(testQueue.Run)
-		go graceful.GetManager().RunWithShutdownContext(func(ctx gocontext.Context) {
+		go graceful.GetManager().RunWithCancel(testQueue)
+		go func() {
+			pprof.SetGoroutineLabels(ctx)
+			defer finished()
+
 			cnt := int64(0)
 			adding := true
 			for {
@@ -67,6 +72,6 @@ func initTestQueueOnce() {
 					}
 				}
 			}
-		})
+		}()
 	})
 }
diff --git a/services/actions/init.go b/services/actions/init.go
index 8a9a30084a..26573c1681 100644
--- a/services/actions/init.go
+++ b/services/actions/init.go
@@ -5,6 +5,7 @@ package actions
 
 import (
 	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/notification"
 	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/setting"
@@ -15,8 +16,11 @@ func Init() {
 		return
 	}
 
-	jobEmitterQueue = queue.CreateUniqueQueue("actions_ready_job", jobEmitterQueueHandler)
-	go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run)
+	jobEmitterQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "actions_ready_job", jobEmitterQueueHandler)
+	if jobEmitterQueue == nil {
+		log.Fatal("Unable to create actions_ready_job queue")
+	}
+	go graceful.GetManager().RunWithCancel(jobEmitterQueue)
 
 	notification.RegisterNotifier(NewNotifier())
 }
diff --git a/services/automerge/automerge.go b/services/automerge/automerge.go
index f001a6ccc5..bf713c4431 100644
--- a/services/automerge/automerge.go
+++ b/services/automerge/automerge.go
@@ -29,11 +29,11 @@ var prAutoMergeQueue *queue.WorkerPoolQueue[string]
 
 // Init runs the task queue to that handles auto merges
 func Init() error {
-	prAutoMergeQueue = queue.CreateUniqueQueue("pr_auto_merge", handler)
+	prAutoMergeQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_auto_merge", handler)
 	if prAutoMergeQueue == nil {
-		return fmt.Errorf("Unable to create pr_auto_merge Queue")
+		return fmt.Errorf("unable to create pr_auto_merge queue")
 	}
-	go graceful.GetManager().RunWithShutdownFns(prAutoMergeQueue.Run)
+	go graceful.GetManager().RunWithCancel(prAutoMergeQueue)
 	return nil
 }
 
diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go
index 5aeda9ed79..ee4721d438 100644
--- a/services/mailer/mailer.go
+++ b/services/mailer/mailer.go
@@ -401,7 +401,9 @@ func NewContext(ctx context.Context) {
 		Sender = &smtpSender{}
 	}
 
-	mailQueue = queue.CreateSimpleQueue("mail", func(items ...*Message) []*Message {
+	subjectTemplates, bodyTemplates = templates.Mailer(ctx)
+
+	mailQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "mail", func(items ...*Message) []*Message {
 		for _, msg := range items {
 			gomailMsg := msg.ToMessage()
 			log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
@@ -413,10 +415,10 @@ func NewContext(ctx context.Context) {
 		}
 		return nil
 	})
-
-	go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
-
-	subjectTemplates, bodyTemplates = templates.Mailer(ctx)
+	if mailQueue == nil {
+		log.Fatal("Unable to create mail queue")
+	}
+	go graceful.GetManager().RunWithCancel(mailQueue)
 }
 
 // SendAsync send mail asynchronously
diff --git a/services/pull/check.go b/services/pull/check.go
index 8bc2bdff1d..b5150ad9a8 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -384,13 +384,13 @@ func CheckPRsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin
 
 // Init runs the task queue to test all the checking status pull requests
 func Init() error {
-	prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handler)
+	prPatchCheckerQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_patch_checker", handler)
 
 	if prPatchCheckerQueue == nil {
-		return fmt.Errorf("Unable to create pr_patch_checker Queue")
+		return fmt.Errorf("unable to create pr_patch_checker queue")
 	}
 
-	go graceful.GetManager().RunWithShutdownFns(prPatchCheckerQueue.Run)
+	go graceful.GetManager().RunWithCancel(prPatchCheckerQueue)
 	go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
 	return nil
 }
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index 52209b4d35..57ccb20033 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -5,6 +5,7 @@
 package pull
 
 import (
+	"context"
 	"strconv"
 	"testing"
 	"time"
@@ -31,7 +32,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
 
 	cfg, err := setting.GetQueueSettings(setting.CfgProvider, "pr_patch_checker")
 	assert.NoError(t, err)
-	prPatchCheckerQueue, err = queue.NewWorkerPoolQueueBySetting("pr_patch_checker", cfg, testHandler, true)
+	prPatchCheckerQueue, err = queue.NewWorkerPoolQueueWithContext(context.Background(), "pr_patch_checker", cfg, testHandler, true)
 	assert.NoError(t, err)
 
 	pr := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
@@ -46,12 +47,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
 	assert.True(t, has)
 	assert.NoError(t, err)
 
-	var queueShutdown, queueTerminate []func()
-	go prPatchCheckerQueue.Run(func(shutdown func()) {
-		queueShutdown = append(queueShutdown, shutdown)
-	}, func(terminate func()) {
-		queueTerminate = append(queueTerminate, terminate)
-	})
+	go prPatchCheckerQueue.Run()
 
 	select {
 	case id := <-idChan:
@@ -67,12 +63,6 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
 	pr = unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
 	assert.Equal(t, issues_model.PullRequestStatusChecking, pr.Status)
 
-	for _, callback := range queueShutdown {
-		callback()
-	}
-	for _, callback := range queueTerminate {
-		callback()
-	}
-
+	prPatchCheckerQueue.ShutdownWait(5 * time.Second)
 	prPatchCheckerQueue = nil
 }
diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go
index 1c514a4112..2e3defee8d 100644
--- a/services/repository/archiver/archiver.go
+++ b/services/repository/archiver/archiver.go
@@ -297,7 +297,7 @@ func ArchiveRepository(request *ArchiveRequest) (*repo_model.RepoArchiver, error
 
 var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
 
-// Init initlize archive
+// Init initializes archiver
 func Init() error {
 	handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
 		for _, archiveReq := range items {
@@ -309,12 +309,11 @@ func Init() error {
 		return nil
 	}
 
-	archiverQueue = queue.CreateUniqueQueue("repo-archive", handler)
+	archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
 	if archiverQueue == nil {
-		return errors.New("unable to create codes indexer queue")
+		return errors.New("unable to create repo-archive queue")
 	}
-
-	go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run)
+	go graceful.GetManager().RunWithCancel(archiverQueue)
 
 	return nil
 }
diff --git a/services/repository/push.go b/services/repository/push.go
index f213948916..571eedccb3 100644
--- a/services/repository/push.go
+++ b/services/repository/push.go
@@ -42,12 +42,11 @@ func handler(items ...[]*repo_module.PushUpdateOptions) [][]*repo_module.PushUpd
 }
 
 func initPushQueue() error {
-	pushQueue = queue.CreateSimpleQueue("push_update", handler)
+	pushQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "push_update", handler)
 	if pushQueue == nil {
-		return errors.New("unable to create push_update Queue")
+		return errors.New("unable to create push_update queue")
 	}
-
-	go graceful.GetManager().RunWithShutdownFns(pushQueue.Run)
+	go graceful.GetManager().RunWithCancel(pushQueue)
 	return nil
 }
 
diff --git a/services/task/task.go b/services/task/task.go
index 493586a645..11a47a68bb 100644
--- a/services/task/task.go
+++ b/services/task/task.go
@@ -37,14 +37,11 @@ func Run(t *admin_model.Task) error {
 
 // Init will start the service to get all unfinished tasks and run them
 func Init() error {
-	taskQueue = queue.CreateSimpleQueue("task", handler)
-
+	taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler)
 	if taskQueue == nil {
-		return fmt.Errorf("Unable to create Task Queue")
+		return fmt.Errorf("unable to create task queue")
 	}
-
-	go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
-
+	go graceful.GetManager().RunWithCancel(taskQueue)
 	return nil
 }
 
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go
index e817783e55..fd7a3d7fba 100644
--- a/services/webhook/deliver.go
+++ b/services/webhook/deliver.go
@@ -283,11 +283,11 @@ func Init() error {
 		},
 	}
 
-	hookQueue = queue.CreateUniqueQueue("webhook_sender", handler)
+	hookQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "webhook_sender", handler)
 	if hookQueue == nil {
-		return fmt.Errorf("Unable to create webhook_sender Queue")
+		return fmt.Errorf("unable to create webhook_sender queue")
 	}
-	go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
+	go graceful.GetManager().RunWithCancel(hookQueue)
 
 	go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)