mirror of https://github.com/harness/drone.git
[GITHA] Prepare `lock` package (#144)
parent
06067d4f6c
commit
d80fc15753
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/harness/gitness/gitrpc"
|
||||
"github.com/harness/gitness/gitrpc/server"
|
||||
"github.com/harness/gitness/internal/services/webhook"
|
||||
"github.com/harness/gitness/lock"
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
|
@ -155,3 +156,17 @@ func ProvideWebhookConfig() (webhook.Config, error) {
|
|||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// ProvideLockConfig generates the `lock` package config from the gitness config.
|
||||
func ProvideLockConfig(config *types.Config) lock.Config {
|
||||
return lock.Config{
|
||||
App: config.Lock.AppNamespace,
|
||||
Namespace: config.Lock.DefaultNamespace,
|
||||
Provider: lock.Provider(config.Lock.Provider),
|
||||
Expiry: config.Lock.Expiry,
|
||||
Tries: config.Lock.Tries,
|
||||
RetryDelay: config.Lock.RetryDelay,
|
||||
DriftFactor: config.Lock.DriftFactor,
|
||||
TimeoutFactor: config.Lock.TimeoutFactor,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,6 +83,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
|
|||
cliserver.ProvideWebhookConfig,
|
||||
webhook.WireSet,
|
||||
githook.WireSet,
|
||||
cliserver.ProvideLockConfig,
|
||||
lock.WireSet,
|
||||
pubsub.WireSet,
|
||||
codecomments.WireSet,
|
||||
|
|
|
@ -104,7 +104,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockConfig := lock.ProvideConfig(config)
|
||||
lockConfig := server.ProvideLockConfig(config)
|
||||
mutexManager := lock.ProvideMutexManager(lockConfig, universalClient)
|
||||
migrator := codecomments.ProvideMigrator(gitrpcInterface)
|
||||
pullreqController := pullreq.ProvideController(db, provider, authorizer, pullReqStore, pullReqActivityStore, codeCommentView, pullReqReviewStore, pullReqReviewerStore, repoStore, principalStore, gitrpcInterface, reporter, mutexManager, migrator)
|
||||
|
|
|
@ -17,18 +17,18 @@ const (
|
|||
type DelayFunc func(tries int) time.Duration
|
||||
|
||||
type Config struct {
|
||||
app string // app namespace prefix
|
||||
namespace string
|
||||
provider Provider
|
||||
expiry time.Duration
|
||||
App string // app namespace prefix
|
||||
Namespace string
|
||||
Provider Provider
|
||||
Expiry time.Duration
|
||||
|
||||
tries int
|
||||
retryDelay time.Duration
|
||||
delayFunc DelayFunc
|
||||
Tries int
|
||||
RetryDelay time.Duration
|
||||
DelayFunc DelayFunc
|
||||
|
||||
driftFactor float64
|
||||
timeoutFactor float64
|
||||
DriftFactor float64
|
||||
TimeoutFactor float64
|
||||
|
||||
genValueFunc func() (string, error)
|
||||
value string
|
||||
GenValueFunc func() (string, error)
|
||||
Value string
|
||||
}
|
||||
|
|
|
@ -41,8 +41,8 @@ func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) {
|
|||
config := m.config
|
||||
|
||||
// set default delayFunc
|
||||
config.delayFunc = func(i int) time.Duration {
|
||||
return config.retryDelay
|
||||
config.DelayFunc = func(i int) time.Duration {
|
||||
return config.RetryDelay
|
||||
}
|
||||
|
||||
// override config with custom options
|
||||
|
@ -51,13 +51,13 @@ func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) {
|
|||
}
|
||||
|
||||
// format key
|
||||
key = formatKey(config.app, config.namespace, key)
|
||||
key = formatKey(config.App, config.Namespace, key)
|
||||
|
||||
switch {
|
||||
case config.value != "":
|
||||
token = config.value
|
||||
case config.genValueFunc != nil:
|
||||
token, err = config.genValueFunc()
|
||||
case config.Value != "":
|
||||
token = config.Value
|
||||
case config.GenValueFunc != nil:
|
||||
token, err = config.GenValueFunc()
|
||||
default:
|
||||
token, err = randstr(32)
|
||||
}
|
||||
|
@ -65,11 +65,18 @@ func (m *InMemory) NewMutex(key string, options ...Option) (Mutex, error) {
|
|||
return nil, NewError(GenerateTokenFailed, key, nil)
|
||||
}
|
||||
|
||||
// waitTime logic is similar to redis implementation:
|
||||
// https://github.com/go-redsync/redsync/blob/e1e5da6654c81a2069d6a360f1a31c21f05cd22d/mutex.go#LL81C4-L81C100
|
||||
waitTime := config.Expiry
|
||||
if config.TimeoutFactor > 0 {
|
||||
waitTime *= time.Duration(int64(float64(config.Expiry) * config.TimeoutFactor))
|
||||
}
|
||||
|
||||
lock := inMemMutex{
|
||||
expiry: config.expiry,
|
||||
waitTime: 15 * time.Second,
|
||||
tries: config.tries,
|
||||
delayFunc: config.delayFunc,
|
||||
expiry: config.Expiry,
|
||||
waitTime: waitTime,
|
||||
tries: config.Tries,
|
||||
delayFunc: config.DelayFunc,
|
||||
provider: m,
|
||||
key: key,
|
||||
token: token,
|
||||
|
|
|
@ -24,28 +24,28 @@ func (f OptionFunc) Apply(config *Config) {
|
|||
// WithNamespace returns an option that configures Mutex.ns.
|
||||
func WithNamespace(ns string) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.namespace = ns
|
||||
m.Namespace = ns
|
||||
})
|
||||
}
|
||||
|
||||
// WithExpiry can be used to set the expiry of a mutex to the given value.
|
||||
func WithExpiry(expiry time.Duration) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.expiry = expiry
|
||||
m.Expiry = expiry
|
||||
})
|
||||
}
|
||||
|
||||
// WithTries can be used to set the number of times lock acquire is attempted.
|
||||
func WithTries(tries int) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.tries = tries
|
||||
m.Tries = tries
|
||||
})
|
||||
}
|
||||
|
||||
// WithRetryDelay can be used to set the amount of time to wait between retries.
|
||||
func WithRetryDelay(delay time.Duration) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.delayFunc = func(tries int) time.Duration {
|
||||
m.DelayFunc = func(tries int) time.Duration {
|
||||
return delay
|
||||
}
|
||||
})
|
||||
|
@ -54,28 +54,28 @@ func WithRetryDelay(delay time.Duration) Option {
|
|||
// WithRetryDelayFunc can be used to override default delay behavior.
|
||||
func WithRetryDelayFunc(delayFunc DelayFunc) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.delayFunc = delayFunc
|
||||
m.DelayFunc = delayFunc
|
||||
})
|
||||
}
|
||||
|
||||
// WithDriftFactor can be used to set the clock drift factor.
|
||||
func WithDriftFactor(factor float64) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.driftFactor = factor
|
||||
m.DriftFactor = factor
|
||||
})
|
||||
}
|
||||
|
||||
// WithTimeoutFactor can be used to set the timeout factor.
|
||||
func WithTimeoutFactor(factor float64) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.timeoutFactor = factor
|
||||
m.TimeoutFactor = factor
|
||||
})
|
||||
}
|
||||
|
||||
// WithGenValueFunc can be used to set the custom value generator.
|
||||
func WithGenValueFunc(genValueFunc func() (string, error)) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.genValueFunc = genValueFunc
|
||||
m.GenValueFunc = genValueFunc
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -83,6 +83,6 @@ func WithGenValueFunc(genValueFunc func() (string, error)) Option {
|
|||
// This allows the ownership of a lock to be "transferred" and allows the lock to be unlocked from elsewhere.
|
||||
func WithValue(v string) Option {
|
||||
return OptionFunc(func(m *Config) {
|
||||
m.value = v
|
||||
m.Value = v
|
||||
})
|
||||
}
|
||||
|
|
|
@ -41,22 +41,22 @@ func (r *Redis) NewMutex(key string, options ...Option) (Mutex, error) {
|
|||
// convert to redis helper functions
|
||||
args := make([]redsync.Option, 0, 8)
|
||||
args = append(args,
|
||||
redsync.WithExpiry(config.expiry),
|
||||
redsync.WithTimeoutFactor(config.timeoutFactor),
|
||||
redsync.WithTries(config.tries),
|
||||
redsync.WithRetryDelay(config.retryDelay),
|
||||
redsync.WithDriftFactor(config.driftFactor),
|
||||
redsync.WithExpiry(config.Expiry),
|
||||
redsync.WithTimeoutFactor(config.TimeoutFactor),
|
||||
redsync.WithTries(config.Tries),
|
||||
redsync.WithRetryDelay(config.RetryDelay),
|
||||
redsync.WithDriftFactor(config.DriftFactor),
|
||||
)
|
||||
|
||||
if config.delayFunc != nil {
|
||||
args = append(args, redsync.WithRetryDelayFunc(redsync.DelayFunc(config.delayFunc)))
|
||||
if config.DelayFunc != nil {
|
||||
args = append(args, redsync.WithRetryDelayFunc(redsync.DelayFunc(config.DelayFunc)))
|
||||
}
|
||||
|
||||
if config.genValueFunc != nil {
|
||||
args = append(args, redsync.WithGenValueFunc(config.genValueFunc))
|
||||
if config.GenValueFunc != nil {
|
||||
args = append(args, redsync.WithGenValueFunc(config.GenValueFunc))
|
||||
}
|
||||
|
||||
uniqKey := formatKey(config.app, config.namespace, key)
|
||||
uniqKey := formatKey(config.App, config.Namespace, key)
|
||||
mutex := r.rs.NewMutex(uniqKey, args...)
|
||||
|
||||
return &RedisMutex{
|
||||
|
|
18
lock/wire.go
18
lock/wire.go
|
@ -5,32 +5,16 @@
|
|||
package lock
|
||||
|
||||
import (
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideConfig,
|
||||
ProvideMutexManager,
|
||||
)
|
||||
|
||||
func ProvideConfig(config *types.Config) Config {
|
||||
return Config{
|
||||
app: config.Lock.AppNamespace,
|
||||
namespace: config.Lock.DefaultNamespace,
|
||||
provider: Provider(config.Lock.Provider),
|
||||
expiry: config.Lock.Expiry,
|
||||
tries: config.Lock.Tries,
|
||||
retryDelay: config.Lock.RetryDelay,
|
||||
driftFactor: config.Lock.DriftFactor,
|
||||
timeoutFactor: config.Lock.TimeoutFactor,
|
||||
}
|
||||
}
|
||||
|
||||
func ProvideMutexManager(config Config, client redis.UniversalClient) MutexManager {
|
||||
switch config.provider {
|
||||
switch config.Provider {
|
||||
case MemoryProvider:
|
||||
return NewInMemory(config)
|
||||
case RedisProvider:
|
||||
|
|
Loading…
Reference in New Issue