feat: [PIPE-22071]: Enhance pipeline list API to include last 10 execution summaries for a pipeline (#2840)

* Scan directly into types.ExecutionInfo
* Merge remote-tracking branch 'origin/main' into dd/pipeline-execs
* Merge remote-tracking branch 'origin/main' into dd/pipeline-execs
* Add last_executions param and optimize db query for last execs
* Enhance pipeline list API to include last 10 execution summaries for a pipeline
pull/3576/head
Darko Draskovic 2024-10-23 16:17:25 +00:00 committed by Harness
parent fdb41b5b38
commit 0f3956696c
12 changed files with 127 additions and 20 deletions

View File

@ -78,6 +78,7 @@ type Controller struct {
repoStore store.RepoStore
spaceStore store.SpaceStore
pipelineStore store.PipelineStore
executionStore store.ExecutionStore
principalStore store.PrincipalStore
ruleStore store.RuleStore
settings *settings.Service
@ -109,6 +110,7 @@ func NewController(
repoStore store.RepoStore,
spaceStore store.SpaceStore,
pipelineStore store.PipelineStore,
executionStore store.ExecutionStore,
principalStore store.PrincipalStore,
ruleStore store.RuleStore,
settings *settings.Service,
@ -139,6 +141,7 @@ func NewController(
repoStore: repoStore,
spaceStore: spaceStore,
pipelineStore: pipelineStore,
executionStore: executionStore,
principalStore: principalStore,
ruleStore: ruleStore,
settings: settings,

View File

@ -30,8 +30,7 @@ func (c *Controller) ListPipelines(
ctx context.Context,
session *auth.Session,
repoRef string,
latest bool,
filter types.ListQueryFilter,
filter *types.ListPipelinesFilter,
) ([]*types.Pipeline, int64, error) {
repo, err := c.getRepo(ctx, repoRef)
if err != nil {
@ -50,7 +49,7 @@ func (c *Controller) ListPipelines(
return fmt.Errorf("failed to count child executions: %w", err)
}
if !latest {
if !filter.Latest {
pipelines, err = c.pipelineStore.List(ctx, repo.ID, filter)
if err != nil {
return fmt.Errorf("failed to list pipelines: %w", err)
@ -62,7 +61,19 @@ func (c *Controller) ListPipelines(
}
}
return
pipelineIDs := make([]int64, len(pipelines))
for i, pipeline := range pipelines {
pipelineIDs[i] = pipeline.ID
}
execs, err := c.executionStore.ListByPipelineIDs(ctx, pipelineIDs, filter.LastExecutions)
if err != nil {
return fmt.Errorf("failed to list executions by pipeline IDs: %w", err)
}
for _, pipeline := range pipelines {
pipeline.LastExecutions = execs[pipeline.ID]
}
return nil
}, dbtx.TxDefaultReadOnly)
if err != nil {
return pipelines, count, fmt.Errorf("failed to list pipelines: %w", err)

View File

@ -54,6 +54,7 @@ func ProvideController(
spaceStore store.SpaceStore,
pipelineStore store.PipelineStore,
principalStore store.PrincipalStore,
executionStore store.ExecutionStore,
ruleStore store.RuleStore,
settings *settings.Service,
principalInfoCache store.PrincipalInfoCache,
@ -77,7 +78,7 @@ func ProvideController(
) *Controller {
return NewController(config, tx, urlProvider,
authorizer,
repoStore, spaceStore, pipelineStore,
repoStore, spaceStore, pipelineStore, executionStore,
principalStore, ruleStore, settings, principalInfoCache, protectionManager, rpcClient, importer,
codeOwners, reporeporter, indexer, limiter, locker, auditService, mtxManager, identifierCheck,
repoChecks, publicAccess, labelSvc, instrumentation, userGroupStore, userGroupService)

View File

@ -32,9 +32,12 @@ func HandleListPipelines(repoCtrl *repo.Controller) http.HandlerFunc {
return
}
filter := request.ParseListQueryFilterFromRequest(r)
latest := request.GetLatestFromPath(r)
repos, totalCount, err := repoCtrl.ListPipelines(ctx, session, repoRef, latest, filter)
filter, err := request.ParseListPipelinesFilterFromRequest(r)
if err != nil {
render.TranslatedUserError(ctx, w, err)
}
repos, totalCount, err := repoCtrl.ListPipelines(ctx, session, repoRef, &filter)
if err != nil {
render.TranslatedUserError(ctx, w, err)
return

View File

@ -16,11 +16,14 @@ package request
import (
"net/http"
"github.com/harness/gitness/types"
)
const (
PathParamPipelineIdentifier = "pipeline_identifier"
PathParamExecutionNumber = "execution_number"
PathParamLastExecutions = "last_executions"
PathParamStageNumber = "stage_number"
PathParamStepNumber = "step_number"
PathParamTriggerIdentifier = "trigger_identifier"
@ -56,3 +59,19 @@ func GetLatestFromPath(r *http.Request) bool {
func GetTriggerIdentifierFromPath(r *http.Request) (string, error) {
return PathParamOrError(r, PathParamTriggerIdentifier)
}
func ParseListPipelinesFilterFromRequest(r *http.Request) (types.ListPipelinesFilter, error) {
lastExecs, err := QueryParamAsPositiveInt64OrDefault(r, PathParamLastExecutions, 10)
if err != nil {
return types.ListPipelinesFilter{}, err
}
return types.ListPipelinesFilter{
ListQueryFilter: types.ListQueryFilter{
Query: ParseQuery(r),
Pagination: ParsePaginationFromRequest(r),
},
Latest: GetLatestFromPath(r),
LastExecutions: lastExecs,
}, nil
}

View File

@ -127,7 +127,7 @@ func (c *Collector) Handle(ctx context.Context, _ string, _ job.ProgressReporter
}
// total pipelines in the system
totalPipelines, err := c.pipelineStore.Count(ctx, 0, types.ListQueryFilter{})
totalPipelines, err := c.pipelineStore.Count(ctx, 0, &types.ListPipelinesFilter{})
if err != nil {
return "", fmt.Errorf("failed to get pipelines count: %w", err)
}

View File

@ -752,11 +752,11 @@ type (
Update(ctx context.Context, pipeline *types.Pipeline) error
// List lists the pipelines present in a repository in the datastore.
List(ctx context.Context, repoID int64, pagination types.ListQueryFilter) ([]*types.Pipeline, error)
List(ctx context.Context, repoID int64, filter *types.ListPipelinesFilter) ([]*types.Pipeline, error)
// ListLatest lists the pipelines present in a repository in the datastore.
// It also returns latest build information for all the returned entries.
ListLatest(ctx context.Context, repoID int64, pagination types.ListQueryFilter) ([]*types.Pipeline, error)
ListLatest(ctx context.Context, repoID int64, filter *types.ListPipelinesFilter) ([]*types.Pipeline, error)
// UpdateOptLock updates the pipeline using the optimistic locking mechanism.
UpdateOptLock(
@ -768,7 +768,7 @@ type (
Delete(ctx context.Context, id int64) error
// Count the number of pipelines in a repository matching the given filter.
Count(ctx context.Context, repoID int64, filter types.ListQueryFilter) (int64, error)
Count(ctx context.Context, repoID int64, filter *types.ListPipelinesFilter) (int64, error)
// DeleteByIdentifier deletes a pipeline with a given identifier under a repo.
DeleteByIdentifier(ctx context.Context, repoID int64, identifier string) error
@ -834,6 +834,12 @@ type (
// List lists the executions for a given pipeline ID
List(ctx context.Context, pipelineID int64, pagination types.Pagination) ([]*types.Execution, error)
ListByPipelineIDs(
ctx context.Context,
pipelineIDs []int64,
maxRows int64,
) (map[int64][]*types.ExecutionInfo, error)
// Delete deletes an execution given a pipeline ID and an execution number
Delete(ctx context.Context, pipelineID int64, num int64) error

View File

@ -26,6 +26,7 @@ import (
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
sqlxtypes "github.com/jmoiron/sqlx/types"
"github.com/pkg/errors"
@ -334,6 +335,50 @@ func (s *executionStore) List(
return mapInternalToExecutionList(dst)
}
func (s executionStore) ListByPipelineIDs(
ctx context.Context,
pipelineIDs []int64,
maxRows int64,
) (map[int64][]*types.ExecutionInfo, error) {
stmt := database.Builder.
Select("execution_number, execution_pipeline_id, execution_status").
FromSelect(
database.Builder.
Select(`
execution_number, execution_pipeline_id, execution_status,
ROW_NUMBER() OVER (
PARTITION BY execution_pipeline_id
ORDER BY execution_number DESC
) AS row_num
`).
From("executions").
Where(squirrel.Eq{"execution_pipeline_id": pipelineIDs}),
"ranked",
).
Where("row_num <= ?", maxRows)
sql, args, err := stmt.ToSql()
if err != nil {
return nil, errors.Wrap(err, "Failed to convert query to sql")
}
db := dbtx.GetAccessor(ctx, s.db)
var dst []*types.ExecutionInfo
if err = db.SelectContext(ctx, &dst, sql, args...); err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "Failed to list executions by pipeline IDs")
}
executionInfosMap := make(map[int64][]*types.ExecutionInfo)
for _, info := range dst {
executionInfosMap[info.PipelineID] = append(
executionInfosMap[info.PipelineID],
info,
)
}
return executionInfosMap, nil
}
// Count of executions in a pipeline, if pipelineID is 0 then return total number of executions.
func (s *executionStore) Count(ctx context.Context, pipelineID int64) (int64, error) {
stmt := database.Builder.

View File

@ -187,7 +187,7 @@ func (s *pipelineStore) Update(ctx context.Context, p *types.Pipeline) error {
func (s *pipelineStore) List(
ctx context.Context,
repoID int64,
filter types.ListQueryFilter,
filter *types.ListPipelinesFilter,
) ([]*types.Pipeline, error) {
stmt := database.Builder.
Select(pipelineColumns).
@ -290,7 +290,7 @@ func (s *pipelineStore) CountInSpace(
func (s *pipelineStore) ListLatest(
ctx context.Context,
repoID int64,
filter types.ListQueryFilter,
filter *types.ListPipelinesFilter,
) ([]*types.Pipeline, error) {
const pipelineExecutionColumns = pipelineColumns + `
,executions.execution_id
@ -387,7 +387,11 @@ func (s *pipelineStore) UpdateOptLock(ctx context.Context,
}
// Count of pipelines under a repo, if repoID is zero it will count all pipelines in the system.
func (s *pipelineStore) Count(ctx context.Context, repoID int64, filter types.ListQueryFilter) (int64, error) {
func (s *pipelineStore) Count(
ctx context.Context,
repoID int64,
filter *types.ListPipelinesFilter,
) (int64, error) {
stmt := database.Builder.
Select("count(*)").
From("pipelines")

View File

@ -172,6 +172,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err
}
pipelineStore := database.ProvidePipelineStore(db)
executionStore := database.ProvideExecutionStore(db)
ruleStore := database.ProvideRuleStore(db, principalInfoCache)
settingsStore := database.ProvideSettingsStore(db)
settingsService := settings.ProvideService(settingsStore)
@ -248,9 +249,8 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
instrumentService := instrument.ProvideService()
userGroupStore := database.ProvideUserGroupStore(db)
searchService := usergroup.ProvideSearchService()
repoController := repo.ProvideController(config, transactor, provider, authorizer, repoStore, spaceStore, pipelineStore, principalStore, ruleStore, settingsService, principalInfoCache, protectionManager, gitInterface, repository, codeownersService, reporter, indexer, resourceLimiter, lockerLocker, auditService, mutexManager, repoIdentifier, repoCheck, publicaccessService, labelService, instrumentService, userGroupStore, searchService)
repoController := repo.ProvideController(config, transactor, provider, authorizer, repoStore, spaceStore, pipelineStore, principalStore, executionStore, ruleStore, settingsService, principalInfoCache, protectionManager, gitInterface, repository, codeownersService, reporter, indexer, resourceLimiter, lockerLocker, auditService, mutexManager, repoIdentifier, repoCheck, publicaccessService, labelService, instrumentService, userGroupStore, searchService)
reposettingsController := reposettings.ProvideController(authorizer, repoStore, settingsService, auditService)
executionStore := database.ProvideExecutionStore(db)
checkStore := database.ProvideCheckStore(db, principalInfoCache)
stageStore := database.ProvideStageStore(db)
schedulerScheduler, err := scheduler.ProvideScheduler(stageStore, mutexManager)

View File

@ -56,3 +56,9 @@ type Execution struct {
Version int64 `json:"-"`
Stages []*Stage `json:"stages,omitempty"`
}
type ExecutionInfo struct {
Number int64 `db:"execution_number" json:"number"`
PipelineID int64 `db:"execution_pipeline_id" json:"pipeline_id"`
Status enum.CIStatus `db:"execution_status" json:"status"`
}

View File

@ -28,10 +28,13 @@ type Pipeline struct {
DefaultBranch string `db:"pipeline_default_branch" json:"default_branch"`
ConfigPath string `db:"pipeline_config_path" json:"config_path"`
Created int64 `db:"pipeline_created" json:"created"`
// Execution contains information about the latest execution if available
Execution *Execution `db:"-" json:"execution,omitempty"`
Updated int64 `db:"pipeline_updated" json:"updated"`
Version int64 `db:"pipeline_version" json:"-"`
Execution *Execution `db:"-" json:"execution,omitempty"`
LastExecutions []*ExecutionInfo `db:"-" json:"last_executions,omitempty"`
Updated int64 `db:"pipeline_updated" json:"updated"`
Version int64 `db:"pipeline_version" json:"-"`
// Repo specific information not stored with pipelines
RepoUID string `db:"-" json:"repo_uid,omitempty"`
@ -49,3 +52,9 @@ func (s Pipeline) MarshalJSON() ([]byte, error) {
UID: s.Identifier,
})
}
type ListPipelinesFilter struct {
ListQueryFilter
Latest bool
LastExecutions int64
}