// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package triggerer

import (
	"context"
	"fmt"
	"regexp"
	"runtime/debug"
	"time"

	"github.com/harness/gitness/internal/pipeline/checks"
	"github.com/harness/gitness/internal/pipeline/file"
	"github.com/harness/gitness/internal/pipeline/scheduler"
	"github.com/harness/gitness/internal/pipeline/triggerer/dag"
	"github.com/harness/gitness/internal/store"
	"github.com/harness/gitness/store/database/dbtx"
	"github.com/harness/gitness/types"
	"github.com/harness/gitness/types/enum"

	"github.com/drone-runners/drone-runner-docker/engine2/script"
	"github.com/drone/drone-yaml/yaml"
	"github.com/drone/drone-yaml/yaml/linter"
	v1yaml "github.com/drone/spec/dist/go"
	"github.com/drone/spec/dist/go/parse/expand"
	"github.com/drone/spec/dist/go/parse/normalize"
	"github.com/jmoiron/sqlx"
	"github.com/rs/zerolog/log"
)

var _ Triggerer = (*triggerer)(nil)

// Hook represents the payload of a post-commit hook.
type Hook struct {
	Parent       int64              `json:"parent"`
	Trigger      string             `json:"trigger"`
	TriggeredBy  int64              `json:"triggered_by"`
	Action       enum.TriggerAction `json:"action"`
	Link         string             `json:"link"`
	Timestamp    int64              `json:"timestamp"`
	Title        string             `json:"title"`
	Message      string             `json:"message"`
	Before       string             `json:"before"`
	After        string             `json:"after"`
	Ref          string             `json:"ref"`
	Fork         string             `json:"fork"`
	Source       string             `json:"source"`
	Target       string             `json:"target"`
	AuthorLogin  string             `json:"author_login"`
	AuthorName   string             `json:"author_name"`
	AuthorEmail  string             `json:"author_email"`
	AuthorAvatar string             `json:"author_avatar"`
	Debug        bool               `json:"debug"`
	Cron         string             `json:"cron"`
	Sender       string             `json:"sender"`
	Params       map[string]string  `json:"params"`
}

// Triggerer is responsible for triggering an Execution from an incoming
// hook (manual or webhook). If the execution is skipped, a nil value is
// returned.
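//
// A minimal usage sketch (the wiring is illustrative; principalID, action,
// and sha are hypothetical caller-supplied values, not part of this package):
//
//	t := New(executionStore, checkStore, stageStore, pipelineStore,
//		db, repoStore, sched, fileService)
//	execution, err := t.Trigger(ctx, pipeline, &Hook{
//		Trigger:     "manual",          // free-form description of the trigger
//		TriggeredBy: principalID,       // principal that initiated the trigger
//		Action:      action,            // an enum.TriggerAction value
//		Ref:         "refs/heads/main", // ref whose config should run
//		After:       sha,               // commit to read the config from
//	})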
type Triggerer interface {
	Trigger(ctx context.Context, pipeline *types.Pipeline, hook *Hook) (*types.Execution, error)
}

type triggerer struct {
	executionStore store.ExecutionStore
	checkStore     store.CheckStore
	stageStore     store.StageStore
	db             *sqlx.DB
	pipelineStore  store.PipelineStore
	fileService    file.FileService
	scheduler      scheduler.Scheduler
	repoStore      store.RepoStore
}

func New(
	executionStore store.ExecutionStore,
	checkStore store.CheckStore,
	stageStore store.StageStore,
	pipelineStore store.PipelineStore,
	db *sqlx.DB,
	repoStore store.RepoStore,
	scheduler scheduler.Scheduler,
	fileService file.FileService,
) *triggerer {
	return &triggerer{
		executionStore: executionStore,
		checkStore:     checkStore,
		stageStore:     stageStore,
		scheduler:      scheduler,
		db:             db,
		pipelineStore:  pipelineStore,
		fileService:    fileService,
		repoStore:      repoStore,
	}
}

func (t *triggerer) Trigger(
	ctx context.Context,
	pipeline *types.Pipeline,
	base *Hook,
) (*types.Execution, error) {
	log := log.With().
		Int64("pipeline.id", pipeline.ID).
		Str("trigger.ref", base.Ref).
		Str("trigger.commit", base.After).
		Logger()

	log.Debug().Msg("trigger: received")
	defer func() {
		// taking the paranoid approach to recover from
		// a panic that should absolutely never happen.
		if r := recover(); r != nil {
			log.Error().Msgf("trigger: unexpected panic: %s", r)
			debug.PrintStack()
		}
	}()
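
	// derive the coarse-grained trigger event (e.g. push, pull request)
	// from the more specific trigger action.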
	event := string(base.Action.GetTriggerEvent())

	repo, err := t.repoStore.Find(ctx, pipeline.RepoID)
	if err != nil {
		log.Error().Err(err).Msg("could not find repo")
		return nil, err
	}

	file, err := t.fileService.Get(ctx, repo, pipeline.ConfigPath, base.After)
	if err != nil {
		log.Error().Err(err).Msg("trigger: could not find yaml")
		return nil, err
	}

	// For drone, follow the existing path of calculating dependencies, creating a DAG,
	// and creating stages accordingly. For V1 YAML - for now we can just parse the stages
	// and create them sequentially.
	stages := []*types.Stage{}
	if !isV1Yaml(file.Data) {
		manifest, err := yaml.ParseString(string(file.Data))
		if err != nil {
			log.Warn().Err(err).Msg("trigger: cannot parse yaml")
			return t.createExecutionWithError(ctx, pipeline, base, err.Error())
		}

		err = linter.Manifest(manifest, true)
		if err != nil {
			log.Warn().Err(err).Msg("trigger: yaml linting error")
			return t.createExecutionWithError(ctx, pipeline, base, err.Error())
		}
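
		// Build a DAG keyed by pipeline name so that dependency cycles can be
		// detected and the depends_on chain can later be rewired around
		// skipped pipelines.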
		var matched []*yaml.Pipeline
		var dag = dag.New()
		for _, document := range manifest.Resources {
			pipeline, ok := document.(*yaml.Pipeline)
			if !ok {
				continue
			}
			// TODO add repo
			// TODO add instance
			// TODO add target
			// TODO add ref
			name := pipeline.Name
			if name == "" {
				name = "default"
			}

			node := dag.Add(name, pipeline.DependsOn...)
			node.Skip = true

			if skipBranch(pipeline, base.Target) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match branch")
			} else if skipEvent(pipeline, event) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match event")
			} else if skipAction(pipeline, string(base.Action)) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match action")
			} else if skipRef(pipeline, base.Ref) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match ref")
			} else if skipRepo(pipeline, repo.Path) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match repo")
			} else if skipCron(pipeline, base.Cron) {
				log.Info().Str("pipeline", name).Msg("trigger: skipping pipeline, does not match cron job")
			} else {
				matched = append(matched, pipeline)
				node.Skip = false
			}
		}
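
		// reject configurations whose depends_on graph contains a cycle;
		// such a graph could never be scheduled to completion.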
		if dag.DetectCycles() {
			return t.createExecutionWithError(ctx, pipeline, base, "Error: Dependency cycle detected in Pipeline")
		}

		if len(matched) == 0 {
			log.Info().Msg("trigger: skipping execution, no matching pipelines")
			return nil, nil
		}

		for i, match := range matched {
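			// a stage runs on success by default; it runs on failure only
			// when the trigger explicitly filters on status. When no status
			// filter is configured at all, failure handling stays disabled.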
			onSuccess := match.Trigger.Status.Match(string(enum.CIStatusSuccess))
			onFailure := match.Trigger.Status.Match(string(enum.CIStatusFailure))
			if len(match.Trigger.Status.Include)+len(match.Trigger.Status.Exclude) == 0 {
				onFailure = false
			}

			now := time.Now().UnixMilli()

			stage := &types.Stage{
				RepoID:    repo.ID,
				Number:    int64(i + 1),
				Name:      match.Name,
				Kind:      match.Kind,
				Type:      match.Type,
				OS:        match.Platform.OS,
				Arch:      match.Platform.Arch,
				Variant:   match.Platform.Variant,
				Kernel:    match.Platform.Version,
				Limit:     match.Concurrency.Limit,
				Status:    enum.CIStatusWaitingOnDeps,
				DependsOn: match.DependsOn,
				OnSuccess: onSuccess,
				OnFailure: onFailure,
				Labels:    match.Node,
				Created:   now,
				Updated:   now,
			}
			if stage.Kind == "pipeline" && stage.Type == "" {
				stage.Type = "docker"
			}
			if stage.OS == "" {
				stage.OS = "linux"
			}
			if stage.Arch == "" {
				stage.Arch = "amd64"
			}
			if stage.Name == "" {
				stage.Name = "default"
			}
			if len(stage.DependsOn) == 0 {
				stage.Status = enum.CIStatusPending
			}
			stages = append(stages, stage)
		}

		for _, stage := range stages {
			// here we re-work the dependencies for the stage to
			// account for the fact that some stages may be skipped
			// and may otherwise break the dependency chain.
			stage.DependsOn = dag.Dependencies(stage.Name)

			// if the stage is pending dependencies, but those
			// dependencies are skipped, the stage can be executed
			// immediately.
			if stage.Status == enum.CIStatusWaitingOnDeps &&
				len(stage.DependsOn) == 0 {
				stage.Status = enum.CIStatusPending
			}
		}
	} else {
		stages, err = parseV1Stages(file.Data, repo)
		if err != nil {
			return nil, fmt.Errorf("could not parse v1 YAML into stages: %w", err)
		}
	}

	// Increment the pipeline's execution sequence number using optimistic locking.
	pipeline, err = t.pipelineStore.IncrementSeqNum(ctx, pipeline)
	if err != nil {
		log.Error().Err(err).Msg("trigger: cannot increment execution sequence number")
		return nil, err
	}

	now := time.Now().UnixMilli()
	execution := &types.Execution{
		RepoID:       repo.ID,
		PipelineID:   pipeline.ID,
		Trigger:      base.Trigger,
		CreatedBy:    base.TriggeredBy,
		Number:       pipeline.Seq,
		Parent:       base.Parent,
		Status:       enum.CIStatusPending,
		Event:        event,
		Action:       string(base.Action),
		Link:         base.Link,
		// Timestamp:  base.Timestamp,
		Title:        trunc(base.Title, 2000),
		Message:      trunc(base.Message, 2000),
		Before:       base.Before,
		After:        base.After,
		Ref:          base.Ref,
		Fork:         base.Fork,
		Source:       base.Source,
		Target:       base.Target,
		Author:       base.AuthorLogin,
		AuthorName:   base.AuthorName,
		AuthorEmail:  base.AuthorEmail,
		AuthorAvatar: base.AuthorAvatar,
		Params:       base.Params,
		Debug:        base.Debug,
		Sender:       base.Sender,
		Cron:         base.Cron,
		Created:      now,
		Updated:      now,
	}

	err = t.createExecutionWithStages(ctx, execution, stages)
	if err != nil {
		log.Error().Err(err).Msg("trigger: cannot create execution")
		return nil, err
	}

	// try to write to the check store; log on failure but do not fail the execution.
	err = checks.Write(ctx, t.checkStore, execution, pipeline)
	if err != nil {
		log.Error().Err(err).Msg("trigger: could not write to check store")
	}
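
	// schedule only the stages that are immediately runnable; stages still
	// waiting on dependencies are expected to be scheduled once those
	// dependencies complete.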
	for _, stage := range stages {
		if stage.Status != enum.CIStatusPending {
			continue
		}
		err = t.scheduler.Schedule(ctx, stage)
		if err != nil {
			log.Error().Err(err).Msg("trigger: cannot enqueue execution")
			return nil, err
		}
	}

	return execution, nil
}
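
// trunc truncates s to at most i runes, so that multi-byte characters are
// never split mid-sequence.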
func trunc(s string, i int) string {
	runes := []rune(s)
	if len(runes) > i {
		return string(runes[:i])
	}
	return s
}

// parseV1Stages tries to parse the yaml into a list of stages and returns an
// error if it is unable to do so or the yaml contains something unexpected.
// Currently, all the stages are executed serially, one after the other.
// Once depends-on support is available in v1, this will be changed to use the DAG.
func parseV1Stages(data []byte, repo *types.Repository) ([]*types.Stage, error) {
	stages := []*types.Stage{}

	// For V1 YAML, just go through the YAML and create stages serially for now.
	config, err := v1yaml.ParseBytes(data)
	if err != nil {
		return nil, fmt.Errorf("could not parse v1 yaml: %w", err)
	}

	// Expand matrix strategies in the YAML.
	err = expand.Expand(config)
	if err != nil {
		return nil, fmt.Errorf("could not expand matrix stages in v1 yaml: %w", err)
	}

	// Normalize the config to make sure stage names and step names are unique.
	err = normalize.Normalize(config)
	if err != nil {
		return nil, fmt.Errorf("could not normalize v1 yaml: %w", err)
	}

	if config.Kind != "pipeline" {
		return nil, fmt.Errorf("cannot support non-pipeline kinds in v1 at the moment")
	}
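
	// prevStage tracks the previously created stage so that each stage
	// depends on the one before it, producing a serial chain.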
	var prevStage string
	switch v := config.Spec.(type) {
	case *v1yaml.Pipeline:
		// Expand expressions in strings.
		script.ExpandConfig(config, map[string]interface{}{}) // TODO: pass in params for resolution

		for idx, stage := range v.Stages {
			// Only parse CI stages for now.
			switch stage.Spec.(type) {
			case *v1yaml.StageCI:
				now := time.Now().UnixMilli()
				onSuccess, onFailure := true, false
				if stage.When != nil {
					if when := stage.When.Eval; when != "" {
						// TODO: pass in params for resolution
						onSuccess, onFailure, err = script.EvalWhen(when, map[string]interface{}{})
						if err != nil {
							return nil, fmt.Errorf("could not resolve when condition for stage: %w", err)
						}
					}
				}

				dependsOn := []string{}
				if prevStage != "" {
					dependsOn = append(dependsOn, prevStage)
				}

				status := enum.CIStatusWaitingOnDeps
				// If the stage has no dependencies, it can be picked up for execution.
				if len(dependsOn) == 0 {
					status = enum.CIStatusPending
				}

				temp := &types.Stage{
					RepoID:    repo.ID,
					Number:    int64(idx + 1),
					Name:      stage.Id, // for v1, the ID is the unique identifier per stage
					Created:   now,
					Updated:   now,
					Status:    status,
					OnSuccess: onSuccess,
					OnFailure: onFailure,
					DependsOn: dependsOn,
				}
				prevStage = temp.Name
				stages = append(stages, temp)
			default:
				return nil, fmt.Errorf("only CI stages are supported in v1 at the moment")
			}
		}
	default:
		return nil, fmt.Errorf("unknown yaml: unsupported spec type %T", v)
	}

	return stages, nil
}

// isV1Yaml reports whether the config is a v1 spec YAML as opposed to a
// legacy drone YAML.
func isV1Yaml(data []byte) bool {
	// if we are dealing with the legacy drone yaml, use
	// the legacy drone engine. Note that without a (?m) flag, ^ anchors at
	// the start of the input, so this matches only documents that begin
	// with a top-level `spec:` element.
	return regexp.MustCompilePOSIX(`^spec:`).Match(data)
}

// createExecutionWithStages writes an execution along with its stages in a single transaction.
func (t *triggerer) createExecutionWithStages(
	ctx context.Context,
	execution *types.Execution,
	stages []*types.Stage,
) error {
	return dbtx.New(t.db).WithTx(ctx, func(ctx context.Context) error {
		err := t.executionStore.Create(ctx, execution)
		if err != nil {
			return err
		}

		for _, stage := range stages {
			stage.ExecutionID = execution.ID
			err := t.stageStore.Create(ctx, stage)
			if err != nil {
				return err
			}
		}
		return nil
	})
}

// createExecutionWithError creates an execution with an error message.
func (t *triggerer) createExecutionWithError(
	ctx context.Context,
	pipeline *types.Pipeline,
	base *Hook,
	message string,
) (*types.Execution, error) {
	log := log.With().
		Int64("pipeline.id", pipeline.ID).
		Str("trigger.ref", base.Ref).
		Str("trigger.commit", base.After).
		Logger()

	pipeline, err := t.pipelineStore.IncrementSeqNum(ctx, pipeline)
	if err != nil {
		return nil, err
	}

	now := time.Now().UnixMilli()

	execution := &types.Execution{
		RepoID:       pipeline.RepoID,
		PipelineID:   pipeline.ID,
		Number:       pipeline.Seq,
		Parent:       base.Parent,
		Status:       enum.CIStatusError,
		Error:        message,
		Event:        string(base.Action.GetTriggerEvent()),
		Action:       string(base.Action),
		Link:         base.Link,
		Title:        base.Title,
		Message:      base.Message,
		CreatedBy:    base.TriggeredBy,
		Before:       base.Before,
		After:        base.After,
		Ref:          base.Ref,
		Fork:         base.Fork,
		Source:       base.Source,
		Target:       base.Target,
		Author:       base.AuthorLogin,
		AuthorName:   base.AuthorName,
		AuthorEmail:  base.AuthorEmail,
		AuthorAvatar: base.AuthorAvatar,
		Debug:        base.Debug,
		Sender:       base.Sender,
		Created:      now,
		Updated:      now,
		Started:      now,
		Finished:     now,
	}

	err = t.executionStore.Create(ctx, execution)
	if err != nil {
		log.Error().Err(err).Msg("trigger: cannot create execution for the error")
		return nil, err
	}

	// try to write to the check store; log on failure.
	err = checks.Write(ctx, t.checkStore, execution, pipeline)
	if err != nil {
		log.Error().Err(err).Msg("trigger: failed to update check")
	}

	return execution, nil
}