From d708de0730fc4475c289da915f260c8246f8550f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Tue, 18 Mar 2025 11:31:51 +0000 Subject: [PATCH] fix: [CODE-2947]: add posthog (#3528) * changed the object-verb separator char * change env var name for the PH API key * add events for repo import and migrate * rename pr "open" to "create"" * resolve pr comments * add posthog --- app/api/controller/githook/post_receive.go | 10 +- app/api/controller/migrate/controller.go | 4 + app/api/controller/migrate/create_repo.go | 9 + app/api/controller/migrate/wire.go | 3 + app/api/controller/repo/controller.go | 7 + app/api/controller/repo/create.go | 8 + app/api/controller/repo/default_branch.go | 8 +- app/api/controller/repo/purge.go | 3 +- app/api/controller/repo/soft_delete.go | 9 + app/api/controller/repo/update.go | 15 +- .../controller/repo/update_public_access.go | 7 + app/events/repo/events.go | 20 ++ app/events/repo/events_repo.go | 136 +++++++- app/services/importer/repository.go | 10 + app/services/importer/wire.go | 3 + .../metric/{metrics.go => collector_job.go} | 92 ++++-- app/services/metric/common.go | 68 ++++ app/services/metric/event_handlers.go | 299 ++++++++++++++++++ app/services/metric/posthog.go | 241 ++++++++++++++ app/services/metric/values.go | 65 ++++ app/services/metric/wire.go | 88 ++++-- app/services/wire.go | 4 +- cmd/gitness/wire_gen.go | 38 ++- go.mod | 1 + go.sum | 2 + types/config.go | 8 + 26 files changed, 1065 insertions(+), 93 deletions(-) create mode 100644 app/events/repo/events.go rename app/services/metric/{metrics.go => collector_job.go} (73%) create mode 100644 app/services/metric/common.go create mode 100644 app/services/metric/event_handlers.go create mode 100644 app/services/metric/posthog.go create mode 100644 app/services/metric/values.go diff --git a/app/api/controller/githook/post_receive.go b/app/api/controller/githook/post_receive.go index 333fa168a..ca07fadeb 100644 --- a/app/api/controller/githook/post_receive.go +++ b/app/api/controller/githook/post_receive.go @@ -343,10 +343,12 @@ func (c *Controller) handleEmptyRepoPush( if repo.DefaultBranch != oldName { c.repoReporter.DefaultBranchUpdated(ctx, &repoevents.DefaultBranchUpdatedPayload{ - RepoID: repo.ID, - PrincipalID: bootstrap.NewSystemServiceSession().Principal.ID, - OldName: oldName, - NewName: repo.DefaultBranch, + Base: repoevents.Base{ + RepoID: repo.ID, + PrincipalID: bootstrap.NewSystemServiceSession().Principal.ID, + }, + OldName: oldName, + NewName: repo.DefaultBranch, }) } } diff --git a/app/api/controller/migrate/controller.go b/app/api/controller/migrate/controller.go index 14f237c23..0fd0323ae 100644 --- a/app/api/controller/migrate/controller.go +++ b/app/api/controller/migrate/controller.go @@ -23,6 +23,7 @@ import ( "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" "github.com/harness/gitness/app/auth/authz" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/services/migrate" "github.com/harness/gitness/app/services/publicaccess" "github.com/harness/gitness/app/services/refcache" @@ -53,6 +54,7 @@ type Controller struct { repoStore store.RepoStore spaceFinder refcache.SpaceFinder repoFinder refcache.RepoFinder + eventReporter *repoevents.Reporter } func NewController( @@ -72,6 +74,7 @@ func NewController( repoStore store.RepoStore, spaceFinder refcache.SpaceFinder, repoFinder refcache.RepoFinder, + eventReporter *repoevents.Reporter, ) *Controller { return &Controller{ authorizer: authorizer, @@ -90,6 +93,7 @@ func NewController( repoStore: repoStore, spaceFinder: spaceFinder, repoFinder: repoFinder, + eventReporter: eventReporter, } } diff --git a/app/api/controller/migrate/create_repo.go b/app/api/controller/migrate/create_repo.go index 1c71a18fc..b9ef52375 100644 --- a/app/api/controller/migrate/create_repo.go +++ b/app/api/controller/migrate/create_repo.go @@ -25,6 +25,7 @@ import ( repoCtrl "github.com/harness/gitness/app/api/controller/repo" "github.com/harness/gitness/app/auth" "github.com/harness/gitness/app/bootstrap" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/githook" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/audit" @@ -167,6 +168,14 @@ func (c *Controller) CreateRepo( log.Warn().Msgf("failed to insert audit log for import repository operation: %s", err) } + c.eventReporter.Created(ctx, &repoevents.CreatedPayload{ + Base: repoevents.Base{ + RepoID: repo.ID, + PrincipalID: session.Principal.ID, + }, + Type: "migrated", + }) + return &repoCtrl.RepositoryOutput{ Repository: *repo, IsPublic: isRepoPublic, diff --git a/app/api/controller/migrate/wire.go b/app/api/controller/migrate/wire.go index 8f2db8899..5120aa0bf 100644 --- a/app/api/controller/migrate/wire.go +++ b/app/api/controller/migrate/wire.go @@ -17,6 +17,7 @@ package migrate import ( "github.com/harness/gitness/app/api/controller/limiter" "github.com/harness/gitness/app/auth/authz" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/services/migrate" "github.com/harness/gitness/app/services/publicaccess" "github.com/harness/gitness/app/services/refcache" @@ -52,6 +53,7 @@ func ProvideController( repoStore store.RepoStore, spaceFinder refcache.SpaceFinder, repoFinder refcache.RepoFinder, + eventReporter *repoevents.Reporter, ) *Controller { return NewController( authorizer, @@ -70,5 +72,6 @@ func ProvideController( repoStore, spaceFinder, repoFinder, + eventReporter, ) } diff --git a/app/api/controller/repo/controller.go b/app/api/controller/repo/controller.go index a07dba387..3b1622f6a 100644 --- a/app/api/controller/repo/controller.go +++ b/app/api/controller/repo/controller.go @@ -247,6 +247,13 @@ func ValidateParentRef(parentRef string) error { return nil } +func eventBase(repo *types.RepositoryCore, principal *types.Principal) repoevents.Base { + return repoevents.Base{ + RepoID: repo.ID, + PrincipalID: principal.ID, + } +} + func (c *Controller) fetchRules( ctx context.Context, session *auth.Session, diff --git a/app/api/controller/repo/create.go b/app/api/controller/repo/create.go index 7b3a466a4..2e6839554 100644 --- a/app/api/controller/repo/create.go +++ b/app/api/controller/repo/create.go @@ -26,6 +26,7 @@ import ( "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" "github.com/harness/gitness/app/bootstrap" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/githook" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/app/services/instrument" @@ -165,6 +166,7 @@ func (c *Controller) Create(ctx context.Context, session *auth.Session, in *Crea if err != nil { log.Ctx(ctx).Warn().Msgf("failed to insert audit log for create repository operation: %s", err) } + err = c.instrumentation.Track(ctx, instrument.Event{ Type: instrument.EventTypeRepositoryCreate, Principal: session.Principal.ToPrincipalInfo(), @@ -178,6 +180,12 @@ func (c *Controller) Create(ctx context.Context, session *auth.Session, in *Crea if err != nil { log.Ctx(ctx).Warn().Msgf("failed to insert instrumentation record for create repository operation: %s", err) } + + c.eventReporter.Created(ctx, &repoevents.CreatedPayload{ + Base: eventBase(repo.Core(), &session.Principal), + Type: "created", + }) + // index repository if files are created if !repo.IsEmpty { err = c.indexer.Index(ctx, repo) diff --git a/app/api/controller/repo/default_branch.go b/app/api/controller/repo/default_branch.go index 0a3fbef19..c0a19560f 100644 --- a/app/api/controller/repo/default_branch.go +++ b/app/api/controller/repo/default_branch.go @@ -21,7 +21,6 @@ import ( "github.com/harness/gitness/app/api/controller" "github.com/harness/gitness/app/auth" - "github.com/harness/gitness/app/bootstrap" repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/audit" @@ -129,10 +128,9 @@ func (c *Controller) UpdateDefaultBranch( } c.eventReporter.DefaultBranchUpdated(ctx, &repoevents.DefaultBranchUpdatedPayload{ - RepoID: repo.ID, - PrincipalID: bootstrap.NewSystemServiceSession().Principal.ID, - OldName: oldName, - NewName: repo.DefaultBranch, + Base: eventBase(repo, &session.Principal), + OldName: oldName, + NewName: repoFull.DefaultBranch, }) return repoOutput, nil diff --git a/app/api/controller/repo/purge.go b/app/api/controller/repo/purge.go index 34140009b..be05edf65 100644 --- a/app/api/controller/repo/purge.go +++ b/app/api/controller/repo/purge.go @@ -21,6 +21,7 @@ import ( apiauth "github.com/harness/gitness/app/api/auth" "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" + "github.com/harness/gitness/app/bootstrap" repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/githook" "github.com/harness/gitness/errors" @@ -87,7 +88,7 @@ func (c *Controller) PurgeNoAuth( c.eventReporter.Deleted( ctx, &repoevents.DeletedPayload{ - RepoID: repo.ID, + Base: eventBase(repo.Core(), &bootstrap.NewSystemServiceSession().Principal), }, ) diff --git a/app/api/controller/repo/soft_delete.go b/app/api/controller/repo/soft_delete.go index af0b78dba..90f2a405a 100644 --- a/app/api/controller/repo/soft_delete.go +++ b/app/api/controller/repo/soft_delete.go @@ -22,6 +22,7 @@ import ( apiauth "github.com/harness/gitness/app/api/auth" "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/audit" "github.com/harness/gitness/types" @@ -116,5 +117,13 @@ func (c *Controller) SoftDeleteNoAuth( c.repoFinder.MarkChanged(ctx, repo.Core()) + if repo.Deleted != nil { + c.eventReporter.SoftDeleted(ctx, &repoevents.SoftDeletedPayload{ + Base: eventBase(repo.Core(), &session.Principal), + RepoPath: repo.Path, + Deleted: *repo.Deleted, + }) + } + return nil } diff --git a/app/api/controller/repo/update.go b/app/api/controller/repo/update.go index 50ba164ae..b2714c83b 100644 --- a/app/api/controller/repo/update.go +++ b/app/api/controller/repo/update.go @@ -22,6 +22,7 @@ import ( apiauth "github.com/harness/gitness/app/api/auth" "github.com/harness/gitness/app/api/usererror" "github.com/harness/gitness/app/auth" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/audit" "github.com/harness/gitness/types" @@ -88,8 +89,6 @@ func (c *Controller) Update(ctx context.Context, return nil, fmt.Errorf("failed to find repository by ID: %w", err) } - repoClone := repo.Clone() - if !in.hasChanges(repo) { return GetRepoOutput(ctx, c.publicAccess, repo) } @@ -108,7 +107,11 @@ func (c *Controller) Update(ctx context.Context, repo.State, *in.State) } + var repoClone types.Repository + repo, err = c.repoStore.UpdateOptLock(ctx, repo, func(repo *types.Repository) error { + repoClone = *repo + // update values only if provided if in.Description != nil { repo.Description = *in.Description @@ -141,6 +144,14 @@ func (c *Controller) Update(ctx context.Context, repo.GitURL = c.urlProvider.GenerateGITCloneURL(ctx, repo.Path) repo.GitSSHURL = c.urlProvider.GenerateGITCloneSSHURL(ctx, repo.Path) + if repo.State != repoClone.State { + c.eventReporter.StateChanged(ctx, &repoevents.StateChangedPayload{ + Base: eventBase(repo.Core(), &session.Principal), + OldState: repoClone.State, + NewState: repo.State, + }) + } + return GetRepoOutput(ctx, c.publicAccess, repo) } diff --git a/app/api/controller/repo/update_public_access.go b/app/api/controller/repo/update_public_access.go index ce0f98154..8613c2fc5 100644 --- a/app/api/controller/repo/update_public_access.go +++ b/app/api/controller/repo/update_public_access.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/harness/gitness/app/auth" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/audit" "github.com/harness/gitness/types/enum" @@ -98,5 +99,11 @@ func (c *Controller) UpdatePublicAccess(ctx context.Context, log.Ctx(ctx).Warn().Msgf("failed to insert audit log for update repository operation: %s", err) } + c.eventReporter.PublicAccessChanged(ctx, &repoevents.PublicAccessChangedPayload{ + Base: eventBase(repo.Core(), &session.Principal), + OldIsPublic: isPublic, + NewIsPublic: in.IsPublic, + }) + return GetRepoOutputWithAccess(ctx, in.IsPublic, repo), nil } diff --git a/app/events/repo/events.go b/app/events/repo/events.go new file mode 100644 index 000000000..8ecf25461 --- /dev/null +++ b/app/events/repo/events.go @@ -0,0 +1,20 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +type Base struct { + RepoID int64 `json:"repo_id"` + PrincipalID int64 `json:"principal_id"` +} diff --git a/app/events/repo/events_repo.go b/app/events/repo/events_repo.go index 9321f7075..3e66f259e 100644 --- a/app/events/repo/events_repo.go +++ b/app/events/repo/events_repo.go @@ -18,14 +18,129 @@ import ( "context" "github.com/harness/gitness/events" + "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" ) +const CreatedEvent events.EventType = "created" + +type CreatedPayload struct { + Base + Type string `json:"type"` +} + +func (r *Reporter) Created(ctx context.Context, payload *CreatedPayload) { + if payload == nil { + return + } + + eventID, err := events.ReporterSendEvent(r.innerReporter, ctx, CreatedEvent, payload) + if err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to send repo created event") + return + } + + log.Ctx(ctx).Debug().Msgf("reported repo created event with id '%s'", eventID) +} + +func (r *Reader) RegisterCreated( + fn events.HandlerFunc[*CreatedPayload], + opts ...events.HandlerOption, +) error { + return events.ReaderRegisterEvent(r.innerReader, CreatedEvent, fn, opts...) +} + +const StateChangedEvent events.EventType = "state-changed" + +type StateChangedPayload struct { + Base + OldState enum.RepoState `json:"old_state"` + NewState enum.RepoState `json:"new_state"` +} + +func (r *Reporter) StateChanged(ctx context.Context, payload *StateChangedPayload) { + if payload == nil { + return + } + + eventID, err := events.ReporterSendEvent(r.innerReporter, ctx, StateChangedEvent, payload) + if err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to send repo srtate change event") + return + } + + log.Ctx(ctx).Debug().Msgf("reported repo state change event with id '%s'", eventID) +} + +func (r *Reader) RegisterStateChanged( + fn events.HandlerFunc[*StateChangedPayload], + opts ...events.HandlerOption, +) error { + return events.ReaderRegisterEvent(r.innerReader, StateChangedEvent, fn, opts...) +} + +const PublicAccessChangedEvent events.EventType = "public-access-changed" + +type PublicAccessChangedPayload struct { + Base + OldIsPublic bool `json:"old_is_public"` + NewIsPublic bool `json:"new_is_public"` +} + +func (r *Reporter) PublicAccessChanged(ctx context.Context, payload *PublicAccessChangedPayload) { + if payload == nil { + return + } + + eventID, err := events.ReporterSendEvent(r.innerReporter, ctx, PublicAccessChangedEvent, payload) + if err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to send repo public access changed event") + return + } + + log.Ctx(ctx).Debug().Msgf("reported repo public access changed event with id '%s'", eventID) +} + +func (r *Reader) RegisterPublicAccessChanged( + fn events.HandlerFunc[*PublicAccessChangedPayload], + opts ...events.HandlerOption, +) error { + return events.ReaderRegisterEvent(r.innerReader, PublicAccessChangedEvent, fn, opts...) +} + +const SoftDeletedEvent events.EventType = "soft-deleted" + +type SoftDeletedPayload struct { + Base + RepoPath string `json:"repo_path"` + Deleted int64 `json:"deleted"` +} + +func (r *Reporter) SoftDeleted(ctx context.Context, payload *SoftDeletedPayload) { + if payload == nil { + return + } + eventID, err := events.ReporterSendEvent(r.innerReporter, ctx, SoftDeletedEvent, payload) + if err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to send repo soft deleted event") + return + } + + log.Ctx(ctx).Debug().Msgf("reported repo soft deleted event with id '%s'", eventID) +} + +func (r *Reader) RegisterSoftDeleted( + fn events.HandlerFunc[*SoftDeletedPayload], + opts ...events.HandlerOption, +) error { + return events.ReaderRegisterEvent(r.innerReader, SoftDeletedEvent, fn, opts...) +} + const DeletedEvent events.EventType = "deleted" type DeletedPayload struct { - RepoID int64 `json:"repo_id"` + Base } func (r *Reporter) Deleted(ctx context.Context, payload *DeletedPayload) { @@ -41,18 +156,19 @@ func (r *Reporter) Deleted(ctx context.Context, payload *DeletedPayload) { log.Ctx(ctx).Debug().Msgf("reported repo deleted event with id '%s'", eventID) } -func (r *Reader) RegisterRepoDeleted(fn events.HandlerFunc[*DeletedPayload], - opts ...events.HandlerOption) error { +func (r *Reader) RegisterDeleted( + fn events.HandlerFunc[*DeletedPayload], + opts ...events.HandlerOption, +) error { return events.ReaderRegisterEvent(r.innerReader, DeletedEvent, fn, opts...) } const DefaultBranchUpdatedEvent events.EventType = "default-branch-updated" type DefaultBranchUpdatedPayload struct { - RepoID int64 `json:"repo_id"` - PrincipalID int64 `json:"principal_id"` - OldName string `json:"old_name"` - NewName string `json:"new_name"` + Base + OldName string `json:"old_name"` + NewName string `json:"new_name"` } func (r *Reporter) DefaultBranchUpdated(ctx context.Context, payload *DefaultBranchUpdatedPayload) { @@ -65,7 +181,9 @@ func (r *Reporter) DefaultBranchUpdated(ctx context.Context, payload *DefaultBra log.Ctx(ctx).Debug().Msgf("reported default branch updated event with id '%s'", eventID) } -func (r *Reader) RegisterDefaultBranchUpdated(fn events.HandlerFunc[*DefaultBranchUpdatedPayload], - opts ...events.HandlerOption) error { +func (r *Reader) RegisterDefaultBranchUpdated( + fn events.HandlerFunc[*DefaultBranchUpdatedPayload], + opts ...events.HandlerOption, +) error { return events.ReaderRegisterEvent(r.innerReader, DefaultBranchUpdatedEvent, fn, opts...) } diff --git a/app/services/importer/repository.go b/app/services/importer/repository.go index f59fa04d9..665b7ea9e 100644 --- a/app/services/importer/repository.go +++ b/app/services/importer/repository.go @@ -24,6 +24,7 @@ import ( "time" "github.com/harness/gitness/app/bootstrap" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/githook" "github.com/harness/gitness/app/paths" "github.com/harness/gitness/app/services/keywordsearch" @@ -69,6 +70,7 @@ type Repository struct { sseStreamer sse.Streamer indexer keywordsearch.Indexer publicAccess publicaccess.Service + eventReporter *repoevents.Reporter auditService audit.Service } @@ -370,6 +372,14 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo r.sseStreamer.Publish(ctx, repo.ParentID, enum.SSETypeRepositoryImportCompleted, repo) + r.eventReporter.Created(ctx, &repoevents.CreatedPayload{ + Base: repoevents.Base{ + RepoID: repo.ID, + PrincipalID: bootstrap.NewSystemServiceSession().Principal.ID, + }, + Type: "imported", + }) + err = r.indexer.Index(ctx, repo) if err != nil { log.Warn().Err(err).Msg("failed to index repository") diff --git a/app/services/importer/wire.go b/app/services/importer/wire.go index 1b1d584e1..787ce4a3e 100644 --- a/app/services/importer/wire.go +++ b/app/services/importer/wire.go @@ -15,6 +15,7 @@ package importer import ( + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/services/keywordsearch" "github.com/harness/gitness/app/services/publicaccess" "github.com/harness/gitness/app/services/refcache" @@ -50,6 +51,7 @@ func ProvideRepoImporter( sseStreamer sse.Streamer, indexer keywordsearch.Indexer, publicAccess publicaccess.Service, + eventReporter *repoevents.Reporter, auditService audit.Service, ) (*Repository, error) { importer := &Repository{ @@ -66,6 +68,7 @@ func ProvideRepoImporter( sseStreamer: sseStreamer, indexer: indexer, publicAccess: publicAccess, + eventReporter: eventReporter, auditService: auditService, } diff --git a/app/services/metric/metrics.go b/app/services/metric/collector_job.go similarity index 73% rename from app/services/metric/metrics.go rename to app/services/metric/collector_job.go index 944d920d6..d51fd0aeb 100644 --- a/app/services/metric/metrics.go +++ b/app/services/metric/collector_job.go @@ -22,14 +22,12 @@ import ( "net/http" "time" - "github.com/harness/gitness/app/services/settings" "github.com/harness/gitness/app/store" "github.com/harness/gitness/job" - store2 "github.com/harness/gitness/registry/app/store" + registrystore "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" "github.com/harness/gitness/version" - - "github.com/google/uuid" ) const jobType = "metric-collector" @@ -50,12 +48,10 @@ type metricData struct { ArtifactCount int64 `json:"artifact_count"` } -type Collector struct { - hostname string - enabled bool - endpoint string - token string - installID string +type CollectorJob struct { + values *Values + endpoint string + token string userStore store.PrincipalStore repoStore store.RepoStore @@ -63,29 +59,50 @@ type Collector struct { executionStore store.ExecutionStore scheduler *job.Scheduler gitspaceConfigStore store.GitspaceConfigStore - registryStore store2.RegistryRepository - artifactStore store2.ArtifactRepository - settings *settings.Service + registryStore registrystore.RegistryRepository + artifactStore registrystore.ArtifactRepository + submitter Submitter } -func (c *Collector) Register(ctx context.Context) error { - if !c.enabled { +func NewCollectorJob( + values *Values, + endpoint string, + token string, + userStore store.PrincipalStore, + repoStore store.RepoStore, + pipelineStore store.PipelineStore, + executionStore store.ExecutionStore, + scheduler *job.Scheduler, + gitspaceConfigStore store.GitspaceConfigStore, + registryStore registrystore.RegistryRepository, + artifactStore registrystore.ArtifactRepository, + submitter Submitter, +) *CollectorJob { + return &CollectorJob{ + values: values, + + endpoint: endpoint, + token: token, + + userStore: userStore, + repoStore: repoStore, + pipelineStore: pipelineStore, + executionStore: executionStore, + scheduler: scheduler, + gitspaceConfigStore: gitspaceConfigStore, + registryStore: registryStore, + artifactStore: artifactStore, + + submitter: submitter, + } +} + +func (c *CollectorJob) Register(ctx context.Context) error { + if !c.values.Enabled { return nil } - ok, err := c.settings.SystemGet(ctx, settings.KeyInstallID, &c.installID) - if err != nil { - return fmt.Errorf("failed to find install id: %w", err) - } - if !ok || c.installID == "" { - c.installID = uuid.New().String() - err = c.settings.SystemSet(ctx, settings.KeyInstallID, c.installID) - if err != nil { - return fmt.Errorf("failed to update system settings: %w", err) - } - } - - err = c.scheduler.AddRecurring(ctx, jobType, jobType, "0 0 * * *", time.Minute) + err := c.scheduler.AddRecurring(ctx, jobType, jobType, "0 0 * * *", time.Minute) if err != nil { return fmt.Errorf("failed to register recurring job for collector: %w", err) } @@ -93,15 +110,22 @@ func (c *Collector) Register(ctx context.Context) error { return nil } -func (c *Collector) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) { - if !c.enabled { +func (c *CollectorJob) Handle(ctx context.Context, _ string, _ job.ProgressReporter) (string, error) { + if !c.values.Enabled { return "", nil } + err := c.submitter.SubmitGroups(ctx) + if err != nil { + return "", fmt.Errorf("failed to submit metric groups: %w", err) + } + // get first available user users, err := c.userStore.ListUsers(ctx, &types.UserFilter{ - Page: 1, - Size: 1, + Page: 1, + Size: 1, + Sort: enum.UserAttrCreated, + Order: enum.OrderAsc, }) if err != nil { return "", err @@ -151,8 +175,8 @@ func (c *Collector) Handle(ctx context.Context, _ string, _ job.ProgressReporter } data := metricData{ - Hostname: c.hostname, - InstallID: c.installID, + Hostname: c.values.Hostname, + InstallID: c.values.InstallID, Installer: users[0].Email, Installed: time.UnixMilli(users[0].Created).Format("2006-01-02 15:04:05"), Version: version.Version.String(), diff --git a/app/services/metric/common.go b/app/services/metric/common.go new file mode 100644 index 000000000..80e6fe976 --- /dev/null +++ b/app/services/metric/common.go @@ -0,0 +1,68 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "context" + + "github.com/harness/gitness/types" +) + +type Object string + +const ( + ObjectRepository Object = "repository" + ObjectPullRequest Object = "pull_request" +) + +type VerbRepo string + +// Repository verbs. +const ( + VerbRepoCreate VerbRepo = "create" + VerbRepoUpdate VerbRepo = "update" + VerbRepoDelete VerbRepo = "delete" +) + +type VerbPullReq string + +// Pull request verbs. +const ( + VerbPullReqCreate VerbPullReq = "create" + VerbPullReqMerge VerbPullReq = "merge" + VerbPullReqClose VerbPullReq = "close" + VerbPullReqReopen VerbPullReq = "reopen" +) + +type Submitter interface { + // SubmitGroups should be called once a day to update info about all the groups. + SubmitGroups(ctx context.Context) error + + // SubmitForRepo submits an event for a repository. + SubmitForRepo( + ctx context.Context, + user *types.PrincipalInfo, + verb VerbRepo, + properties map[string]any, + ) error + + // SubmitForPullReq submits an event for a pull request. + SubmitForPullReq( + ctx context.Context, + user *types.PrincipalInfo, + verb VerbPullReq, + properties map[string]any, + ) error +} diff --git a/app/services/metric/event_handlers.go b/app/services/metric/event_handlers.go new file mode 100644 index 000000000..03c1177c3 --- /dev/null +++ b/app/services/metric/event_handlers.go @@ -0,0 +1,299 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "context" + "fmt" + "time" + + pullreqevents "github.com/harness/gitness/app/events/pullreq" + repoevents "github.com/harness/gitness/app/events/repo" + "github.com/harness/gitness/app/services/refcache" + "github.com/harness/gitness/app/store" + "github.com/harness/gitness/events" + "github.com/harness/gitness/stream" + "github.com/harness/gitness/types" +) + +func registerEventListeners( + ctx context.Context, + config *types.Config, + principalInfoCache store.PrincipalInfoCache, + pullReqStore store.PullReqStore, + repoReaderFactory *events.ReaderFactory[*repoevents.Reader], + pullreqEvReaderFactory *events.ReaderFactory[*pullreqevents.Reader], + repoFinder refcache.RepoFinder, + submitter Submitter, +) error { + if submitter == nil { + return nil + } + + var err error + + const groupMetricsRepo = "gitness:metrics:repo" + _, err = repoReaderFactory.Launch(ctx, groupMetricsRepo, config.InstanceID, + func(r *repoevents.Reader) error { + const idleTimeout = 10 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(2), + )) + + h := handlersRepo{ + principalInfoCache: principalInfoCache, + repoFinder: repoFinder, + submitter: submitter, + } + + _ = r.RegisterCreated(h.Create) + _ = r.RegisterDefaultBranchUpdated(h.DefaultBranchUpdate) + _ = r.RegisterStateChanged(h.StateChange) + _ = r.RegisterPublicAccessChanged(h.PublicAccessChange) + _ = r.RegisterSoftDeleted(h.SoftDelete) + + return nil + }) + if err != nil { + return err + } + + const groupMetricsPullReq = "gitness:metrics:pullreq" + _, err = pullreqEvReaderFactory.Launch(ctx, groupMetricsPullReq, config.InstanceID, + func(r *pullreqevents.Reader) error { + const idleTimeout = 10 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(2), + )) + + h := handlersPullReq{ + principalInfoCache: principalInfoCache, + repoFinder: repoFinder, + pullReqStore: pullReqStore, + submitter: submitter, + } + + _ = r.RegisterCreated(h.Create) + _ = r.RegisterReopened(h.Reopen) + _ = r.RegisterClosed(h.Close) + _ = r.RegisterMerged(h.Merge) + + return nil + }) + if err != nil { + return err + } + + return nil +} + +type handlersRepo struct { + principalInfoCache store.PrincipalInfoCache + repoFinder refcache.RepoFinder + submitter Submitter +} + +func (h handlersRepo) Create(ctx context.Context, e *events.Event[*repoevents.CreatedPayload]) error { + props := map[string]any{ + "type": e.Payload.Type, + } + return h.submit(ctx, e.Payload.RepoID, e.Payload.PrincipalID, VerbRepoCreate, props) +} + +func (h handlersRepo) DefaultBranchUpdate( + ctx context.Context, + e *events.Event[*repoevents.DefaultBranchUpdatedPayload], +) error { + props := map[string]any{ + "change": "default_branch", + "repo_old_default_branch": e.Payload.OldName, + "repo_new_default_branch": e.Payload.NewName, + } + return h.submit(ctx, e.Payload.RepoID, e.Payload.PrincipalID, VerbRepoUpdate, props) +} + +func (h handlersRepo) StateChange( + ctx context.Context, + e *events.Event[*repoevents.StateChangedPayload], +) error { + props := map[string]any{ + "change": "state", + "repo_old_state": e.Payload.OldState, + "repo_new_state": e.Payload.NewState, + } + return h.submit(ctx, e.Payload.RepoID, e.Payload.PrincipalID, VerbRepoUpdate, props) +} + +func (h handlersRepo) PublicAccessChange( + ctx context.Context, + e *events.Event[*repoevents.PublicAccessChangedPayload], +) error { + props := map[string]any{ + "change": "public_access", + "repo_old_is_public": e.Payload.OldIsPublic, + "repo_new_is_public": e.Payload.NewIsPublic, + } + return h.submit(ctx, e.Payload.RepoID, e.Payload.PrincipalID, VerbRepoUpdate, props) +} + +func (h handlersRepo) SoftDelete( + ctx context.Context, + e *events.Event[*repoevents.SoftDeletedPayload], +) error { + return h.submitDeleted(ctx, e.Payload.RepoPath, e.Payload.Deleted, e.Payload.PrincipalID, VerbRepoDelete, nil) +} + +func (h handlersRepo) submit( + ctx context.Context, + repoID int64, + principalID int64, + verb VerbRepo, + props map[string]any, +) error { + repo, err := h.repoFinder.FindByID(ctx, repoID) + if err != nil { + return fmt.Errorf("failed to find repository") + } + + err = h.submitMetric(ctx, repo, principalID, verb, props) + if err != nil { + return fmt.Errorf("failed to submit event: %w", err) + } + + return nil +} + +func (h handlersRepo) submitDeleted( + ctx context.Context, + repoRef string, + deletedAt int64, + principalID int64, + verb VerbRepo, + props map[string]any, +) error { + repo, err := h.repoFinder.FindDeletedByRef(ctx, repoRef, deletedAt) + if err != nil { + return fmt.Errorf("failed to find delete repo: %w", err) + } + + err = h.submitMetric(ctx, repo.Core(), principalID, verb, props) + if err != nil { + return fmt.Errorf("failed to submit event: %w", err) + } + + return nil +} + +func (h handlersRepo) submitMetric( + ctx context.Context, + repo *types.RepositoryCore, + principalID int64, + verb VerbRepo, + props map[string]any, +) error { + principal, err := h.principalInfoCache.Get(ctx, principalID) + if err != nil { + return fmt.Errorf("failed to get principal info: %w", err) + } + + if props == nil { + props = make(map[string]any) + } + + props["repo_id"] = repo.ID + props["repo_path"] = repo.Path + props["repo_parent_id"] = repo.ParentID + + err = h.submitter.SubmitForRepo(ctx, principal, verb, props) + if err != nil { + return fmt.Errorf("failed to submit metric data for repositoy: %w", err) + } + + return nil +} + +type handlersPullReq struct { + principalInfoCache store.PrincipalInfoCache + repoFinder refcache.RepoFinder + pullReqStore store.PullReqStore + submitter Submitter +} + +func (h handlersPullReq) Create(ctx context.Context, e *events.Event[*pullreqevents.CreatedPayload]) error { + return h.submit(ctx, e.Payload.PullReqID, e.Payload.PrincipalID, VerbPullReqCreate) +} + +func (h handlersPullReq) Close(ctx context.Context, e *events.Event[*pullreqevents.ClosedPayload]) error { + return h.submit(ctx, e.Payload.PullReqID, e.Payload.PrincipalID, VerbPullReqClose) +} + +func (h handlersPullReq) Reopen(ctx context.Context, e *events.Event[*pullreqevents.ReopenedPayload]) error { + return h.submit(ctx, e.Payload.PullReqID, e.Payload.PrincipalID, VerbPullReqReopen) +} + +func (h handlersPullReq) Merge(ctx context.Context, e *events.Event[*pullreqevents.MergedPayload]) error { + return h.submit(ctx, e.Payload.PullReqID, e.Payload.PrincipalID, VerbPullReqMerge) +} + +func (h handlersPullReq) submit(ctx context.Context, pullReqID, principalID int64, verb VerbPullReq) error { + pr, err := h.pullReqStore.Find(ctx, pullReqID) + if err != nil { + return fmt.Errorf("failed to find pull request: %w", err) + } + + repo, err := h.repoFinder.FindByID(ctx, pr.TargetRepoID) + if err != nil { + return fmt.Errorf("failed to find pull request: %w", err) + } + + principal, err := h.principalInfoCache.Get(ctx, principalID) + if err != nil { + return fmt.Errorf("failed to get principal info: %w", err) + } + + author, err := h.principalInfoCache.Get(ctx, pr.CreatedBy) + if err != nil { + return fmt.Errorf("failed to get author principal info: %w", err) + } + + props := map[string]any{ + "repo_id": repo.ID, + "repo_path": repo.Path, + "repo_parent_id": repo.ParentID, + "pullreq_author_email": author.Email, + "pullreq_number": pr.Number, + "pullreq_target_repo_id": pr.TargetRepoID, + "pullreq_source_repo_id": pr.SourceRepoID, + "pullreq_target_branch": pr.TargetBranch, + "pullreq_source_branch": pr.SourceBranch, + } + + if pr.MergeMethod != nil { + props["pullreq_merge_method"] = string(*pr.MergeMethod) + } + + err = h.submitter.SubmitForPullReq(ctx, principal, verb, props) + if err != nil { + return fmt.Errorf("failed to submit metric data for pull request: %w", err) + } + + return nil +} diff --git a/app/services/metric/posthog.go b/app/services/metric/posthog.go new file mode 100644 index 000000000..e9fd3750f --- /dev/null +++ b/app/services/metric/posthog.go @@ -0,0 +1,241 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "context" + "fmt" + "time" + + "github.com/harness/gitness/app/store" + "github.com/harness/gitness/errors" + "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" + + "github.com/posthog/posthog-go" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const postHogGroupInstall = "install" +const postHogServerUserID = "harness-server" + +type PostHog struct { + client posthog.Client + installID string + hostname string + principalStore store.PrincipalStore + principalInfoCache store.PrincipalInfoCache +} + +type group struct { + Type string + ID string + Properties map[string]any +} + +func NewPostHog( + ctx context.Context, + config *types.Config, + values *Values, + principalStore store.PrincipalStore, + principalInfoCache store.PrincipalInfoCache, +) (*PostHog, error) { + if !values.Enabled || values.InstallID == "" || config.Metric.PostHogProjectAPIKey == "" { + return nil, nil //nolint:nilnil // PostHog is disabled + } + + logr := log.Ctx(ctx).With().Str("service.name", "posthog").Logger() + + // https://posthog.com/docs/libraries/go#overriding-geoip-properties + + phConfig := posthog.Config{ + Endpoint: config.Metric.PostHogEndpoint, + PersonalApiKey: config.Metric.PostHogPersonalAPIKey, + Logger: &logger{Logger: logr}, + DefaultEventProperties: posthog.NewProperties().Set("install_id", values.InstallID), + Callback: nil, + } + + client, err := posthog.NewWithConfig(config.Metric.PostHogProjectAPIKey, phConfig) + if err != nil { + return nil, fmt.Errorf("failed to create PostHog client: %w", err) + } + + ph := &PostHog{ + client: client, + installID: values.InstallID, + hostname: values.Hostname, + principalStore: principalStore, + principalInfoCache: principalInfoCache, + } + + go ph.submitDefaultGroupOnce(ctx) + + return ph, nil +} + +func (ph *PostHog) SubmitGroups(context.Context) error { + // No implementation + return nil +} + +func (ph *PostHog) SubmitForRepo( + ctx context.Context, + user *types.PrincipalInfo, + verb VerbRepo, + properties map[string]any, +) error { + return ph.submit(ctx, user, ObjectRepository, string(verb), properties) +} + +func (ph *PostHog) SubmitForPullReq( + ctx context.Context, + user *types.PrincipalInfo, + verb VerbPullReq, + properties map[string]any, +) error { + return ph.submit(ctx, user, ObjectPullRequest, string(verb), properties) +} + +func (ph *PostHog) uniqueUserID(id string) string { + return ph.installID + ":" + id +} + +func (ph *PostHog) submit( + _ context.Context, + user *types.PrincipalInfo, + object Object, + verb string, + properties map[string]any, +) error { + if ph == nil { + return nil + } + + var distinctID string + if user != nil { + distinctID = ph.uniqueUserID(user.UID) + + p := posthog.NewProperties().Merge(properties) + p.Set("$set_once", map[string]any{ + "type": user.Type, + "created": user.Created, + }) + p.Set("$set", map[string]any{ + "email": user.Email, + }) + + properties = p + } + + err := ph.client.Enqueue(posthog.Capture{ + DistinctId: distinctID, + Event: string(object) + ":" + verb, + Groups: posthog.NewGroups().Set(postHogGroupInstall, ph.installID), + Properties: properties, + }) + if err != nil { + return fmt.Errorf("failed to enqueue event; object=%s verb=%s: %w", object, verb, err) + } + + return nil +} + +func (ph *PostHog) submitGroup(group group) error { + err := ph.client.Enqueue(posthog.GroupIdentify{ + DistinctId: postHogServerUserID, + Type: group.Type, + Key: group.ID, + Properties: group.Properties, + }) + if err != nil { + return fmt.Errorf("failed to enqueue group identify: %w", err) + } + + return nil +} + +func (ph *PostHog) submitDefaultGroup(ctx context.Context) error { + users, err := ph.principalStore.ListUsers(ctx, &types.UserFilter{ + Page: 1, + Size: 1, + Sort: enum.UserAttrCreated, + Order: enum.OrderAsc, + }) + if err != nil { + return fmt.Errorf("failed to list users: %w", err) + } + + if len(users) == 0 { + return errors.New("no users found") + } + + userFirst := users[0] + + // Note: The PostHog UI identifies a group using the name property. + // If the name property is not found, it falls back to the group key. + // https://posthog.com/docs/product-analytics/group-analytics#how-to-set-group-properties + g := group{ + Type: postHogGroupInstall, + ID: ph.installID, + Properties: posthog.NewProperties(). + Set("name", "install"). + Set("hostname", ph.hostname). + Set("email", userFirst.Email). + Set("created", userFirst.Created), + } + + err = ph.submitGroup(g) + if err != nil { + return fmt.Errorf("failed to submit default group: %w", err) + } + + return nil +} + +func (ph *PostHog) submitDefaultGroupOnce(ctx context.Context) { + timer := time.NewTimer(time.Hour) + defer timer.Stop() + + logr := log.Ctx(ctx).With().Str("service.name", "posthog").Logger() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if err := ph.submitDefaultGroup(ctx); err != nil { + logr.Err(err).Msg("failed to submit default group") + timer.Reset(time.Hour) + continue + } + + return + } + } +} + +type logger struct { + zerolog.Logger +} + +func (l *logger) Logf(format string, args ...interface{}) { + l.Info().Msgf(format, args...) +} + +func (l *logger) Errorf(format string, args ...interface{}) { + l.Error().Msgf(format, args...) +} diff --git a/app/services/metric/values.go b/app/services/metric/values.go new file mode 100644 index 000000000..1dbc4a47c --- /dev/null +++ b/app/services/metric/values.go @@ -0,0 +1,65 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "context" + "fmt" + + "github.com/harness/gitness/app/services/settings" + "github.com/harness/gitness/types" + + "github.com/google/uuid" +) + +type Values struct { + Enabled bool + Hostname string + InstallID string +} + +func NewValues( + ctx context.Context, + config *types.Config, + settingsSrv *settings.Service, +) (*Values, error) { + if !config.Metric.Enabled { + return &Values{ + Enabled: false, + InstallID: "", + }, nil + } + + values := Values{ + Enabled: true, + Hostname: config.InstanceID, + InstallID: "", + } + + ok, err := settingsSrv.SystemGet(ctx, settings.KeyInstallID, &values.InstallID) + if err != nil { + return nil, fmt.Errorf("failed to find install id: %w", err) + } + + if !ok || values.InstallID == "" { + values.InstallID = uuid.New().String() + err = settingsSrv.SystemSet(ctx, settings.KeyInstallID, values.InstallID) + if err != nil { + return nil, fmt.Errorf("failed to update system settings: %w", err) + } + } + + return &values, nil +} diff --git a/app/services/metric/wire.go b/app/services/metric/wire.go index 9facab5f7..650cd5d62 100644 --- a/app/services/metric/wire.go +++ b/app/services/metric/wire.go @@ -15,8 +15,15 @@ package metric import ( + "context" + "fmt" + + pullreqevents "github.com/harness/gitness/app/events/pullreq" + repoevents "github.com/harness/gitness/app/events/repo" + "github.com/harness/gitness/app/services/refcache" "github.com/harness/gitness/app/services/settings" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/events" "github.com/harness/gitness/job" registrystore "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/types" @@ -25,11 +32,51 @@ import ( ) var WireSet = wire.NewSet( - ProvideCollector, + ProvideValues, + ProvideSubmitter, + ProvideCollectorJob, ) -func ProvideCollector( +func ProvideValues(ctx context.Context, config *types.Config, settingsSrv *settings.Service) (*Values, error) { + return NewValues(ctx, config, settingsSrv) +} + +func ProvideSubmitter( + appCtx context.Context, config *types.Config, + values *Values, + principalStore store.PrincipalStore, + principalInfoCache store.PrincipalInfoCache, + pullReqStore store.PullReqStore, + repoReaderFactory *events.ReaderFactory[*repoevents.Reader], + pullreqEvReaderFactory *events.ReaderFactory[*pullreqevents.Reader], + repoFinder refcache.RepoFinder, +) (Submitter, error) { + submitter, err := NewPostHog(appCtx, config, values, principalStore, principalInfoCache) + if err != nil { + return nil, fmt.Errorf("failed to create posthog metrics submitter: %w", err) + } + + err = registerEventListeners( + appCtx, + config, + principalInfoCache, + pullReqStore, + repoReaderFactory, + pullreqEvReaderFactory, + repoFinder, + submitter, + ) + if err != nil { + return nil, fmt.Errorf("failed to register metric event listeners: %w", err) + } + + return submitter, nil +} + +func ProvideCollectorJob( + config *types.Config, + values *Values, userStore store.PrincipalStore, repoStore store.RepoStore, pipelineStore store.PipelineStore, @@ -37,30 +84,29 @@ func ProvideCollector( scheduler *job.Scheduler, executor *job.Executor, gitspaceConfigStore store.GitspaceConfigStore, - settings *settings.Service, registryStore registrystore.RegistryRepository, artifactStore registrystore.ArtifactRepository, -) (*Collector, error) { - job := &Collector{ - hostname: config.InstanceID, - enabled: config.Metric.Enabled, - endpoint: config.Metric.Endpoint, - token: config.Metric.Token, - userStore: userStore, - repoStore: repoStore, - pipelineStore: pipelineStore, - executionStore: executionStore, - scheduler: scheduler, - gitspaceConfigStore: gitspaceConfigStore, - registryStore: registryStore, - artifactStore: artifactStore, - settings: settings, - } + submitter Submitter, +) (*CollectorJob, error) { + collector := NewCollectorJob( + values, + config.Metric.Endpoint, + config.Metric.Token, + userStore, + repoStore, + pipelineStore, + executionStore, + scheduler, + gitspaceConfigStore, + registryStore, + artifactStore, + submitter, + ) - err := executor.Register(jobType, job) + err := executor.Register(jobType, collector) if err != nil { return nil, err } - return job, nil + return collector, nil } diff --git a/app/services/wire.go b/app/services/wire.go index 802234211..5a222d793 100644 --- a/app/services/wire.go +++ b/app/services/wire.go @@ -45,7 +45,7 @@ type Services struct { PullReq *pullreq.Service Trigger *trigger.Service JobScheduler *job.Scheduler - MetricCollector *metric.Collector + MetricCollector *metric.CollectorJob RepoSizeCalculator *repo.SizeCalculator Repo *repo.Service Cleanup *cleanup.Service @@ -90,7 +90,7 @@ func ProvideServices( pullReqSvc *pullreq.Service, triggerSvc *trigger.Service, jobScheduler *job.Scheduler, - metricCollector *metric.Collector, + metricCollector *metric.CollectorJob, repoSizeCalculator *repo.SizeCalculator, repo *repo.Service, cleanupSvc *cleanup.Service, diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 90ab93bd4..84163467e 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -236,14 +236,6 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro streamer := sse.ProvideEventsStreaming(pubSub) localIndexSearcher := keywordsearch.ProvideLocalIndexSearcher() indexer := keywordsearch.ProvideIndexer(localIndexSearcher) - auditService := audit.ProvideAuditService() - repository, err := importer.ProvideRepoImporter(config, provider, gitInterface, transactor, repoStore, pipelineStore, triggerStore, repoFinder, encrypter, jobScheduler, executor, streamer, indexer, publicaccessService, auditService) - if err != nil { - return nil, err - } - codeownersConfig := server.ProvideCodeOwnerConfig(config) - usergroupResolver := usergroup.ProvideUserGroupResolver() - codeownersService := codeowners.ProvideCodeOwners(gitInterface, repoStore, codeownersConfig, principalStore, usergroupResolver) eventsConfig := server.ProvideEventsConfig(config) eventsSystem, err := events.ProvideSystem(eventsConfig, universalClient) if err != nil { @@ -253,6 +245,14 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } + auditService := audit.ProvideAuditService() + repository, err := importer.ProvideRepoImporter(config, provider, gitInterface, transactor, repoStore, pipelineStore, triggerStore, repoFinder, encrypter, jobScheduler, executor, streamer, indexer, publicaccessService, reporter, auditService) + if err != nil { + return nil, err + } + codeownersConfig := server.ProvideCodeOwnerConfig(config) + usergroupResolver := usergroup.ProvideUserGroupResolver() + codeownersService := codeowners.ProvideCodeOwners(gitInterface, repoStore, codeownersConfig, principalStore, usergroupResolver) resourceLimiter, err := limiter.ProvideLimiter() if err != nil { return nil, err @@ -449,7 +449,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro rule := migrate.ProvideRuleImporter(ruleStore, transactor, principalStore) migrateWebhook := migrate.ProvideWebhookImporter(webhookConfig, transactor, webhookStore) migrateLabel := migrate.ProvideLabelImporter(transactor, labelStore, labelValueStore, spaceStore) - migrateController := migrate2.ProvideController(authorizer, publicaccessService, gitInterface, provider, pullReq, rule, migrateWebhook, migrateLabel, resourceLimiter, auditService, repoIdentifier, transactor, spaceStore, repoStore, spaceFinder, repoFinder) + migrateController := migrate2.ProvideController(authorizer, publicaccessService, gitInterface, provider, pullReq, rule, migrateWebhook, migrateLabel, resourceLimiter, auditService, repoIdentifier, transactor, spaceStore, repoStore, spaceFinder, repoFinder, reporter) openapiService := openapi.ProvideOpenAPIService() storageDriver, err := api2.BlobStorageProvider(config) if err != nil { @@ -539,11 +539,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - collector, err := metric.ProvideCollector(config, principalStore, repoStore, pipelineStore, executionStore, jobScheduler, executor, gitspaceConfigStore, settingsService, registryRepository, artifactRepository) - if err != nil { - return nil, err - } - sizeCalculator, err := repo2.ProvideCalculator(config, gitInterface, repoStore, jobScheduler, executor) + values, err := metric.ProvideValues(ctx, config, settingsService) if err != nil { return nil, err } @@ -551,6 +547,18 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } + submitter, err := metric.ProvideSubmitter(ctx, config, values, principalStore, principalInfoCache, pullReqStore, readerFactory3, eventsReaderFactory, repoFinder) + if err != nil { + return nil, err + } + collectorJob, err := metric.ProvideCollectorJob(config, values, principalStore, repoStore, pipelineStore, executionStore, jobScheduler, executor, gitspaceConfigStore, registryRepository, artifactRepository, submitter) + if err != nil { + return nil, err + } + sizeCalculator, err := repo2.ProvideCalculator(config, gitInterface, repoStore, jobScheduler, executor) + if err != nil { + return nil, err + } repoService, err := repo2.ProvideService(ctx, config, reporter, readerFactory3, repoStore, provider, gitInterface, lockerLocker) if err != nil { return nil, err @@ -615,7 +623,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collector, sizeCalculator, repoService, cleanupService, notificationService, keywordsearchService, gitspaceServices, instrumentService, consumer, repositoryCount, service2) + servicesServices := services.ProvideServices(webhookService, pullreqService, triggerService, jobScheduler, collectorJob, sizeCalculator, repoService, cleanupService, notificationService, keywordsearchService, gitspaceServices, instrumentService, consumer, repositoryCount, service2) serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, sshServer, poller, resolverManager, servicesServices) return serverSystem, nil } diff --git a/go.mod b/go.mod index 7f018cb20..d1570ca8d 100644 --- a/go.mod +++ b/go.mod @@ -145,6 +145,7 @@ require ( github.com/onsi/gomega v1.27.10 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect + github.com/posthog/posthog-go v1.3.3 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect diff --git a/go.sum b/go.sum index 608a917e0..2d8709ea4 100644 --- a/go.sum +++ b/go.sum @@ -636,6 +636,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/posthog/posthog-go v1.3.3 h1:0b1JlfPDMKXGRgbTtqkSG6hsrlj8yW2tv2HS6Dn7wFk= +github.com/posthog/posthog-go v1.3.3/go.mod h1:uYC2l1Yktc8E+9FAHJ9QZG4vQf/NHJPD800Hsm7DzoM= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= diff --git a/types/config.go b/types/config.go index 0a27283f0..54611d792 100644 --- a/types/config.go +++ b/types/config.go @@ -352,6 +352,14 @@ type Config struct { Enabled bool `envconfig:"GITNESS_METRIC_ENABLED" default:"true"` Endpoint string `envconfig:"GITNESS_METRIC_ENDPOINT" default:"https://stats.drone.ci/api/v1/gitness"` Token string `envconfig:"GITNESS_METRIC_TOKEN"` + + // PostHogEndpoint is URL to the PostHog service + PostHogEndpoint string `envconfig:"GITNESS_METRIC_POSTHOG_ENDPOINT" default:"https://us.i.posthog.com"` + // PostHogProjectAPIKey (starts with "phc_") is public (can be exposed in frontend) token used to submit events. + PostHogProjectAPIKey string `envconfig:"GITNESS_METRIC_POSTHOG_PROJECT_APIKEY"` + // PostHogPersonalAPIKey (starts with "phx_") is sensitive. It's used to access private access points. + // It's not required for submitting events. + PostHogPersonalAPIKey string `envconfig:"GITNESS_METRIC_POSTHOG_PERSONAL_APIKEY"` } RepoSize struct {