[MISC] Cleanup stale `Webhook-Executions` & `Session Tokens` (#640)

pull/3405/head
Johannes Batzill 2023-10-03 17:54:18 +00:00 committed by Harness
parent bac1951bff
commit d83552f288
31 changed files with 523 additions and 121 deletions

View File

@ -16,7 +16,6 @@ package serviceaccount
import (
"context"
"fmt"
apiauth "github.com/harness/gitness/app/api/auth"
"github.com/harness/gitness/app/auth"
@ -37,12 +36,5 @@ func (c *Controller) Delete(ctx context.Context, session *auth.Session,
return err
}
// delete all tokens (okay if we fail after - user intends to delete service account anyway)
// TODO: cascading delete?
err = c.tokenStore.DeleteForPrincipal(ctx, sa.ID)
if err != nil {
return fmt.Errorf("failed to delete tokens for service account: %w", err)
}
return c.principalStore.DeleteServiceAccount(ctx, sa.ID)
}

View File

@ -50,12 +50,5 @@ func (c *Controller) Delete(ctx context.Context, session *auth.Session,
return err
}
// delete all tokens (okay if we fail after - user intended to be deleted anyway)
// TODO: cascading delete?
err = c.tokenStore.DeleteForPrincipal(ctx, user.ID)
if err != nil {
return fmt.Errorf("failed to delete tokens for user: %w", err)
}
return c.principalStore.DeleteUser(ctx, user.ID)
}

View File

@ -0,0 +1,132 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cleanup
import (
"context"
"errors"
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
)
type Config struct {
WebhookExecutionsRetentionTime time.Duration
}
func (c *Config) Prepare() error {
if c == nil {
return errors.New("config is required")
}
if c.WebhookExecutionsRetentionTime <= 0 {
return errors.New("config.WebhookExecutionsRetentionTime has to be provided")
}
return nil
}
// Service is responsible for cleaning up data in db / git / ...
type Service struct {
config Config
scheduler *job.Scheduler
executor *job.Executor
webhookExecutionStore store.WebhookExecutionStore
tokenStore store.TokenStore
}
func NewService(
config Config,
scheduler *job.Scheduler,
executor *job.Executor,
webhookExecutionStore store.WebhookExecutionStore,
tokenStore store.TokenStore,
) (*Service, error) {
if err := config.Prepare(); err != nil {
return nil, fmt.Errorf("provided cleanup config is invalid: %w", err)
}
return &Service{
config: config,
scheduler: scheduler,
executor: executor,
webhookExecutionStore: webhookExecutionStore,
tokenStore: tokenStore,
}, nil
}
func (s *Service) Register(ctx context.Context) error {
if err := s.registerJobHandlers(); err != nil {
return fmt.Errorf("failed to register cleanup job handlers: %w", err)
}
if err := s.scheduleRecurringCleanupJobs(ctx); err != nil {
return fmt.Errorf("failed to schedule cleanup jobs: %w", err)
}
return nil
}
// scheduleRecurringCleanupJobs schedules the cleanup jobs.
func (s *Service) scheduleRecurringCleanupJobs(ctx context.Context) error {
err := s.scheduler.AddRecurring(
ctx,
jobTypeWebhookExecutions,
jobTypeWebhookExecutions,
jobCronWebhookExecutions,
jobMaxDurationWebhookExecutions,
)
if err != nil {
return fmt.Errorf("failed to schedule webhook executions job: %w", err)
}
err = s.scheduler.AddRecurring(
ctx,
jobTypeTokens,
jobTypeTokens,
jobCronTokens,
jobMaxDurationTokens,
)
if err != nil {
return fmt.Errorf("failed to schedule token job: %w", err)
}
return nil
}
// registerJobHandlers registers handlers for all cleanup jobs.
func (s *Service) registerJobHandlers() error {
if err := s.executor.Register(
jobTypeWebhookExecutions,
newWebhookExecutionsCleanupJob(
s.config.WebhookExecutionsRetentionTime,
s.webhookExecutionStore,
),
); err != nil {
return fmt.Errorf("failed to register job handler for webhook executions cleanup: %w", err)
}
if err := s.executor.Register(
jobTypeTokens,
newTokensCleanupJob(
s.tokenStore,
),
); err != nil {
return fmt.Errorf("failed to register job handler for token cleanup: %w", err)
}
return nil
}

View File

@ -0,0 +1,75 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cleanup
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
const (
jobTypeTokens = "gitness:cleanup:tokens"
//nolint:gosec
jobCronTokens = "42 */4 * * *" // At minute 42 past every 4th hour.
jobMaxDurationTokens = 1 * time.Minute
// tokenRetentionTime specifies the time for which session tokens are kept even after they expired.
// This ensures that users can still trace them after expiry for some time.
// NOTE: I don't expect this to change much, so make it a constant instead of exposing it via config.
tokenRetentionTime = 72 * time.Hour // 3d
)
type tokensCleanupJob struct {
tokenStore store.TokenStore
}
func newTokensCleanupJob(
tokenStore store.TokenStore,
) *tokensCleanupJob {
return &tokensCleanupJob{
tokenStore: tokenStore,
}
}
// Handle purges old token that are expired.
func (j *tokensCleanupJob) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) {
// Don't remove PAT / SAT as they were explicitly created and are manged by user.
expiredBefore := time.Now().Add(-tokenRetentionTime)
log.Ctx(ctx).Info().Msgf(
"start purging expired tokens (expired before: %s)",
expiredBefore.Format(time.RFC3339Nano),
)
n, err := j.tokenStore.DeleteExpiredBefore(ctx, expiredBefore, []enum.TokenType{enum.TokenTypeSession})
if err != nil {
return "", fmt.Errorf("failed to delete expired tokens: %w", err)
}
result := "no expired tokens found"
if n > 0 {
result = fmt.Sprintf("deleted %d tokens", n)
}
log.Ctx(ctx).Info().Msg(result)
return result, nil
}

View File

@ -0,0 +1,73 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cleanup
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/rs/zerolog/log"
)
const (
jobTypeWebhookExecutions = "gitness:cleanup:webhook-executions"
jobCronWebhookExecutions = "21 */4 * * *" // At minute 21 past every 4th hour.
jobMaxDurationWebhookExecutions = 1 * time.Minute
)
type webhookExecutionsCleanupJob struct {
retentionTime time.Duration
webhookExecutionStore store.WebhookExecutionStore
}
func newWebhookExecutionsCleanupJob(
retentionTime time.Duration,
webhookExecutionStore store.WebhookExecutionStore,
) *webhookExecutionsCleanupJob {
return &webhookExecutionsCleanupJob{
retentionTime: retentionTime,
webhookExecutionStore: webhookExecutionStore,
}
}
// Handle purges old webhook executions that are past the retention time.
func (j *webhookExecutionsCleanupJob) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) {
olderThan := time.Now().Add(-j.retentionTime)
log.Ctx(ctx).Info().Msgf(
"start purging webhook executions older than %s (aka created before %s)",
j.retentionTime,
olderThan.Format(time.RFC3339Nano))
n, err := j.webhookExecutionStore.DeleteOld(ctx, olderThan)
if err != nil {
return "", fmt.Errorf("failed to delete old webhook executions: %w", err)
}
result := "no old webhook executions found"
if n > 0 {
result = fmt.Sprintf("deleted %d webhook executions", n)
}
log.Ctx(ctx).Info().Msg(result)
return result, nil
}

View File

@ -0,0 +1,42 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cleanup
import (
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/google/wire"
)
var WireSet = wire.NewSet(
ProvideService,
)
func ProvideService(
config Config,
scheduler *job.Scheduler,
executor *job.Executor,
webhookExecutionStore store.WebhookExecutionStore,
tokenStore store.TokenStore,
) (*Service, error) {
return NewService(
config,
scheduler,
executor,
webhookExecutionStore,
tokenStore,
)
}

View File

@ -41,9 +41,9 @@ type Scheduler struct {
pubsubService pubsub.PubSub
// configuration fields
instanceID string
maxRunning int
purgeMinOldAge time.Duration
instanceID string
maxRunning int
retentionTime time.Duration
// synchronization stuff
signal chan time.Time
@ -60,7 +60,7 @@ func NewScheduler(
pubsubService pubsub.PubSub,
instanceID string,
maxRunning int,
purgeMinOldAge time.Duration,
retentionTime time.Duration,
) (*Scheduler, error) {
if maxRunning < 1 {
maxRunning = 1
@ -71,9 +71,9 @@ func NewScheduler(
mxManager: mxManager,
pubsubService: pubsubService,
instanceID: instanceID,
maxRunning: maxRunning,
purgeMinOldAge: purgeMinOldAge,
instanceID: instanceID,
maxRunning: maxRunning,
retentionTime: retentionTime,
cancelJobMap: map[string]context.CancelFunc{},
}, nil
@ -750,7 +750,7 @@ func (s *Scheduler) registerNecessaryJobs() error {
return err
}
handlerPurge := newJobPurge(s.store, s.mxManager, s.purgeMinOldAge)
handlerPurge := newJobPurge(s.store, s.mxManager, s.retentionTime)
err = s.executor.Register(jobTypePurge, handlerPurge)
if err != nil {
return err

View File

@ -52,6 +52,6 @@ func ProvideScheduler(
pubsubService,
config.InstanceID,
config.BackgroundJobs.MaxRunning,
config.BackgroundJobs.PurgeFinishedOlderThan,
config.BackgroundJobs.RetentionTime,
)
}

View File

@ -15,6 +15,7 @@
package services
import (
"github.com/harness/gitness/app/services/cleanup"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/metric"
"github.com/harness/gitness/app/services/pullreq"
@ -34,6 +35,7 @@ type Services struct {
Trigger *trigger.Service
JobScheduler *job.Scheduler
MetricCollector *metric.Collector
Cleanup *cleanup.Service
}
func ProvideServices(
@ -42,6 +44,7 @@ func ProvideServices(
triggerSvc *trigger.Service,
jobScheduler *job.Scheduler,
metricCollector *metric.Collector,
cleanupSvc *cleanup.Service,
) Services {
return Services{
Webhook: webhooksSvc,
@ -49,5 +52,6 @@ func ProvideServices(
Trigger: triggerSvc,
JobScheduler: jobScheduler,
MetricCollector: metricCollector,
Cleanup: cleanupSvc,
}
}

View File

@ -236,8 +236,9 @@ type (
// Delete deletes the token with the given id.
Delete(ctx context.Context, id int64) error
// DeleteForPrincipal deletes all tokens for a specific principal
DeleteForPrincipal(ctx context.Context, principalID int64) error
// DeleteExpiredBefore deletes all tokens that expired before the provided time.
// If tokenTypes are provided, then only tokens of that type are deleted.
DeleteExpiredBefore(ctx context.Context, before time.Time, tknTypes []enum.TokenType) (int64, error)
// List returns a list of tokens of a specific type for a specific principal.
List(ctx context.Context, principalID int64, tokenType enum.TokenType) ([]*types.Token, error)
@ -409,6 +410,9 @@ type (
// Create creates a new webhook execution entry.
Create(ctx context.Context, hook *types.WebhookExecution) error
// DeleteOld removes all executions that are older than the provided time.
DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)
// ListForWebhook lists the webhook executions for a given webhook id.
ListForWebhook(ctx context.Context, webhookID int64,
opts *types.WebhookExecutionFilter) ([]*types.WebhookExecution, error)

View File

@ -0,0 +1 @@
DROP INDEX webhook_executions_created;

View File

@ -0,0 +1 @@
CREATE INDEX webhook_executions_created ON webhook_executions(webhook_execution_created);

View File

@ -0,0 +1 @@
DROP INDEX tokens_type_expires_at;

View File

@ -0,0 +1 @@
CREATE INDEX tokens_type_expires_at ON tokens(token_type, token_expires_at);

View File

@ -0,0 +1 @@
DROP INDEX webhook_executions_created;

View File

@ -0,0 +1 @@
CREATE INDEX webhook_executions_created ON webhook_executions(webhook_execution_created);

View File

@ -0,0 +1 @@
DROP INDEX tokens_type_expires_at;

View File

@ -0,0 +1 @@
CREATE INDEX tokens_type_expires_at ON tokens(token_type, token_expires_at);

View File

@ -16,6 +16,8 @@ package database
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/store/database"
@ -23,6 +25,7 @@ import (
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
)
@ -89,15 +92,39 @@ func (s *TokenStore) Delete(ctx context.Context, id int64) error {
return nil
}
// DeleteForPrincipal deletes all tokens for a specific principal.
func (s *TokenStore) DeleteForPrincipal(ctx context.Context, principalID int64) error {
db := dbtx.GetAccessor(ctx, s.db)
// DeleteExpiredBefore deletes all tokens that expired before the provided time.
// If tokenTypes are provided, then only tokens of that type are deleted.
func (s *TokenStore) DeleteExpiredBefore(
ctx context.Context,
before time.Time,
tknTypes []enum.TokenType,
) (int64, error) {
stmt := database.Builder.
Delete("tokens").
Where("token_expires_at < ?", before.UnixMilli())
if _, err := db.ExecContext(ctx, tokenDeleteForPrincipal, principalID); err != nil {
return database.ProcessSQLErrorf(err, "The delete query failed")
if len(tknTypes) > 0 {
stmt = stmt.Where(squirrel.Eq{"token_type": tknTypes})
}
return nil
sql, args, err := stmt.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to convert delete token query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
result, err := db.ExecContext(ctx, sql, args...)
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to execute delete token query")
}
n, err := result.RowsAffected()
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted tokens")
}
return n, nil
}
// Count returns a count of tokens of a specifc type for a specific principal.
@ -166,11 +193,6 @@ DELETE FROM tokens
WHERE token_id = $1
`
const tokenDeleteForPrincipal = `
DELETE FROM tokens
WHERE token_principal_id = $1
`
const tokenInsert = `
INSERT INTO tokens (
token_type

View File

@ -17,6 +17,7 @@ package database
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/store/database"
@ -157,6 +158,32 @@ func (s *WebhookExecutionStore) Create(ctx context.Context, execution *types.Web
return nil
}
// DeleteOld removes all executions that are older than the provided time.
func (s *WebhookExecutionStore) DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) {
stmt := database.Builder.
Delete("webhook_executions").
Where("webhook_execution_created < ?", olderThan.UnixMilli())
sql, args, err := stmt.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to convert delete executions query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
result, err := db.ExecContext(ctx, sql, args...)
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to execute delete executions query")
}
n, err := result.RowsAffected()
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted executions")
}
return n, nil
}
// ListForWebhook lists the webhook executions for a given webhook id.
func (s *WebhookExecutionStore) ListForWebhook(ctx context.Context, webhookID int64,
opts *types.WebhookExecutionFilter) ([]*types.WebhookExecution, error) {

View File

@ -22,12 +22,14 @@ import (
"strings"
"unicode"
"github.com/harness/gitness/app/services/cleanup"
"github.com/harness/gitness/app/services/trigger"
"github.com/harness/gitness/app/services/webhook"
"github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
"github.com/harness/gitness/gitrpc/server"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/types"
@ -211,15 +213,14 @@ func ProvideGitRPCClientConfig() (gitrpc.Config, error) {
return config, nil
}
// ProvideEventsConfig loads the events config from the environment.
func ProvideEventsConfig() (events.Config, error) {
config := events.Config{}
err := envconfig.Process("", &config)
if err != nil {
return events.Config{}, fmt.Errorf("failed to load events config: %w", err)
// ProvideEventsConfig loads the events config from the main config.
func ProvideEventsConfig(config *types.Config) events.Config {
return events.Config{
Mode: config.Events.Mode,
Namespace: config.Events.Namespace,
MaxStreamLength: config.Events.MaxStreamLength,
ApproxMaxStreamLength: config.Events.ApproxMaxStreamLength,
}
return config, nil
}
// ProvideWebhookConfig loads the webhook service config from the main config.
@ -249,7 +250,7 @@ func ProvideLockConfig(config *types.Config) lock.Config {
return lock.Config{
App: config.Lock.AppNamespace,
Namespace: config.Lock.DefaultNamespace,
Provider: lock.Provider(config.Lock.Provider),
Provider: config.Lock.Provider,
Expiry: config.Lock.Expiry,
Tries: config.Lock.Tries,
RetryDelay: config.Lock.RetryDelay,
@ -257,3 +258,22 @@ func ProvideLockConfig(config *types.Config) lock.Config {
TimeoutFactor: config.Lock.TimeoutFactor,
}
}
// ProvidePubsubConfig loads the pubsub config from the main config.
func ProvidePubsubConfig(config *types.Config) pubsub.Config {
return pubsub.Config{
App: config.PubSub.AppNamespace,
Namespace: config.PubSub.DefaultNamespace,
Provider: config.PubSub.Provider,
HealthInterval: config.PubSub.HealthInterval,
SendTimeout: config.PubSub.SendTimeout,
ChannelSize: config.PubSub.ChannelSize,
}
}
// ProvideCleanupConfig loads the cleanup service config from the main config.
func ProvideCleanupConfig(config *types.Config) cleanup.Config {
return cleanup.Config{
WebhookExecutionsRetentionTime: config.Webhook.RetentionTime,
}
}

View File

@ -89,13 +89,17 @@ func (c *command) run(*kingpin.ParseContext) error {
g.Go(func() error {
// initialize metric collector
if system.services.MetricCollector != nil {
err := system.services.MetricCollector.Register(gCtx)
if err != nil {
if err := system.services.MetricCollector.Register(gCtx); err != nil {
log.Error().Err(err).Msg("failed to register metric collector")
return err
}
}
if err := system.services.Cleanup.Register(gCtx); err != nil {
log.Error().Err(err).Msg("failed to register cleanup service")
return err
}
return system.services.JobScheduler.Run(gCtx)
})

View File

@ -45,6 +45,7 @@ import (
"github.com/harness/gitness/app/router"
"github.com/harness/gitness/app/server"
"github.com/harness/gitness/app/services"
"github.com/harness/gitness/app/services/cleanup"
"github.com/harness/gitness/app/services/codecomments"
"github.com/harness/gitness/app/services/exporter"
"github.com/harness/gitness/app/services/importer"
@ -118,7 +119,10 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
githook.WireSet,
cliserver.ProvideLockConfig,
lock.WireSet,
cliserver.ProvidePubsubConfig,
pubsub.WireSet,
cliserver.ProvideCleanupConfig,
cleanup.WireSet,
codecomments.WireSet,
job.WireSet,
gitrpccron.WireSet,

View File

@ -43,6 +43,7 @@ import (
"github.com/harness/gitness/app/router"
server2 "github.com/harness/gitness/app/server"
"github.com/harness/gitness/app/services"
"github.com/harness/gitness/app/services/cleanup"
"github.com/harness/gitness/app/services/codecomments"
"github.com/harness/gitness/app/services/exporter"
"github.com/harness/gitness/app/services/importer"
@ -119,7 +120,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err
}
jobStore := database.ProvideJobStore(db)
pubsubConfig := pubsub.ProvideConfig(config)
pubsubConfig := server.ProvidePubsubConfig(config)
universalClient, err := server.ProvideRedis(config)
if err != nil {
return nil, err
@ -175,10 +176,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
pullReqReviewStore := database.ProvidePullReqReviewStore(db)
pullReqReviewerStore := database.ProvidePullReqReviewerStore(db, principalInfoCache)
pullReqFileViewStore := database.ProvidePullReqFileViewStore(db)
eventsConfig, err := server.ProvideEventsConfig()
if err != nil {
return nil, err
}
eventsConfig := server.ProvideEventsConfig(config)
eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient)
if err != nil {
return nil, err
@ -257,7 +255,12 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collector)
cleanupConfig := server.ProvideCleanupConfig(config)
cleanupService, err := cleanup.ProvideService(cleanupConfig, jobScheduler, executor, webhookExecutionStore, tokenStore)
if err != nil {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collector, cleanupService)
serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, poller, grpcServer, pluginManager, cronManager, servicesServices)
return serverSystem, nil
}

View File

@ -49,10 +49,10 @@ const (
// Config defines the config of the events system.
type Config struct {
Mode Mode `envconfig:"GITNESS_EVENTS_MODE" default:"inmemory"`
Namespace string `envconfig:"GITNESS_EVENTS_NAMESPACE" default:"gitness"`
MaxStreamLength int64 `envconfig:"GITNESS_EVENTS_MAX_STREAM_LENGTH" default:"10000"`
ApproxMaxStreamLength bool `envconfig:"GITNESS_EVENTS_APPROX_MAX_STREAM_LENGTH" default:"true"`
Mode Mode
Namespace string
MaxStreamLength int64
ApproxMaxStreamLength bool
}
func (c *Config) Validate() error {

View File

@ -24,12 +24,12 @@ const (
)
type Config struct {
app string // app namespace prefix
namespace string
App string // app namespace prefix
Namespace string
provider Provider
Provider Provider
healthInterval time.Duration
sendTimeout time.Duration
channelSize int
HealthInterval time.Duration
SendTimeout time.Duration
ChannelSize int
}

View File

@ -37,11 +37,11 @@ type InMemory struct {
// NewInMemory create an instance of memory pubsub implementation.
func NewInMemory(options ...Option) *InMemory {
config := Config{
app: "app",
namespace: "default",
healthInterval: 3 * time.Second,
sendTimeout: 60,
channelSize: 100,
App: "app",
Namespace: "default",
HealthInterval: 3 * time.Second,
SendTimeout: 60,
ChannelSize: 100,
}
for _, f := range options {
@ -65,10 +65,10 @@ func (r *InMemory) Subscribe(
config := SubscribeConfig{
topics: make([]string, 0, 8),
app: r.config.app,
namespace: r.config.namespace,
sendTimeout: r.config.sendTimeout,
channelSize: r.config.channelSize,
app: r.config.App,
namespace: r.config.Namespace,
sendTimeout: r.config.SendTimeout,
channelSize: r.config.ChannelSize,
}
for _, f := range options {
@ -100,8 +100,8 @@ func (r *InMemory) Publish(ctx context.Context, topic string, payload []byte, op
return nil
}
pubConfig := PublishConfig{
app: r.config.app,
namespace: r.config.namespace,
app: r.config.App,
namespace: r.config.Namespace,
}
for _, f := range opts {
f.Apply(&pubConfig)

View File

@ -34,14 +34,14 @@ func (f OptionFunc) Apply(config *Config) {
// WithApp returns an option that set config app name.
func WithApp(value string) Option {
return OptionFunc(func(m *Config) {
m.app = value
m.App = value
})
}
// WithNamespace returns an option that set config namespace.
func WithNamespace(value string) Option {
return OptionFunc(func(m *Config) {
m.namespace = value
m.Namespace = value
})
}
@ -51,7 +51,7 @@ func WithNamespace(value string) Option {
// To disable health check, use zero interval.
func WithHealthCheckInterval(value time.Duration) Option {
return OptionFunc(func(m *Config) {
m.healthInterval = value
m.HealthInterval = value
})
}
@ -59,7 +59,7 @@ func WithHealthCheckInterval(value time.Duration) Option {
// the message is dropped.
func WithSendTimeout(value time.Duration) Option {
return OptionFunc(func(m *Config) {
m.sendTimeout = value
m.SendTimeout = value
})
}
@ -67,7 +67,7 @@ func WithSendTimeout(value time.Duration) Option {
// incoming messages.
func WithSize(value int) Option {
return OptionFunc(func(m *Config) {
m.channelSize = value
m.ChannelSize = value
})
}

View File

@ -35,11 +35,11 @@ type Redis struct {
// NewRedis create an instance of redis PubSub implementation.
func NewRedis(client redis.UniversalClient, options ...Option) *Redis {
config := Config{
app: "app",
namespace: "default",
healthInterval: 3 * time.Second,
sendTimeout: 60,
channelSize: 100,
App: "app",
Namespace: "default",
HealthInterval: 3 * time.Second,
SendTimeout: 60,
ChannelSize: 100,
}
for _, f := range options {
@ -64,11 +64,11 @@ func (r *Redis) Subscribe(
config := SubscribeConfig{
topics: make([]string, 0, 8),
app: r.config.app,
namespace: r.config.namespace,
healthInterval: r.config.healthInterval,
sendTimeout: r.config.sendTimeout,
channelSize: r.config.channelSize,
app: r.config.App,
namespace: r.config.Namespace,
healthInterval: r.config.HealthInterval,
sendTimeout: r.config.SendTimeout,
channelSize: r.config.ChannelSize,
}
for _, f := range options {
@ -98,8 +98,8 @@ func (r *Redis) Subscribe(
// Publish event topic to message broker with payload.
func (r *Redis) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error {
pubConfig := PublishConfig{
app: r.config.app,
namespace: r.config.namespace,
app: r.config.App,
namespace: r.config.Namespace,
}
for _, f := range opts {
f.Apply(&pubConfig)

View File

@ -15,47 +15,33 @@
package pubsub
import (
"github.com/harness/gitness/types"
"github.com/go-redis/redis/v8"
"github.com/google/wire"
)
var WireSet = wire.NewSet(
ProvideConfig,
ProvidePubSub,
)
func ProvideConfig(config *types.Config) Config {
return Config{
app: config.PubSub.AppNamespace,
namespace: config.PubSub.DefaultNamespace,
provider: Provider(config.PubSub.Provider),
healthInterval: config.PubSub.HealthInterval,
sendTimeout: config.PubSub.SendTimeout,
channelSize: config.PubSub.ChannelSize,
}
}
func ProvidePubSub(config Config, client redis.UniversalClient) PubSub {
switch config.provider {
switch config.Provider {
case ProviderRedis:
return NewRedis(client,
WithApp(config.app),
WithNamespace(config.namespace),
WithHealthCheckInterval(config.healthInterval),
WithSendTimeout(config.sendTimeout),
WithSize(config.channelSize),
WithApp(config.App),
WithNamespace(config.Namespace),
WithHealthCheckInterval(config.HealthInterval),
WithSendTimeout(config.SendTimeout),
WithSize(config.ChannelSize),
)
case ProviderMemory:
fallthrough
default:
return NewInMemory(
WithApp(config.app),
WithNamespace(config.namespace),
WithHealthCheckInterval(config.healthInterval),
WithSendTimeout(config.sendTimeout),
WithSize(config.channelSize),
WithApp(config.App),
WithNamespace(config.Namespace),
WithHealthCheckInterval(config.HealthInterval),
WithSendTimeout(config.SendTimeout),
WithSize(config.ChannelSize),
)
}
}

View File

@ -16,6 +16,10 @@ package types
import (
"time"
"github.com/harness/gitness/events"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
)
// Config stores the system configuration.
@ -197,9 +201,16 @@ type Config struct {
SentinelEndpoint string `envconfig:"GITNESS_REDIS_SENTINEL_ENDPOINT"`
}
Events struct {
Mode events.Mode `envconfig:"GITNESS_EVENTS_MODE" default:"inmemory"`
Namespace string `envconfig:"GITNESS_EVENTS_NAMESPACE" default:"gitness"`
MaxStreamLength int64 `envconfig:"GITNESS_EVENTS_MAX_STREAM_LENGTH" default:"10000"`
ApproxMaxStreamLength bool `envconfig:"GITNESS_EVENTS_APPROX_MAX_STREAM_LENGTH" default:"true"`
}
Lock struct {
// Provider is a name of distributed lock service like redis, memory, file etc...
Provider string `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"`
Provider lock.Provider `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"`
Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"`
Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"32"`
RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"`
@ -213,7 +224,7 @@ type Config struct {
PubSub struct {
// Provider is a name of distributed lock service like redis, memory, file etc...
Provider string `envconfig:"GITNESS_PUBSUB_PROVIDER" default:"inmemory"`
Provider pubsub.Provider `envconfig:"GITNESS_PUBSUB_PROVIDER" default:"inmemory"`
// AppNamespace is just service app prefix to avoid conflicts on channel definition
AppNamespace string `envconfig:"GITNESS_PUBSUB_APP_NAMESPACE" default:"gitness"`
// DefaultNamespace is custom namespace for their channels
@ -227,9 +238,9 @@ type Config struct {
// MaxRunning is maximum number of jobs that can be running at once.
MaxRunning int `envconfig:"GITNESS_JOBS_MAX_RUNNING" default:"10"`
// PurgeFinishedOlderThan is duration after non-recurring,
// RetentionTime is the duration after which non-recurring,
// finished and failed jobs will be purged from the DB.
PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"`
RetentionTime time.Duration `envconfig:"GITNESS_JOBS_RETENTION_TIME" default:"120h"` // 5 days
}
Webhook struct {
@ -243,6 +254,8 @@ type Config struct {
MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"`
AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"`
AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"`
// RetentionTime is the duration after which webhook executions will be purged from the DB.
RetentionTime time.Duration `envconfig:"GITNESS_WEBHOOK_RETENTION_TIME" default:"168h"` // 7 days
}
Trigger struct {