diff --git a/cli/server/harness.wire.go b/cli/server/harness.wire.go index 304cef98a..071c92a18 100644 --- a/cli/server/harness.wire.go +++ b/cli/server/harness.wire.go @@ -39,6 +39,7 @@ import ( "github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/database" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" gitnesstypes "github.com/harness/gitness/types" "github.com/google/wire" @@ -78,6 +79,7 @@ func initSystem(ctx context.Context, config *gitnesstypes.Config) (*system, erro events.WireSet, webhook.WireSet, githook.WireSet, + lock.WireSet, ) return &system{}, nil } diff --git a/cli/server/harness.wire_gen.go b/cli/server/harness.wire_gen.go index 43cc3eaf3..482479182 100644 --- a/cli/server/harness.wire_gen.go +++ b/cli/server/harness.wire_gen.go @@ -37,6 +37,7 @@ import ( "github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/database" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" "github.com/harness/gitness/types" ) @@ -114,11 +115,11 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { pullReqReviewStore := database.ProvidePullReqReviewStore(db) pullReqReviewerStore := database.ProvidePullReqReviewerStore(db, principalInfoCache) eventsConfig := ProvideEventsConfig(config) - cmdable, err := ProvideRedis(config) + universalClient, err := ProvideRedis(config) if err != nil { return nil, err } - eventsSystem, err := events.ProvideSystem(eventsConfig, cmdable) + eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient) if err != nil { return nil, err } @@ -126,7 +127,9 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { if err != nil { return nil, err } - pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter) + lockConfig := lock.ProvideConfig(config) + mutexManager := lock.ProvideMutexManager(lockConfig, universalClient) + pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter, mutexManager) webhookStore := database.ProvideWebhookStore(db) webhookExecutionStore := database.ProvideWebhookExecutionStore(db) webhookConfig := ProvideWebhookConfig(config) diff --git a/cli/server/redis.go b/cli/server/redis.go index 95a84e0bf..0f6756742 100644 --- a/cli/server/redis.go +++ b/cli/server/redis.go @@ -13,7 +13,7 @@ import ( // ProvideRedis provides a redis client based on the configuration. // TODO: add support for sentinal / cluster // TODO: add support for TLS -func ProvideRedis(config *types.Config) (redis.Cmdable, error) { +func ProvideRedis(config *types.Config) (redis.UniversalClient, error) { options := &redis.Options{ Addr: config.Redis.Endpoint, MaxRetries: config.Redis.MaxRetries, diff --git a/cli/server/standalone.wire.go b/cli/server/standalone.wire.go index 5f4844600..3a708f2a4 100644 --- a/cli/server/standalone.wire.go +++ b/cli/server/standalone.wire.go @@ -35,6 +35,7 @@ import ( "github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/database" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" "github.com/harness/gitness/types" "github.com/harness/gitness/types/check" @@ -72,6 +73,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { events.WireSet, webhook.WireSet, githook.WireSet, + lock.WireSet, ) return &system{}, nil } diff --git a/cli/server/standalone.wire_gen.go b/cli/server/standalone.wire_gen.go index a7b615371..e28095b3d 100644 --- a/cli/server/standalone.wire_gen.go +++ b/cli/server/standalone.wire_gen.go @@ -32,6 +32,7 @@ import ( "github.com/harness/gitness/internal/store/cache" "github.com/harness/gitness/internal/store/database" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" "github.com/harness/gitness/types" "github.com/harness/gitness/types/check" ) @@ -75,11 +76,11 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { pullReqReviewStore := database.ProvidePullReqReviewStore(db) pullReqReviewerStore := database.ProvidePullReqReviewerStore(db, principalInfoCache) eventsConfig := ProvideEventsConfig(config) - cmdable, err := ProvideRedis(config) + universalClient, err := ProvideRedis(config) if err != nil { return nil, err } - eventsSystem, err := events.ProvideSystem(eventsConfig, cmdable) + eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient) if err != nil { return nil, err } @@ -87,7 +88,9 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { if err != nil { return nil, err } - pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter) + lockConfig := lock.ProvideConfig(config) + mutexManager := lock.ProvideMutexManager(lockConfig, universalClient) + pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter, mutexManager) webhookStore := database.ProvideWebhookStore(db) webhookExecutionStore := database.ProvideWebhookExecutionStore(db) webhookConfig := ProvideWebhookConfig(config) diff --git a/events/wire.go b/events/wire.go index b97505e92..d225dfb85 100644 --- a/events/wire.go +++ b/events/wire.go @@ -19,7 +19,7 @@ var WireSet = wire.NewSet( ProvideSystem, ) -func ProvideSystem(config Config, redisClient redis.Cmdable) (*System, error) { +func ProvideSystem(config Config, redisClient redis.UniversalClient) (*System, error) { var system *System var err error switch config.Mode { @@ -50,7 +50,7 @@ func provideSystemInMemory(config Config) (*System, error) { ) } -func provideSystemRedis(config Config, redisClient redis.Cmdable) (*System, error) { +func provideSystemRedis(config Config, redisClient redis.UniversalClient) (*System, error) { if redisClient == nil { return nil, errors.New("redis client required") } @@ -72,13 +72,16 @@ func newMemoryStreamProducer(broker *stream.MemoryBroker, namespace string) Stre return stream.NewMemoryProducer(broker, namespace) } -func newRedisStreamConsumerFactoryMethod(redisClient redis.Cmdable, namespace string) StreamConsumerFactoryFunc { +func newRedisStreamConsumerFactoryMethod( + redisClient redis.UniversalClient, + namespace string, +) StreamConsumerFactoryFunc { return func(groupName string, consumerName string) (StreamConsumer, error) { return stream.NewRedisConsumer(redisClient, namespace, groupName, consumerName) } } -func newRedisStreamProducer(redisClient redis.Cmdable, namespace string, +func newRedisStreamProducer(redisClient redis.UniversalClient, namespace string, maxStreamLength int64, approxMaxStreamLength bool) StreamProducer { return stream.NewRedisProducer(redisClient, namespace, maxStreamLength, approxMaxStreamLength) } diff --git a/gitrpc/lock/interface.go b/gitrpc/lock/interface.go deleted file mode 100644 index a340ae635..000000000 --- a/gitrpc/lock/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright 2022 Harness Inc. All rights reserved. -// Use of this source code is governed by the Polyform Free Trial License -// that can be found in the LICENSE.md file for this repository. - -package lock - -import "context" - -// Locker acquires new lock based on key. -type Locker interface { - AcquireLock(ctx context.Context, key string) (*Lock, error) -} diff --git a/gitrpc/lock/mutex.go b/gitrpc/lock/mutex.go deleted file mode 100644 index a397d6a03..000000000 --- a/gitrpc/lock/mutex.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2022 Harness Inc. All rights reserved. -// Use of this source code is governed by the Polyform Free Trial License -// that can be found in the LICENSE.md file for this repository. - -package lock - -import ( - "context" - "errors" - "sync" -) - -// Mutex is basic locker implementation -// do not use it in prod and distributed environment. -type Mutex struct { - mux sync.RWMutex - locks map[string]*Lock -} - -func (c *Mutex) AcquireLock(ctx context.Context, key string) (*Lock, error) { - if c == nil { - return nil, errors.New("mutex not initialized") - } - - c.mux.Lock() - if c.locks == nil { - c.locks = make(map[string]*Lock) - } - lock, ok := c.locks[key] - c.mux.Unlock() - - if !ok { - lock = &Lock{ - state: false, - key: key, - lockChan: make(chan struct{}, 1), - } - } - - select { - case lock.lockChan <- struct{}{}: - c.mux.Lock() - defer c.mux.Unlock() - lock.state = true - c.locks[key] = lock - return lock, nil - case <-ctx.Done(): - return nil, errors.New("deadline exceeded, lock not created") - } -} - -// Lock represents an obtained, app wide lock. -type Lock struct { - state bool - key string - lockChan chan struct{} -} - -// Key returns the redis key used by the lock. -func (l *Lock) Key() string { - if l == nil { - return "" - } - return l.key -} - -// Locked returns if this key is locked. -func (l *Lock) Locked() bool { - if l == nil { - return false - } - return l.state -} - -// Release manually releases the lock. -func (l *Lock) Release() { - if l == nil { - return - } - <-l.lockChan -} diff --git a/gitrpc/lock/mutex_test.go b/gitrpc/lock/mutex_test.go deleted file mode 100644 index 45013d8cd..000000000 --- a/gitrpc/lock/mutex_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 Harness Inc. All rights reserved. -// Use of this source code is governed by the Polyform Free Trial License -// that can be found in the LICENSE.md file for this repository. - -package lock - -import ( - "context" - "testing" -) - -func TestClient_AcquireLock(t *testing.T) { - c := Mutex{} - lock, err := c.AcquireLock(context.Background(), "simple") - if err != nil { - t.Error(err) - } - lock1, err := c.AcquireLock(context.Background(), "simple") - if err != nil { - t.Error(err) - } - lock.Release() - lock1.Release() -} diff --git a/go.mod b/go.mod index a5a9721da..eb180c3b9 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/go-chi/chi v1.5.4 github.com/go-chi/cors v1.2.1 github.com/go-redis/redis/v8 v8.11.5 + github.com/go-redsync/redsync/v4 v4.7.1 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.5.8 @@ -88,6 +89,7 @@ require ( github.com/go-git/gcfg v1.5.0 // indirect github.com/go-git/go-billy/v5 v5.3.1 // indirect github.com/go-git/go-git/v5 v5.4.3-0.20210630082519-b4368b2a2ca4 // indirect + github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/goccy/go-json v0.9.7 // indirect diff --git a/go.sum b/go.sum index eff1f4bf4..0bf71bd0c 100644 --- a/go.sum +++ b/go.sum @@ -325,6 +325,7 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fullstorydev/grpcurl v1.8.0/go.mod h1:Mn2jWbdMrQGJQ8UD62uNyMumT2acsZUCkZIqFxsQf1o= @@ -366,15 +367,24 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= +github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-beta.2 h1:ZSr84TsnQyKMAg8gnV+oawuQezeJR11/09THcWCQzr4= +github.com/go-redis/redis/v9 v9.0.0-beta.2/go.mod h1:Bldcd/M/bm9HbnNPi/LUtYBSD8ttcZYBMupwMXhdU0o= +github.com/go-redsync/redsync/v4 v4.7.1 h1:j5rmHCdN5qCEWp5oA2XEbGwtD4LZblqkhbcjCUsfNhs= +github.com/go-redsync/redsync/v4 v4.7.1/go.mod h1:IxV3sygNwjOERTXrj3XvNMSb1tgNgic8GvM8alwnWcM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-testfixtures/testfixtures/v3 v3.6.1 h1:n4Fv95Exp0D05G6l6CAZv22Ck1EJK0pa0TfPqE4ncSs= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= @@ -446,6 +456,8 @@ github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= @@ -491,6 +503,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -871,7 +884,9 @@ github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/nkovacs/streamquote v1.0.0/go.mod h1:BN+NaZ2CmdKqUuTUXUEm9j95B2TRbpOWpxbJYzzgUsc= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -884,12 +899,24 @@ github.com/oliamb/cutter v0.2.2/go.mod h1:4BenG2/4GuRBDbVm/OPahDVqbrOemzpPiG5mi1 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= +github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= +github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= +github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -1089,6 +1116,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggest/assertjson v1.7.0 h1:SKw5Rn0LQs6UvmGrIdaKQbMR1R3ncXm5KNon+QJ7jtw= github.com/swaggest/jsonschema-go v0.3.40 h1:9EqQ9RvtdW69xfYODmyEKWOSZ12x5eiK+wGD2EVh/L4= @@ -1156,6 +1185,7 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.5/go.mod h1:rmuwmfZ0+bvzB24eSC//bk1R1Zp3hM0OXYv/G2LIilg= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -1265,6 +1295,7 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1306,6 +1337,7 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1345,6 +1377,7 @@ golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -1360,14 +1393,18 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210326060303-6b1517762897/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1432,13 +1469,17 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190730183949-1393eb018365/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1469,6 +1510,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201126233918-771906719818/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1499,10 +1541,13 @@ golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -1594,6 +1639,7 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -1603,6 +1649,7 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= +golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/api/controller/pullreq/controller.go b/internal/api/controller/pullreq/controller.go index b08f0049e..8de5abad9 100644 --- a/internal/api/controller/pullreq/controller.go +++ b/internal/api/controller/pullreq/controller.go @@ -18,6 +18,7 @@ import ( pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" @@ -36,6 +37,7 @@ type Controller struct { principalStore store.PrincipalStore gitRPCClient gitrpc.Interface eventReporter *pullreqevents.Reporter + mtxManager lock.MutexManager } func NewController( @@ -50,6 +52,7 @@ func NewController( principalStore store.PrincipalStore, gitRPCClient gitrpc.Interface, eventReporter *pullreqevents.Reporter, + mtxManager lock.MutexManager, ) *Controller { return &Controller{ db: db, @@ -63,6 +66,7 @@ func NewController( principalStore: principalStore, gitRPCClient: gitRPCClient, eventReporter: eventReporter, + mtxManager: mtxManager, } } diff --git a/internal/api/controller/pullreq/locks.go b/internal/api/controller/pullreq/locks.go new file mode 100644 index 000000000..c711f7d6a --- /dev/null +++ b/internal/api/controller/pullreq/locks.go @@ -0,0 +1,19 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package pullreq + +import ( + "strconv" + + "github.com/harness/gitness/lock" +) + +func (c *Controller) newMutexForPR(repoUID string, pr int64, options ...lock.Option) (lock.Mutex, error) { + key := repoUID + "/pulls" + if pr != 0 { + key += "/" + strconv.FormatInt(pr, 10) + } + return c.mtxManager.NewMutex(key, append(options, lock.WithNamespace("repo"))...) +} diff --git a/internal/api/controller/pullreq/merge.go b/internal/api/controller/pullreq/merge.go index d32a7e156..1111e95ae 100644 --- a/internal/api/controller/pullreq/merge.go +++ b/internal/api/controller/pullreq/merge.go @@ -14,7 +14,6 @@ import ( "github.com/harness/gitness/internal/api/usererror" "github.com/harness/gitness/internal/auth" pullreqevents "github.com/harness/gitness/internal/events/pullreq" - "github.com/harness/gitness/internal/store/database/dbtx" "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" @@ -29,7 +28,7 @@ type MergeInput struct { // Merge merges the pull request. // -//nolint:gocognit,funlen // no need to refactor +//nolint:funlen // no need to refactor func (c *Controller) Merge( ctx context.Context, session *auth.Session, @@ -50,66 +49,80 @@ func (c *Controller) Merge( } in.Method = method - now := time.Now().UnixMilli() - targetRepo, err := c.getRepoCheckAccess(ctx, session, repoRef, enum.PermissionRepoEdit) if err != nil { return types.MergeResponse{}, fmt.Errorf("failed to acquire access to target repo: %w", err) } - err = dbtx.New(c.db).WithTx(ctx, func(ctx context.Context) error { - // pesimistic lock for no other user can merge the same pr - pr, err = c.pullreqStore.FindByNumberWithLock(ctx, targetRepo.ID, pullreqNum) + // if two requests for merging comes at the same time then mutex will lock + // first one and second one will wait, when first one is done then second one + // continue with latest data from db with state merged and return error that + // pr is already merged. + mutex, err := c.newMutexForPR(targetRepo.GitUID, 0) // 0 means locks all PRs for this repo + if err != nil { + return types.MergeResponse{}, err + } + err = mutex.Lock(ctx) + if err != nil { + return types.MergeResponse{}, err + } + defer func() { + _ = mutex.Unlock(ctx) + }() + + pr, err = c.pullreqStore.FindByNumber(ctx, targetRepo.ID, pullreqNum) + if err != nil { + return types.MergeResponse{}, fmt.Errorf("failed to get pull request by number: %w", err) + } + + if pr.Merged != nil { + return types.MergeResponse{}, usererror.BadRequest("Pull request already merged") + } + + if pr.State != enum.PullReqStateOpen { + return types.MergeResponse{}, usererror.BadRequest("Pull request must be open") + } + + if pr.IsDraft { + return types.MergeResponse{}, usererror.BadRequest("Draft pull requests can't be merged. Clear the draft flag first.") + } + + sourceRepo := targetRepo + if pr.SourceRepoID != pr.TargetRepoID { + sourceRepo, err = c.repoStore.Find(ctx, pr.SourceRepoID) if err != nil { - return fmt.Errorf("failed to get pull request by number: %w", err) + return types.MergeResponse{}, fmt.Errorf("failed to get source repository: %w", err) } + } - if pr.Merged != nil { - return usererror.BadRequest("Pull request already merged") - } + var writeParams gitrpc.WriteParams + writeParams, err = controller.CreateRPCWriteParams(ctx, c.urlProvider, session, targetRepo) + if err != nil { + return types.MergeResponse{}, fmt.Errorf("failed to create RPC write params: %w", err) + } - if pr.State != enum.PullReqStateOpen { - return usererror.BadRequest("Pull request must be open") - } + // TODO: for forking merge title might be different? + mergeTitle := fmt.Sprintf("Merge branch '%s' of %s (#%d)", pr.SourceBranch, sourceRepo.Path, pr.Number) - if pr.IsDraft { - return usererror.BadRequest("Draft pull requests can't be merged. Clear the draft flag first.") - } + var mergeOutput gitrpc.MergeBranchOutput + mergeOutput, err = c.gitRPCClient.MergeBranch(ctx, &gitrpc.MergeBranchParams{ + WriteParams: writeParams, + BaseBranch: pr.TargetBranch, + HeadRepoUID: sourceRepo.GitUID, + HeadBranch: pr.SourceBranch, + Title: mergeTitle, + Message: "", + Force: in.Force, + DeleteHeadBranch: in.DeleteBranch, + }) + if err != nil { + return types.MergeResponse{}, err + } - sourceRepo := targetRepo - if pr.SourceRepoID != pr.TargetRepoID { - sourceRepo, err = c.repoStore.Find(ctx, pr.SourceRepoID) - if err != nil { - return fmt.Errorf("failed to get source repository: %w", err) - } - } - - var writeParams gitrpc.WriteParams - writeParams, err = controller.CreateRPCWriteParams(ctx, c.urlProvider, session, targetRepo) - if err != nil { - return fmt.Errorf("failed to create RPC write params: %w", err) - } - - // TODO: for forking merge title might be different? - mergeTitle := fmt.Sprintf("Merge branch '%s' of %s (#%d)", pr.SourceBranch, sourceRepo.Path, pr.Number) - - var mergeOutput gitrpc.MergeBranchOutput - mergeOutput, err = c.gitRPCClient.MergeBranch(ctx, &gitrpc.MergeBranchParams{ - WriteParams: writeParams, - BaseBranch: pr.TargetBranch, - HeadRepoUID: sourceRepo.GitUID, - HeadBranch: pr.SourceBranch, - Title: mergeTitle, - Message: "", - Force: in.Force, - DeleteHeadBranch: in.DeleteBranch, - }) - if err != nil { - return err - } - - activity = getMergeActivity(session, pr, in, sha) + activity = getMergeActivity(session, pr, in, sha) + pr, err = c.pullreqStore.UpdateOptLock(ctx, pr, func(pr *types.PullReq) error { + now := time.Now().UnixMilli() pr.MergeStrategy = &in.Method pr.Merged = &now pr.MergedBy = &session.Principal.ID @@ -117,16 +130,10 @@ func (c *Controller) Merge( pr.MergeBaseSHA = &mergeOutput.BaseSHA pr.MergeHeadSHA = &mergeOutput.HeadSHA - - err = c.pullreqStore.Update(ctx, pr) - if err != nil { - return fmt.Errorf("failed to update pull request: %w", err) - } - return nil }) if err != nil { - return types.MergeResponse{}, err + return types.MergeResponse{}, fmt.Errorf("failed to update pull request: %w", err) } err = c.writeActivity(ctx, pr, activity) diff --git a/internal/api/controller/pullreq/wire.go b/internal/api/controller/pullreq/wire.go index a4646c99b..a8b174d84 100644 --- a/internal/api/controller/pullreq/wire.go +++ b/internal/api/controller/pullreq/wire.go @@ -10,6 +10,7 @@ import ( pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/lock" "github.com/google/wire" "github.com/jmoiron/sqlx" @@ -24,10 +25,11 @@ func ProvideController(db *sqlx.DB, urlProvider *url.Provider, authorizer authz. pullReqStore store.PullReqStore, pullReqActivityStore store.PullReqActivityStore, pullReqReviewStore store.PullReqReviewStore, pullReqReviewerStore store.PullReqReviewerStore, repoStore store.RepoStore, principalStore store.PrincipalStore, - rpcClient gitrpc.Interface, eventReporter *pullreqevents.Reporter) *Controller { + rpcClient gitrpc.Interface, eventReporter *pullreqevents.Reporter, + mtxManager lock.MutexManager) *Controller { return NewController(db, urlProvider, authorizer, pullReqStore, pullReqActivityStore, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, - rpcClient, eventReporter) + rpcClient, eventReporter, mtxManager) } diff --git a/lock/config.go b/lock/config.go new file mode 100644 index 000000000..3c1252096 --- /dev/null +++ b/lock/config.go @@ -0,0 +1,34 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import "time" + +type Provider string + +const ( + MemoryProvider Provider = "inmemory" + RedisProvider Provider = "redis" +) + +// A DelayFunc is used to decide the amount of time to wait between retries. +type DelayFunc func(tries int) time.Duration + +type Config struct { + app string // app namespace prefix + namespace string + provider Provider + expiry time.Duration + + tries int + retryDelay time.Duration + delayFunc DelayFunc + + driftFactor float64 + timeoutFactor float64 + + genValueFunc func() (string, error) + value string +} diff --git a/lock/lock.go b/lock/lock.go new file mode 100644 index 000000000..b8ea1d9b2 --- /dev/null +++ b/lock/lock.go @@ -0,0 +1,65 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import ( + "context" + "fmt" +) + +// KindError enum displays human readable message +// in error. +type KindError string + +const ( + LockHeld KindError = "lock already held" + LockNotHeld KindError = "lock not held" + ProviderError KindError = "lock provider error" + CannotLock KindError = "timeout while trying to acquire lock" + Context KindError = "context error while trying to acquire lock" + MaxRetriesExceeded KindError = "max retries exceeded to acquire lock" + GenerateTokenFailed KindError = "token generation failed" +) + +// Error is custom unique type for all type of errors. +type Error struct { + Kind KindError + Key string + Err error +} + +func NewError(kind KindError, key string, err error) *Error { + return &Error{ + Kind: kind, + Key: key, + Err: err, + } +} + +// Error implements error interface. +func (e Error) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s on key %s with err: %v", e.Kind, e.Key, e.Err) + } + return fmt.Sprintf("%s on key %s", e.Kind, e.Key) +} + +// MutexManager describes a Distributed Lock Manager. +type MutexManager interface { + // NewMutex creates a mutex for the given key. The returned mutex is not held + // and must be acquired with a call to .Lock. + NewMutex(key string, options ...Option) (Mutex, error) +} + +type Mutex interface { + // Key returns the key to be locked. + Key() string + + // Lock acquires the lock. It fails with error if the lock is already held. + Lock(ctx context.Context) error + + // Unlock releases the lock. It fails with error if the lock is not currently held. + Unlock(ctx context.Context) error +} diff --git a/lock/memory.go b/lock/memory.go new file mode 100644 index 000000000..935a3be7e --- /dev/null +++ b/lock/memory.go @@ -0,0 +1,190 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import ( + "context" + "crypto/rand" + "encoding/base64" + "sync" + "time" +) + +// InMemory is a local implementation of a MutexManager that it's intended to be used during development. +type InMemory struct { + config Config // force value copy + mutex sync.Mutex + keys map[string]inMemEntry +} + +// NewInMemory creates a new InMemory instance only used for development. +func NewInMemory(config Config) *InMemory { + keys := make(map[string]inMemEntry) + + return &InMemory{ + config: config, + keys: keys, + } +} + +// NewMutex creates a mutex for the given key. The returned mutex is not held +// and must be acquired with a call to .Lock. +func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) { + var ( + token string + err error + ) + + // copy default values + config := m.config + + // set default delayFunc + config.delayFunc = func(i int) time.Duration { + return config.retryDelay + } + + // override config with custom options + for _, opt := range options { + opt.Apply(&config) + } + + // format key + key = formatKey(config.app, config.namespace, key) + + switch { + case config.value != "": + token = config.value + case config.genValueFunc != nil: + token, err = config.genValueFunc() + default: + token, err = randstr(32) + } + if err != nil { + return nil, NewError(GenerateTokenFailed, key, nil) + } + + lock := inMemMutex{ + expiry: config.expiry, + waitTime: 15 * time.Second, + tries: config.tries, + delayFunc: config.delayFunc, + provider: m, + key: key, + token: token, + } + + return &lock, nil +} + +func (m *InMemory) acquire(key, token string, ttl time.Duration) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + now := time.Now() + + entry, ok := m.keys[key] + if ok && entry.validUntil.After(now) { + return false + } + + m.keys[key] = inMemEntry{token, now.Add(ttl)} + + return true +} + +func (m *InMemory) release(key, token string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + entry, ok := m.keys[key] + if !ok || entry.token != token { + return false + } + + delete(m.keys, key) + + return true +} + +type inMemEntry struct { + token string + validUntil time.Time +} + +type inMemMutex struct { + mutex sync.Mutex // Used while manipulating the internal state of the lock itself + + provider *InMemory + + expiry time.Duration + waitTime time.Duration + + tries int + delayFunc DelayFunc + + key string + token string // A random string used to safely release the lock + isHeld bool +} + +// Key returns the key to be locked. +func (l *inMemMutex) Key() string { + return l.key +} + +// Lock acquires the lock. It fails with error if the lock is already held. +func (l *inMemMutex) Lock(ctx context.Context) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.isHeld { + return NewError(LockHeld, l.key, nil) + } + + if l.provider.acquire(l.key, l.token, l.expiry) { + l.isHeld = true + return nil + } + + timeout := time.NewTimer(l.waitTime) + defer timeout.Stop() + + for i := 1; i <= l.tries; i++ { + select { + case <-ctx.Done(): + return NewError(Context, l.key, ctx.Err()) + case <-timeout.C: + return NewError(CannotLock, l.key, nil) + case <-time.After(l.delayFunc(i)): + if l.provider.acquire(l.key, l.token, l.expiry) { + l.isHeld = true + return nil + } + } + } + return NewError(MaxRetriesExceeded, l.key, nil) +} + +// Unlock releases the lock. It fails with error if the lock is not currently held. +func (l *inMemMutex) Unlock(_ context.Context) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.isHeld || !l.provider.release(l.key, l.token) { + return NewError(LockNotHeld, l.key, nil) + } + + l.isHeld = false + return nil +} + +func randstr(size int) (string, error) { + buffer := make([]byte, size) + if _, err := rand.Read(buffer); err != nil { + return "", err + } + + return base64.URLEncoding.EncodeToString(buffer), nil +} diff --git a/lock/options.go b/lock/options.go new file mode 100644 index 000000000..44394022f --- /dev/null +++ b/lock/options.go @@ -0,0 +1,88 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import ( + "time" +) + +// An Option configures a mutex. +type Option interface { + Apply(*Config) +} + +// OptionFunc is a function that configures a mutex. +type OptionFunc func(*Config) + +// Apply calls f(config). +func (f OptionFunc) Apply(config *Config) { + f(config) +} + +// WithNamespace returns an option that configures Mutex.ns. +func WithNamespace(ns string) Option { + return OptionFunc(func(m *Config) { + m.namespace = ns + }) +} + +// WithExpiry can be used to set the expiry of a mutex to the given value. +func WithExpiry(expiry time.Duration) Option { + return OptionFunc(func(m *Config) { + m.expiry = expiry + }) +} + +// WithTries can be used to set the number of times lock acquire is attempted. +func WithTries(tries int) Option { + return OptionFunc(func(m *Config) { + m.tries = tries + }) +} + +// WithRetryDelay can be used to set the amount of time to wait between retries. +func WithRetryDelay(delay time.Duration) Option { + return OptionFunc(func(m *Config) { + m.delayFunc = func(tries int) time.Duration { + return delay + } + }) +} + +// WithRetryDelayFunc can be used to override default delay behavior. +func WithRetryDelayFunc(delayFunc DelayFunc) Option { + return OptionFunc(func(m *Config) { + m.delayFunc = delayFunc + }) +} + +// WithDriftFactor can be used to set the clock drift factor. +func WithDriftFactor(factor float64) Option { + return OptionFunc(func(m *Config) { + m.driftFactor = factor + }) +} + +// WithTimeoutFactor can be used to set the timeout factor. +func WithTimeoutFactor(factor float64) Option { + return OptionFunc(func(m *Config) { + m.timeoutFactor = factor + }) +} + +// WithGenValueFunc can be used to set the custom value generator. +func WithGenValueFunc(genValueFunc func() (string, error)) Option { + return OptionFunc(func(m *Config) { + m.genValueFunc = genValueFunc + }) +} + +// WithValue can be used to assign the random value without having to call lock. +// This allows the ownership of a lock to be "transferred" and allows the lock to be unlocked from elsewhere. +func WithValue(v string) Option { + return OptionFunc(func(m *Config) { + m.value = v + }) +} diff --git a/lock/redis.go b/lock/redis.go new file mode 100644 index 000000000..3e6c1dff2 --- /dev/null +++ b/lock/redis.go @@ -0,0 +1,105 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import ( + "context" + "errors" + + redislib "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" +) + +// Redis wrapper for redsync. +type Redis struct { + config Config + rs *redsync.Redsync +} + +// NewRedis create an instance of redisync to be used to obtain a mutual exclusion +// lock. +func NewRedis(config Config, client redislib.UniversalClient) *Redis { + pool := goredis.NewPool(client) + return &Redis{ + config: config, + rs: redsync.New(pool), + } +} + +// Acquire new lock. +func (r *Redis) NewMutex(key string, options ...Option) (Mutex, error) { + // copy default values + config := r.config + // customize config + for _, opt := range options { + opt.Apply(&config) + } + + // convert to redis helper functions + args := make([]redsync.Option, 0, 8) + args = append(args, + redsync.WithExpiry(config.expiry), + redsync.WithTimeoutFactor(config.timeoutFactor), + redsync.WithTries(config.tries), + redsync.WithRetryDelay(config.retryDelay), + redsync.WithDriftFactor(config.driftFactor), + ) + + if config.delayFunc != nil { + args = append(args, redsync.WithRetryDelayFunc(redsync.DelayFunc(config.delayFunc))) + } + + if config.genValueFunc != nil { + args = append(args, redsync.WithGenValueFunc(config.genValueFunc)) + } + + uniqKey := formatKey(config.app, config.namespace, key) + mutex := r.rs.NewMutex(uniqKey, args...) + + return &RedisMutex{ + mutex: mutex, + }, nil +} + +type RedisMutex struct { + mutex *redsync.Mutex +} + +// Key returns the key to be locked. +func (l *RedisMutex) Key() string { + return l.mutex.Name() +} + +// Lock acquires the lock. It fails with error if the lock is already held. +func (l *RedisMutex) Lock(ctx context.Context) error { + err := l.mutex.LockContext(ctx) + if err != nil { + return translateRedisErr(err, l.Key()) + } + return nil +} + +// Unlock releases the lock. It fails with error if the lock is not currently held. +func (l *RedisMutex) Unlock(ctx context.Context) error { + _, err := l.mutex.UnlockContext(ctx) + if err != nil { + return translateRedisErr(err, l.Key()) + } + return nil +} + +func translateRedisErr(err error, key string) error { + var kind KindError + switch { + case errors.Is(err, redsync.ErrFailed): + kind = CannotLock + case errors.Is(err, redsync.ErrExtendFailed), errors.Is(err, &redsync.RedisError{}): + kind = ProviderError + case errors.Is(err, &redsync.ErrTaken{}), errors.Is(err, &redsync.ErrNodeTaken{}): + kind = LockHeld + } + return NewError(kind, key, err) +} diff --git a/lock/util.go b/lock/util.go new file mode 100644 index 000000000..6d3efdea5 --- /dev/null +++ b/lock/util.go @@ -0,0 +1,21 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import "strings" + +func formatKey(app, ns, key string) string { + return app + ":" + ns + ":" + key +} + +func SplitKey(uniqKey string) (namespace, key string) { + parts := strings.Split(uniqKey, ":") + key = uniqKey + if len(parts) > 2 { + namespace = parts[1] + key = parts[2] + } + return +} diff --git a/lock/wire.go b/lock/wire.go new file mode 100644 index 000000000..708e85167 --- /dev/null +++ b/lock/wire.go @@ -0,0 +1,40 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package lock + +import ( + "github.com/harness/gitness/types" + + "github.com/go-redis/redis/v8" + "github.com/google/wire" +) + +var WireSet = wire.NewSet( + ProvideConfig, + ProvideMutexManager, +) + +func ProvideConfig(config *types.Config) Config { + return Config{ + app: config.Lock.AppNamespace, + namespace: config.Lock.DefaultNamespace, + provider: Provider(config.Lock.Provider), + expiry: config.Lock.Expiry, + tries: config.Lock.Tries, + retryDelay: config.Lock.RetryDelay, + driftFactor: config.Lock.DriftFactor, + timeoutFactor: config.Lock.TimeoutFactor, + } +} + +func ProvideMutexManager(config Config, client redis.UniversalClient) MutexManager { + switch config.provider { + case MemoryProvider: + return NewInMemory(config) + case RedisProvider: + return NewRedis(config, client) + } + return nil +} diff --git a/mocks/mock_client.go b/mocks/mock_client.go index 3216e04cb..fbd985bd4 100644 --- a/mocks/mock_client.go +++ b/mocks/mock_client.go @@ -8,9 +8,10 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" user "github.com/harness/gitness/internal/api/controller/user" types "github.com/harness/gitness/types" + + gomock "github.com/golang/mock/gomock" ) // MockClient is a mock of Client interface. diff --git a/mocks/mock_store.go b/mocks/mock_store.go index 30b7ec550..8ad9973bf 100644 --- a/mocks/mock_store.go +++ b/mocks/mock_store.go @@ -8,9 +8,10 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" types "github.com/harness/gitness/types" enum "github.com/harness/gitness/types/enum" + + gomock "github.com/golang/mock/gomock" ) // MockPrincipalStore is a mock of PrincipalStore interface. diff --git a/stream/redis_consumer.go b/stream/redis_consumer.go index 42ddf03b1..8a0a01dd7 100644 --- a/stream/redis_consumer.go +++ b/stream/redis_consumer.go @@ -19,7 +19,7 @@ import ( // RedisConsumer provides functionality to process Redis streams as part of a consumer group. type RedisConsumer struct { - rdb redis.Cmdable + rdb redis.UniversalClient // namespace specifies the namespace of the keys - any stream key will be prefixed with it namespace string // groupName specifies the name of the consumer group @@ -46,7 +46,7 @@ type RedisConsumer struct { // NewRedisConsumer creates new Redis stream consumer. Streams are read with XREADGROUP. // It returns channels of info messages and errors. The caller should not block on these channels for too long. // These channels are provided mainly for logging. -func NewRedisConsumer(rdb redis.Cmdable, namespace string, +func NewRedisConsumer(rdb redis.UniversalClient, namespace string, groupName string, consumerName string) (*RedisConsumer, error) { if groupName == "" { return nil, errors.New("groupName can't be empty") @@ -586,7 +586,7 @@ func (c *RedisConsumer) createGroupForAllStreams(ctx context.Context) error { return nil } -func createGroup(ctx context.Context, rdb redis.Cmdable, streamID string, groupName string) error { +func createGroup(ctx context.Context, rdb redis.UniversalClient, streamID string, groupName string) error { // Creates a new consumer group that starts receiving messages from now on. // Existing messges in the queue are ignored (we don't want to overload a group with old messages) // For more details of the XGROUPCREATE api see https://redis.io/commands/xgroup-create/ diff --git a/stream/redis_producer.go b/stream/redis_producer.go index c1628f555..869b2bfa7 100644 --- a/stream/redis_producer.go +++ b/stream/redis_producer.go @@ -12,7 +12,7 @@ import ( ) type RedisProducer struct { - rdb redis.Cmdable + rdb redis.UniversalClient // namespace defines the namespace of the stream keys - any stream key will be prefixed with it. namespace string // maxStreamLength defines the maximum number of entries in each stream (ring buffer). @@ -22,7 +22,7 @@ type RedisProducer struct { approxMaxStreamLength bool } -func NewRedisProducer(rdb redis.Cmdable, namespace string, +func NewRedisProducer(rdb redis.UniversalClient, namespace string, maxStreamLength int64, approxMaxStreamLength bool) *RedisProducer { return &RedisProducer{ rdb: rdb, diff --git a/types/config.go b/types/config.go index e0cf5db5e..bc3c270fc 100644 --- a/types/config.go +++ b/types/config.go @@ -130,6 +130,20 @@ type Config struct { ApproxMaxStreamLength bool `envconfig:"GITNESS_EVENTS_APPROX_MAX_STREAM_LENGTH" default:"true"` } + Lock struct { + // Provider is a name of distributed lock service like redis, memory, file etc... + Provider string `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"` + Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"` + Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"32"` + RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"` + DriftFactor float64 `envconfig:"GITNESS_LOCK_DRIFT_FACTOR" default:"0.01"` + TimeoutFactor float64 `envconfig:"GITNESS_LOCK_TIMEOUT_FACTOR" default:"0.05"` + // AppNamespace is just service app prefix to avoid conflicts on key definition + AppNamespace string `envconfig:"GITNESS_LOCK_APP_NAMESPACE" default:"gitness"` + // DefaultNamespace is when mutex doesn't specify custom namespace for their keys + DefaultNamespace string `envconfig:"GITNESS_LOCK_DEFAULT_NAMESPACE" default:"default"` + } + Webhook struct { MaxRetryCount int64 `envconfig:"GITNESS_WEBHOOK_MAX_RETRY_COUNT" default:"3"` Concurrency int `envconfig:"GITNESS_WEBHOOK_CONCURRENCY" default:"4"`