From d83552f288adbb7fb053aaf98c3da2cae1c9d70b Mon Sep 17 00:00:00 2001 From: Johannes Batzill Date: Tue, 3 Oct 2023 17:54:18 +0000 Subject: [PATCH] [MISC] Cleanup stale `Webhook-Executions` & `Session Tokens` (#640) --- app/api/controller/serviceaccount/delete.go | 8 -- app/api/controller/user/delete.go | 7 - app/services/cleanup/service.go | 132 ++++++++++++++++++ app/services/cleanup/tokens.go | 75 ++++++++++ app/services/cleanup/webhook_executions.go | 73 ++++++++++ app/services/cleanup/wire.go | 42 ++++++ app/services/job/scheduler.go | 16 +-- app/services/job/wire.go | 2 +- app/services/wire.go | 4 + app/store/database.go | 8 +- ..._index_webhook_executions_created.down.sql | 1 + ...te_index_webhook_executions_created.up.sql | 1 + ...eate_index_tokens_type_expires_at.down.sql | 1 + ...create_index_tokens_type_expires_at.up.sql | 1 + ..._index_webhook_executions_created.down.sql | 1 + ...te_index_webhook_executions_created.up.sql | 1 + ...eate_index_tokens_type_expires_at.down.sql | 1 + ...create_index_tokens_type_expires_at.up.sql | 1 + app/store/database/token.go | 44 ++++-- app/store/database/webhook_execution.go | 27 ++++ cli/server/config.go | 38 +++-- cli/server/server.go | 8 +- cmd/gitness/wire.go | 4 + cmd/gitness/wire_gen.go | 15 +- events/events.go | 8 +- pubsub/config.go | 12 +- pubsub/inmem.go | 22 +-- pubsub/options.go | 10 +- pubsub/redis.go | 24 ++-- pubsub/wire.go | 36 ++--- types/config.go | 21 ++- 31 files changed, 523 insertions(+), 121 deletions(-) create mode 100644 app/services/cleanup/service.go create mode 100644 app/services/cleanup/tokens.go create mode 100644 app/services/cleanup/webhook_executions.go create mode 100644 app/services/cleanup/wire.go create mode 100644 app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.down.sql create mode 100644 app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.up.sql create mode 100644 
app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.down.sql create mode 100644 app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.up.sql create mode 100644 app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.down.sql create mode 100644 app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.up.sql create mode 100644 app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.down.sql create mode 100644 app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.up.sql diff --git a/app/api/controller/serviceaccount/delete.go b/app/api/controller/serviceaccount/delete.go index 1eb12f53f..6c22e42ea 100644 --- a/app/api/controller/serviceaccount/delete.go +++ b/app/api/controller/serviceaccount/delete.go @@ -16,7 +16,6 @@ package serviceaccount import ( "context" - "fmt" apiauth "github.com/harness/gitness/app/api/auth" "github.com/harness/gitness/app/auth" @@ -37,12 +36,5 @@ func (c *Controller) Delete(ctx context.Context, session *auth.Session, return err } - // delete all tokens (okay if we fail after - user intends to delete service account anyway) - // TODO: cascading delete? - err = c.tokenStore.DeleteForPrincipal(ctx, sa.ID) - if err != nil { - return fmt.Errorf("failed to delete tokens for service account: %w", err) - } - return c.principalStore.DeleteServiceAccount(ctx, sa.ID) } diff --git a/app/api/controller/user/delete.go b/app/api/controller/user/delete.go index 9cd9962ea..a07bb1184 100644 --- a/app/api/controller/user/delete.go +++ b/app/api/controller/user/delete.go @@ -50,12 +50,5 @@ func (c *Controller) Delete(ctx context.Context, session *auth.Session, return err } - // delete all tokens (okay if we fail after - user intended to be deleted anyway) - // TODO: cascading delete? 
- err = c.tokenStore.DeleteForPrincipal(ctx, user.ID) - if err != nil { - return fmt.Errorf("failed to delete tokens for user: %w", err) - } - return c.principalStore.DeleteUser(ctx, user.ID) } diff --git a/app/services/cleanup/service.go b/app/services/cleanup/service.go new file mode 100644 index 000000000..e6afa135a --- /dev/null +++ b/app/services/cleanup/service.go @@ -0,0 +1,132 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/harness/gitness/app/services/job" + "github.com/harness/gitness/app/store" +) + +type Config struct { + WebhookExecutionsRetentionTime time.Duration +} + +func (c *Config) Prepare() error { + if c == nil { + return errors.New("config is required") + } + if c.WebhookExecutionsRetentionTime <= 0 { + return errors.New("config.WebhookExecutionsRetentionTime has to be provided") + } + return nil +} + +// Service is responsible for cleaning up data in db / git / ... 
+type Service struct { + config Config + scheduler *job.Scheduler + executor *job.Executor + webhookExecutionStore store.WebhookExecutionStore + tokenStore store.TokenStore +} + +func NewService( + config Config, + scheduler *job.Scheduler, + executor *job.Executor, + webhookExecutionStore store.WebhookExecutionStore, + tokenStore store.TokenStore, +) (*Service, error) { + if err := config.Prepare(); err != nil { + return nil, fmt.Errorf("provided cleanup config is invalid: %w", err) + } + + return &Service{ + config: config, + + scheduler: scheduler, + executor: executor, + webhookExecutionStore: webhookExecutionStore, + tokenStore: tokenStore, + }, nil +} + +func (s *Service) Register(ctx context.Context) error { + if err := s.registerJobHandlers(); err != nil { + return fmt.Errorf("failed to register cleanup job handlers: %w", err) + } + + if err := s.scheduleRecurringCleanupJobs(ctx); err != nil { + return fmt.Errorf("failed to schedule cleanup jobs: %w", err) + } + + return nil +} + +// scheduleRecurringCleanupJobs schedules the cleanup jobs. +func (s *Service) scheduleRecurringCleanupJobs(ctx context.Context) error { + err := s.scheduler.AddRecurring( + ctx, + jobTypeWebhookExecutions, + jobTypeWebhookExecutions, + jobCronWebhookExecutions, + jobMaxDurationWebhookExecutions, + ) + if err != nil { + return fmt.Errorf("failed to schedule webhook executions job: %w", err) + } + + err = s.scheduler.AddRecurring( + ctx, + jobTypeTokens, + jobTypeTokens, + jobCronTokens, + jobMaxDurationTokens, + ) + if err != nil { + return fmt.Errorf("failed to schedule token job: %w", err) + } + + return nil +} + +// registerJobHandlers registers handlers for all cleanup jobs. 
+func (s *Service) registerJobHandlers() error { + if err := s.executor.Register( + jobTypeWebhookExecutions, + newWebhookExecutionsCleanupJob( + s.config.WebhookExecutionsRetentionTime, + s.webhookExecutionStore, + ), + ); err != nil { + return fmt.Errorf("failed to register job handler for webhook executions cleanup: %w", err) + } + + if err := s.executor.Register( + jobTypeTokens, + newTokensCleanupJob( + s.tokenStore, + ), + ); err != nil { + return fmt.Errorf("failed to register job handler for token cleanup: %w", err) + } + + return nil +} diff --git a/app/services/cleanup/tokens.go b/app/services/cleanup/tokens.go new file mode 100644 index 000000000..448d41f9d --- /dev/null +++ b/app/services/cleanup/tokens.go @@ -0,0 +1,75 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +import ( + "context" + "fmt" + "time" + + "github.com/harness/gitness/app/services/job" + "github.com/harness/gitness/app/store" + "github.com/harness/gitness/types/enum" + + "github.com/rs/zerolog/log" +) + +const ( + jobTypeTokens = "gitness:cleanup:tokens" + //nolint:gosec + jobCronTokens = "42 */4 * * *" // At minute 42 past every 4th hour. + jobMaxDurationTokens = 1 * time.Minute + + // tokenRetentionTime specifies the time for which session tokens are kept even after they expired. + // This ensures that users can still trace them after expiry for some time. 
+ // NOTE: I don't expect this to change much, so make it a constant instead of exposing it via config. + tokenRetentionTime = 72 * time.Hour // 3d +) + +type tokensCleanupJob struct { + tokenStore store.TokenStore +} + +func newTokensCleanupJob( + tokenStore store.TokenStore, +) *tokensCleanupJob { + return &tokensCleanupJob{ + tokenStore: tokenStore, + } +} + +// Handle purges old tokens that are expired. +func (j *tokensCleanupJob) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) { + // Don't remove PAT / SAT as they were explicitly created and are managed by the user. + expiredBefore := time.Now().Add(-tokenRetentionTime) + log.Ctx(ctx).Info().Msgf( + "start purging expired tokens (expired before: %s)", + expiredBefore.Format(time.RFC3339Nano), + ) + + n, err := j.tokenStore.DeleteExpiredBefore(ctx, expiredBefore, []enum.TokenType{enum.TokenTypeSession}) + if err != nil { + return "", fmt.Errorf("failed to delete expired tokens: %w", err) + } + + result := "no expired tokens found" + if n > 0 { + result = fmt.Sprintf("deleted %d tokens", n) + } + + log.Ctx(ctx).Info().Msg(result) + + return result, nil +} diff --git a/app/services/cleanup/webhook_executions.go b/app/services/cleanup/webhook_executions.go new file mode 100644 index 000000000..a7de55847 --- /dev/null +++ b/app/services/cleanup/webhook_executions.go @@ -0,0 +1,73 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cleanup + +import ( + "context" + "fmt" + "time" + + "github.com/harness/gitness/app/services/job" + "github.com/harness/gitness/app/store" + + "github.com/rs/zerolog/log" +) + +const ( + jobTypeWebhookExecutions = "gitness:cleanup:webhook-executions" + jobCronWebhookExecutions = "21 */4 * * *" // At minute 21 past every 4th hour. + jobMaxDurationWebhookExecutions = 1 * time.Minute +) + +type webhookExecutionsCleanupJob struct { + retentionTime time.Duration + + webhookExecutionStore store.WebhookExecutionStore +} + +func newWebhookExecutionsCleanupJob( + retentionTime time.Duration, + webhookExecutionStore store.WebhookExecutionStore, +) *webhookExecutionsCleanupJob { + return &webhookExecutionsCleanupJob{ + retentionTime: retentionTime, + + webhookExecutionStore: webhookExecutionStore, + } +} + +// Handle purges old webhook executions that are past the retention time. +func (j *webhookExecutionsCleanupJob) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) { + olderThan := time.Now().Add(-j.retentionTime) + + log.Ctx(ctx).Info().Msgf( + "start purging webhook executions older than %s (aka created before %s)", + j.retentionTime, + olderThan.Format(time.RFC3339Nano)) + + n, err := j.webhookExecutionStore.DeleteOld(ctx, olderThan) + if err != nil { + return "", fmt.Errorf("failed to delete old webhook executions: %w", err) + } + + result := "no old webhook executions found" + if n > 0 { + result = fmt.Sprintf("deleted %d webhook executions", n) + } + + log.Ctx(ctx).Info().Msg(result) + + return result, nil +} diff --git a/app/services/cleanup/wire.go b/app/services/cleanup/wire.go new file mode 100644 index 000000000..c9481424a --- /dev/null +++ b/app/services/cleanup/wire.go @@ -0,0 +1,42 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +import ( + "github.com/harness/gitness/app/services/job" + "github.com/harness/gitness/app/store" + + "github.com/google/wire" +) + +var WireSet = wire.NewSet( + ProvideService, +) + +func ProvideService( + config Config, + scheduler *job.Scheduler, + executor *job.Executor, + webhookExecutionStore store.WebhookExecutionStore, + tokenStore store.TokenStore, +) (*Service, error) { + return NewService( + config, + scheduler, + executor, + webhookExecutionStore, + tokenStore, + ) +} diff --git a/app/services/job/scheduler.go b/app/services/job/scheduler.go index 049d8994f..17927ad3f 100644 --- a/app/services/job/scheduler.go +++ b/app/services/job/scheduler.go @@ -41,9 +41,9 @@ type Scheduler struct { pubsubService pubsub.PubSub // configuration fields - instanceID string - maxRunning int - purgeMinOldAge time.Duration + instanceID string + maxRunning int + retentionTime time.Duration // synchronization stuff signal chan time.Time @@ -60,7 +60,7 @@ func NewScheduler( pubsubService pubsub.PubSub, instanceID string, maxRunning int, - purgeMinOldAge time.Duration, + retentionTime time.Duration, ) (*Scheduler, error) { if maxRunning < 1 { maxRunning = 1 @@ -71,9 +71,9 @@ func NewScheduler( mxManager: mxManager, pubsubService: pubsubService, - instanceID: instanceID, - maxRunning: maxRunning, - purgeMinOldAge: purgeMinOldAge, + instanceID: instanceID, + maxRunning: maxRunning, + retentionTime: retentionTime, cancelJobMap: map[string]context.CancelFunc{}, }, nil @@ -750,7 +750,7 @@ func (s *Scheduler) registerNecessaryJobs() error { 
return err } - handlerPurge := newJobPurge(s.store, s.mxManager, s.purgeMinOldAge) + handlerPurge := newJobPurge(s.store, s.mxManager, s.retentionTime) err = s.executor.Register(jobTypePurge, handlerPurge) if err != nil { return err diff --git a/app/services/job/wire.go b/app/services/job/wire.go index 8dfd4311f..48f0c6ade 100644 --- a/app/services/job/wire.go +++ b/app/services/job/wire.go @@ -52,6 +52,6 @@ func ProvideScheduler( pubsubService, config.InstanceID, config.BackgroundJobs.MaxRunning, - config.BackgroundJobs.PurgeFinishedOlderThan, + config.BackgroundJobs.RetentionTime, ) } diff --git a/app/services/wire.go b/app/services/wire.go index 69a1970d5..4c21887a9 100644 --- a/app/services/wire.go +++ b/app/services/wire.go @@ -15,6 +15,7 @@ package services import ( + "github.com/harness/gitness/app/services/cleanup" "github.com/harness/gitness/app/services/job" "github.com/harness/gitness/app/services/metric" "github.com/harness/gitness/app/services/pullreq" @@ -34,6 +35,7 @@ type Services struct { Trigger *trigger.Service JobScheduler *job.Scheduler MetricCollector *metric.Collector + Cleanup *cleanup.Service } func ProvideServices( @@ -42,6 +44,7 @@ func ProvideServices( triggerSvc *trigger.Service, jobScheduler *job.Scheduler, metricCollector *metric.Collector, + cleanupSvc *cleanup.Service, ) Services { return Services{ Webhook: webhooksSvc, @@ -49,5 +52,6 @@ func ProvideServices( Trigger: triggerSvc, JobScheduler: jobScheduler, MetricCollector: metricCollector, + Cleanup: cleanupSvc, } } diff --git a/app/store/database.go b/app/store/database.go index 71ae056cd..b211988bd 100644 --- a/app/store/database.go +++ b/app/store/database.go @@ -236,8 +236,9 @@ type ( // Delete deletes the token with the given id. 
Delete(ctx context.Context, id int64) error - // DeleteForPrincipal deletes all tokens for a specific principal - DeleteForPrincipal(ctx context.Context, principalID int64) error + // DeleteExpiredBefore deletes all tokens that expired before the provided time. + // If tokenTypes are provided, then only tokens of that type are deleted. + DeleteExpiredBefore(ctx context.Context, before time.Time, tknTypes []enum.TokenType) (int64, error) // List returns a list of tokens of a specific type for a specific principal. List(ctx context.Context, principalID int64, tokenType enum.TokenType) ([]*types.Token, error) @@ -409,6 +410,9 @@ type ( // Create creates a new webhook execution entry. Create(ctx context.Context, hook *types.WebhookExecution) error + // DeleteOld removes all executions that are older than the provided time. + DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) + // ListForWebhook lists the webhook executions for a given webhook id. ListForWebhook(ctx context.Context, webhookID int64, opts *types.WebhookExecutionFilter) ([]*types.WebhookExecution, error) diff --git a/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.down.sql b/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.down.sql new file mode 100644 index 000000000..553a6461a --- /dev/null +++ b/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.down.sql @@ -0,0 +1 @@ +DROP INDEX webhook_executions_created; \ No newline at end of file diff --git a/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.up.sql b/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.up.sql new file mode 100644 index 000000000..e4aba23ed --- /dev/null +++ b/app/store/database/migrate/postgres/0034_create_index_webhook_executions_created.up.sql @@ -0,0 +1 @@ +CREATE INDEX webhook_executions_created ON webhook_executions(webhook_execution_created); \ No newline at 
end of file diff --git a/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.down.sql b/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.down.sql new file mode 100644 index 000000000..cdf471f3c --- /dev/null +++ b/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.down.sql @@ -0,0 +1 @@ +DROP INDEX tokens_type_expires_at; \ No newline at end of file diff --git a/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.up.sql b/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.up.sql new file mode 100644 index 000000000..24ea7a425 --- /dev/null +++ b/app/store/database/migrate/postgres/0035_create_index_tokens_type_expires_at.up.sql @@ -0,0 +1 @@ +CREATE INDEX tokens_type_expires_at ON tokens(token_type, token_expires_at); \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.down.sql b/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.down.sql new file mode 100644 index 000000000..553a6461a --- /dev/null +++ b/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.down.sql @@ -0,0 +1 @@ +DROP INDEX webhook_executions_created; \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.up.sql b/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.up.sql new file mode 100644 index 000000000..e4aba23ed --- /dev/null +++ b/app/store/database/migrate/sqlite/0034_create_index_webhook_executions_created.up.sql @@ -0,0 +1 @@ +CREATE INDEX webhook_executions_created ON webhook_executions(webhook_execution_created); \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.down.sql b/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.down.sql new file mode 100644 index 
000000000..cdf471f3c --- /dev/null +++ b/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.down.sql @@ -0,0 +1 @@ +DROP INDEX tokens_type_expires_at; \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.up.sql b/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.up.sql new file mode 100644 index 000000000..24ea7a425 --- /dev/null +++ b/app/store/database/migrate/sqlite/0035_create_index_tokens_type_expires_at.up.sql @@ -0,0 +1 @@ +CREATE INDEX tokens_type_expires_at ON tokens(token_type, token_expires_at); \ No newline at end of file diff --git a/app/store/database/token.go b/app/store/database/token.go index 46417ec15..12ef657b6 100644 --- a/app/store/database/token.go +++ b/app/store/database/token.go @@ -16,6 +16,8 @@ package database import ( "context" + "fmt" + "time" "github.com/harness/gitness/app/store" "github.com/harness/gitness/store/database" @@ -23,6 +25,7 @@ import ( "github.com/harness/gitness/types" "github.com/harness/gitness/types/enum" + "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" ) @@ -89,15 +92,39 @@ func (s *TokenStore) Delete(ctx context.Context, id int64) error { return nil } -// DeleteForPrincipal deletes all tokens for a specific principal. -func (s *TokenStore) DeleteForPrincipal(ctx context.Context, principalID int64) error { - db := dbtx.GetAccessor(ctx, s.db) +// DeleteExpiredBefore deletes all tokens that expired before the provided time. +// If tokenTypes are provided, then only tokens of that type are deleted. +func (s *TokenStore) DeleteExpiredBefore( + ctx context.Context, + before time.Time, + tknTypes []enum.TokenType, +) (int64, error) { + stmt := database.Builder. + Delete("tokens"). 
+ Where("token_expires_at < ?", before.UnixMilli()) - if _, err := db.ExecContext(ctx, tokenDeleteForPrincipal, principalID); err != nil { - return database.ProcessSQLErrorf(err, "The delete query failed") + if len(tknTypes) > 0 { + stmt = stmt.Where(squirrel.Eq{"token_type": tknTypes}) } - return nil + sql, args, err := stmt.ToSql() + if err != nil { + return 0, fmt.Errorf("failed to convert delete token query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + result, err := db.ExecContext(ctx, sql, args...) + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to execute delete token query") + } + + n, err := result.RowsAffected() + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted tokens") + } + + return n, nil } // Count returns a count of tokens of a specifc type for a specific principal. @@ -166,11 +193,6 @@ DELETE FROM tokens WHERE token_id = $1 ` -const tokenDeleteForPrincipal = ` -DELETE FROM tokens -WHERE token_principal_id = $1 -` - const tokenInsert = ` INSERT INTO tokens ( token_type diff --git a/app/store/database/webhook_execution.go b/app/store/database/webhook_execution.go index 5f8b89198..2cc67e342 100644 --- a/app/store/database/webhook_execution.go +++ b/app/store/database/webhook_execution.go @@ -17,6 +17,7 @@ package database import ( "context" "fmt" + "time" "github.com/harness/gitness/app/store" "github.com/harness/gitness/store/database" @@ -157,6 +158,32 @@ func (s *WebhookExecutionStore) Create(ctx context.Context, execution *types.Web return nil } +// DeleteOld removes all executions that are older than the provided time. +func (s *WebhookExecutionStore) DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) { + stmt := database.Builder. + Delete("webhook_executions"). 
+ Where("webhook_execution_created < ?", olderThan.UnixMilli()) + + sql, args, err := stmt.ToSql() + if err != nil { + return 0, fmt.Errorf("failed to convert delete executions query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + result, err := db.ExecContext(ctx, sql, args...) + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to execute delete executions query") + } + + n, err := result.RowsAffected() + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted executions") + } + + return n, nil +} + // ListForWebhook lists the webhook executions for a given webhook id. func (s *WebhookExecutionStore) ListForWebhook(ctx context.Context, webhookID int64, opts *types.WebhookExecutionFilter) ([]*types.WebhookExecution, error) { diff --git a/cli/server/config.go b/cli/server/config.go index 614ffd46f..2614a7478 100644 --- a/cli/server/config.go +++ b/cli/server/config.go @@ -22,12 +22,14 @@ import ( "strings" "unicode" + "github.com/harness/gitness/app/services/cleanup" "github.com/harness/gitness/app/services/trigger" "github.com/harness/gitness/app/services/webhook" "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" "github.com/harness/gitness/gitrpc/server" "github.com/harness/gitness/lock" + "github.com/harness/gitness/pubsub" "github.com/harness/gitness/store/database" "github.com/harness/gitness/types" @@ -211,15 +213,14 @@ func ProvideGitRPCClientConfig() (gitrpc.Config, error) { return config, nil } -// ProvideEventsConfig loads the events config from the environment. -func ProvideEventsConfig() (events.Config, error) { - config := events.Config{} - err := envconfig.Process("", &config) - if err != nil { - return events.Config{}, fmt.Errorf("failed to load events config: %w", err) +// ProvideEventsConfig loads the events config from the main config. 
+func ProvideEventsConfig(config *types.Config) events.Config { + return events.Config{ + Mode: config.Events.Mode, + Namespace: config.Events.Namespace, + MaxStreamLength: config.Events.MaxStreamLength, + ApproxMaxStreamLength: config.Events.ApproxMaxStreamLength, } - - return config, nil } // ProvideWebhookConfig loads the webhook service config from the main config. @@ -249,7 +250,7 @@ func ProvideLockConfig(config *types.Config) lock.Config { return lock.Config{ App: config.Lock.AppNamespace, Namespace: config.Lock.DefaultNamespace, - Provider: lock.Provider(config.Lock.Provider), + Provider: config.Lock.Provider, Expiry: config.Lock.Expiry, Tries: config.Lock.Tries, RetryDelay: config.Lock.RetryDelay, @@ -257,3 +258,22 @@ func ProvideLockConfig(config *types.Config) lock.Config { TimeoutFactor: config.Lock.TimeoutFactor, } } + +// ProvidePubsubConfig loads the pubsub config from the main config. +func ProvidePubsubConfig(config *types.Config) pubsub.Config { + return pubsub.Config{ + App: config.PubSub.AppNamespace, + Namespace: config.PubSub.DefaultNamespace, + Provider: config.PubSub.Provider, + HealthInterval: config.PubSub.HealthInterval, + SendTimeout: config.PubSub.SendTimeout, + ChannelSize: config.PubSub.ChannelSize, + } +} + +// ProvideCleanupConfig loads the cleanup service config from the main config. 
+func ProvideCleanupConfig(config *types.Config) cleanup.Config { + return cleanup.Config{ + WebhookExecutionsRetentionTime: config.Webhook.RetentionTime, + } +} diff --git a/cli/server/server.go b/cli/server/server.go index 41b14570a..9da835137 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -89,13 +89,17 @@ func (c *command) run(*kingpin.ParseContext) error { g.Go(func() error { // initialize metric collector if system.services.MetricCollector != nil { - err := system.services.MetricCollector.Register(gCtx) - if err != nil { + if err := system.services.MetricCollector.Register(gCtx); err != nil { log.Error().Err(err).Msg("failed to register metric collector") return err } } + if err := system.services.Cleanup.Register(gCtx); err != nil { + log.Error().Err(err).Msg("failed to register cleanup service") + return err + } + return system.services.JobScheduler.Run(gCtx) }) diff --git a/cmd/gitness/wire.go b/cmd/gitness/wire.go index bd926060a..1f99a8822 100644 --- a/cmd/gitness/wire.go +++ b/cmd/gitness/wire.go @@ -45,6 +45,7 @@ import ( "github.com/harness/gitness/app/router" "github.com/harness/gitness/app/server" "github.com/harness/gitness/app/services" + "github.com/harness/gitness/app/services/cleanup" "github.com/harness/gitness/app/services/codecomments" "github.com/harness/gitness/app/services/exporter" "github.com/harness/gitness/app/services/importer" @@ -118,7 +119,10 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e githook.WireSet, cliserver.ProvideLockConfig, lock.WireSet, + cliserver.ProvidePubsubConfig, pubsub.WireSet, + cliserver.ProvideCleanupConfig, + cleanup.WireSet, codecomments.WireSet, job.WireSet, gitrpccron.WireSet, diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index b43d43045..14abf825e 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -43,6 +43,7 @@ import ( "github.com/harness/gitness/app/router" server2 "github.com/harness/gitness/app/server" 
"github.com/harness/gitness/app/services" + "github.com/harness/gitness/app/services/cleanup" "github.com/harness/gitness/app/services/codecomments" "github.com/harness/gitness/app/services/exporter" "github.com/harness/gitness/app/services/importer" @@ -119,7 +120,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro return nil, err } jobStore := database.ProvideJobStore(db) - pubsubConfig := pubsub.ProvideConfig(config) + pubsubConfig := server.ProvidePubsubConfig(config) universalClient, err := server.ProvideRedis(config) if err != nil { return nil, err @@ -175,10 +176,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro pullReqReviewStore := database.ProvidePullReqReviewStore(db) pullReqReviewerStore := database.ProvidePullReqReviewerStore(db, principalInfoCache) pullReqFileViewStore := database.ProvidePullReqFileViewStore(db) - eventsConfig, err := server.ProvideEventsConfig() - if err != nil { - return nil, err - } + eventsConfig := server.ProvideEventsConfig(config) eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient) if err != nil { return nil, err @@ -257,7 +255,12 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collector) + cleanupConfig := server.ProvideCleanupConfig(config) + cleanupService, err := cleanup.ProvideService(cleanupConfig, jobScheduler, executor, webhookExecutionStore, tokenStore) + if err != nil { + return nil, err + } + servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collector, cleanupService) serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, poller, grpcServer, pluginManager, cronManager, servicesServices) return serverSystem, nil } diff --git a/events/events.go b/events/events.go index 
15d27e3d6..2dc4d2919 100644 --- a/events/events.go +++ b/events/events.go @@ -49,10 +49,10 @@ const ( // Config defines the config of the events system. type Config struct { - Mode Mode `envconfig:"GITNESS_EVENTS_MODE" default:"inmemory"` - Namespace string `envconfig:"GITNESS_EVENTS_NAMESPACE" default:"gitness"` - MaxStreamLength int64 `envconfig:"GITNESS_EVENTS_MAX_STREAM_LENGTH" default:"10000"` - ApproxMaxStreamLength bool `envconfig:"GITNESS_EVENTS_APPROX_MAX_STREAM_LENGTH" default:"true"` + Mode Mode + Namespace string + MaxStreamLength int64 + ApproxMaxStreamLength bool } func (c *Config) Validate() error { diff --git a/pubsub/config.go b/pubsub/config.go index 363d1c488..c738b9901 100644 --- a/pubsub/config.go +++ b/pubsub/config.go @@ -24,12 +24,12 @@ const ( ) type Config struct { - app string // app namespace prefix - namespace string + App string // app namespace prefix + Namespace string - provider Provider + Provider Provider - healthInterval time.Duration - sendTimeout time.Duration - channelSize int + HealthInterval time.Duration + SendTimeout time.Duration + ChannelSize int } diff --git a/pubsub/inmem.go b/pubsub/inmem.go index c9f949bc9..2c8e1e601 100644 --- a/pubsub/inmem.go +++ b/pubsub/inmem.go @@ -37,11 +37,11 @@ type InMemory struct { // NewInMemory create an instance of memory pubsub implementation. 
func NewInMemory(options ...Option) *InMemory { config := Config{ - app: "app", - namespace: "default", - healthInterval: 3 * time.Second, - sendTimeout: 60, - channelSize: 100, + App: "app", + Namespace: "default", + HealthInterval: 3 * time.Second, + SendTimeout: 60, + ChannelSize: 100, } for _, f := range options { @@ -65,10 +65,10 @@ func (r *InMemory) Subscribe( config := SubscribeConfig{ topics: make([]string, 0, 8), - app: r.config.app, - namespace: r.config.namespace, - sendTimeout: r.config.sendTimeout, - channelSize: r.config.channelSize, + app: r.config.App, + namespace: r.config.Namespace, + sendTimeout: r.config.SendTimeout, + channelSize: r.config.ChannelSize, } for _, f := range options { @@ -100,8 +100,8 @@ func (r *InMemory) Publish(ctx context.Context, topic string, payload []byte, op return nil } pubConfig := PublishConfig{ - app: r.config.app, - namespace: r.config.namespace, + app: r.config.App, + namespace: r.config.Namespace, } for _, f := range opts { f.Apply(&pubConfig) diff --git a/pubsub/options.go b/pubsub/options.go index af2c13adb..970036eed 100644 --- a/pubsub/options.go +++ b/pubsub/options.go @@ -34,14 +34,14 @@ func (f OptionFunc) Apply(config *Config) { // WithApp returns an option that set config app name. func WithApp(value string) Option { return OptionFunc(func(m *Config) { - m.app = value + m.App = value }) } // WithNamespace returns an option that set config namespace. func WithNamespace(value string) Option { return OptionFunc(func(m *Config) { - m.namespace = value + m.Namespace = value }) } @@ -51,7 +51,7 @@ func WithNamespace(value string) Option { // To disable health check, use zero interval. func WithHealthCheckInterval(value time.Duration) Option { return OptionFunc(func(m *Config) { - m.healthInterval = value + m.HealthInterval = value }) } @@ -59,7 +59,7 @@ func WithHealthCheckInterval(value time.Duration) Option { // the message is dropped. 
func WithSendTimeout(value time.Duration) Option { return OptionFunc(func(m *Config) { - m.sendTimeout = value + m.SendTimeout = value }) } @@ -67,7 +67,7 @@ func WithSendTimeout(value time.Duration) Option { // incoming messages. func WithSize(value int) Option { return OptionFunc(func(m *Config) { - m.channelSize = value + m.ChannelSize = value }) } diff --git a/pubsub/redis.go b/pubsub/redis.go index 7a482a9dd..15514a96c 100644 --- a/pubsub/redis.go +++ b/pubsub/redis.go @@ -35,11 +35,11 @@ type Redis struct { // NewRedis create an instance of redis PubSub implementation. func NewRedis(client redis.UniversalClient, options ...Option) *Redis { config := Config{ - app: "app", - namespace: "default", - healthInterval: 3 * time.Second, - sendTimeout: 60, - channelSize: 100, + App: "app", + Namespace: "default", + HealthInterval: 3 * time.Second, + SendTimeout: 60, + ChannelSize: 100, } for _, f := range options { @@ -64,11 +64,11 @@ func (r *Redis) Subscribe( config := SubscribeConfig{ topics: make([]string, 0, 8), - app: r.config.app, - namespace: r.config.namespace, - healthInterval: r.config.healthInterval, - sendTimeout: r.config.sendTimeout, - channelSize: r.config.channelSize, + app: r.config.App, + namespace: r.config.Namespace, + healthInterval: r.config.HealthInterval, + sendTimeout: r.config.SendTimeout, + channelSize: r.config.ChannelSize, } for _, f := range options { @@ -98,8 +98,8 @@ func (r *Redis) Subscribe( // Publish event topic to message broker with payload. 
func (r *Redis) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error { pubConfig := PublishConfig{ - app: r.config.app, - namespace: r.config.namespace, + app: r.config.App, + namespace: r.config.Namespace, } for _, f := range opts { f.Apply(&pubConfig) diff --git a/pubsub/wire.go b/pubsub/wire.go index 6cbdfdcce..f9c9ba5d3 100644 --- a/pubsub/wire.go +++ b/pubsub/wire.go @@ -15,47 +15,33 @@ package pubsub import ( - "github.com/harness/gitness/types" - "github.com/go-redis/redis/v8" "github.com/google/wire" ) var WireSet = wire.NewSet( - ProvideConfig, ProvidePubSub, ) -func ProvideConfig(config *types.Config) Config { - return Config{ - app: config.PubSub.AppNamespace, - namespace: config.PubSub.DefaultNamespace, - provider: Provider(config.PubSub.Provider), - healthInterval: config.PubSub.HealthInterval, - sendTimeout: config.PubSub.SendTimeout, - channelSize: config.PubSub.ChannelSize, - } -} - func ProvidePubSub(config Config, client redis.UniversalClient) PubSub { - switch config.provider { + switch config.Provider { case ProviderRedis: return NewRedis(client, - WithApp(config.app), - WithNamespace(config.namespace), - WithHealthCheckInterval(config.healthInterval), - WithSendTimeout(config.sendTimeout), - WithSize(config.channelSize), + WithApp(config.App), + WithNamespace(config.Namespace), + WithHealthCheckInterval(config.HealthInterval), + WithSendTimeout(config.SendTimeout), + WithSize(config.ChannelSize), ) case ProviderMemory: fallthrough default: return NewInMemory( - WithApp(config.app), - WithNamespace(config.namespace), - WithHealthCheckInterval(config.healthInterval), - WithSendTimeout(config.sendTimeout), - WithSize(config.channelSize), + WithApp(config.App), + WithNamespace(config.Namespace), + WithHealthCheckInterval(config.HealthInterval), + WithSendTimeout(config.SendTimeout), + WithSize(config.ChannelSize), ) } } diff --git a/types/config.go b/types/config.go index dcf6a2daf..8f4053b91 100644 --- 
a/types/config.go +++ b/types/config.go @@ -16,6 +16,10 @@ package types import ( "time" + + "github.com/harness/gitness/events" + "github.com/harness/gitness/lock" + "github.com/harness/gitness/pubsub" ) // Config stores the system configuration. @@ -197,9 +201,16 @@ type Config struct { SentinelEndpoint string `envconfig:"GITNESS_REDIS_SENTINEL_ENDPOINT"` } + Events struct { + Mode events.Mode `envconfig:"GITNESS_EVENTS_MODE" default:"inmemory"` + Namespace string `envconfig:"GITNESS_EVENTS_NAMESPACE" default:"gitness"` + MaxStreamLength int64 `envconfig:"GITNESS_EVENTS_MAX_STREAM_LENGTH" default:"10000"` + ApproxMaxStreamLength bool `envconfig:"GITNESS_EVENTS_APPROX_MAX_STREAM_LENGTH" default:"true"` + } + Lock struct { // Provider is a name of distributed lock service like redis, memory, file etc... - Provider string `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"` + Provider lock.Provider `envconfig:"GITNESS_LOCK_PROVIDER" default:"inmemory"` Expiry time.Duration `envconfig:"GITNESS_LOCK_EXPIRE" default:"8s"` Tries int `envconfig:"GITNESS_LOCK_TRIES" default:"32"` RetryDelay time.Duration `envconfig:"GITNESS_LOCK_RETRY_DELAY" default:"250ms"` @@ -213,7 +224,7 @@ type Config struct { PubSub struct { // Provider is a name of distributed lock service like redis, memory, file etc... - Provider string `envconfig:"GITNESS_PUBSUB_PROVIDER" default:"inmemory"` + Provider pubsub.Provider `envconfig:"GITNESS_PUBSUB_PROVIDER" default:"inmemory"` // AppNamespace is just service app prefix to avoid conflicts on channel definition AppNamespace string `envconfig:"GITNESS_PUBSUB_APP_NAMESPACE" default:"gitness"` // DefaultNamespace is custom namespace for their channels @@ -227,9 +238,9 @@ type Config struct { // MaxRunning is maximum number of jobs that can be running at once. 
MaxRunning int `envconfig:"GITNESS_JOBS_MAX_RUNNING" default:"10"` - // PurgeFinishedOlderThan is duration after non-recurring, + // RetentionTime is the duration after which non-recurring, // finished and failed jobs will be purged from the DB. - PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"` + RetentionTime time.Duration `envconfig:"GITNESS_JOBS_RETENTION_TIME" default:"120h"` // 5 days } Webhook struct { @@ -243,6 +254,8 @@ type Config struct { MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"` AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"` AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"` + // RetentionTime is the duration after which webhook executions will be purged from the DB. + RetentionTime time.Duration `envconfig:"GITNESS_WEBHOOK_RETENTION_TIME" default:"168h"` // 7 days } Trigger struct {