drone/build/triggerer/trigger.go

507 lines
14 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package triggerer
import (
"context"
"runtime/debug"
"time"
"github.com/harness/gitness/build/file"
"github.com/harness/gitness/build/scheduler"
"github.com/harness/gitness/build/triggerer/dag"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"
"github.com/drone/drone-yaml/yaml"
"github.com/drone/drone-yaml/yaml/linter"
"github.com/jmoiron/sqlx"
"github.com/rs/zerolog/log"
)
// Trigger types
const (
TriggerHook = "@hook"
TriggerCron = "@cron"
)
var _ Triggerer = (*triggerer)(nil)
// Hook represents the payload of a post-commit hook.
type Hook struct {
Parent int64 `json:"parent"`
Trigger string `json:"trigger"`
Event string `json:"event"`
Action string `json:"action"`
Link string `json:"link"`
Timestamp int64 `json:"timestamp"`
Title string `json:"title"`
Message string `json:"message"`
Before string `json:"before"`
After string `json:"after"`
Ref string `json:"ref"`
Fork string `json:"hook"`
Source string `json:"source"`
Target string `json:"target"`
Author string `json:"author_login"`
AuthorName string `json:"author_name"`
AuthorEmail string `json:"author_email"`
AuthorAvatar string `json:"author_avatar"`
Deployment string `json:"deploy_to"`
DeploymentID int64 `json:"deploy_id"`
Debug bool `json:"debug"`
Cron string `json:"cron"`
Sender string `json:"sender"`
Params map[string]string `json:"params"`
}
// Triggerer is responsible for triggering a Execution from an
// incoming drone. If an execution is skipped a nil value is
// returned.
type Triggerer interface {
Trigger(ctx context.Context, pipeline *types.Pipeline, hook *Hook) (*types.Execution, error)
}
type triggerer struct {
executionStore store.ExecutionStore
stageStore store.StageStore
db *sqlx.DB
pipelineStore store.PipelineStore
fileService file.FileService
scheduler scheduler.Scheduler
repoStore store.RepoStore
}
func New(
executionStore store.ExecutionStore,
stageStore store.StageStore,
pipelineStore store.PipelineStore,
db *sqlx.DB,
repoStore store.RepoStore,
scheduler scheduler.Scheduler,
fileService file.FileService,
) *triggerer {
return &triggerer{
executionStore: executionStore,
stageStore: stageStore,
scheduler: scheduler,
db: db,
pipelineStore: pipelineStore,
fileService: fileService,
repoStore: repoStore,
}
}
func (t *triggerer) Trigger(
ctx context.Context,
pipeline *types.Pipeline,
base *Hook,
) (*types.Execution, error) {
log := log.With().
Str("ref", base.Ref).
Str("commit", base.After).
Logger()
log.Debug().Msg("trigger: received")
defer func() {
// taking the paranoid approach to recover from
// a panic that should absolutely never happen.
if r := recover(); r != nil {
log.Error().Msgf("runner: unexpected panic: %s", r)
debug.PrintStack()
}
}()
repo, err := t.repoStore.Find(ctx, pipeline.RepoID)
if err != nil {
log.Error().Err(err).Msg("could not find repo")
return nil, err
}
// if skipMessage(base) {
// logger.Infoln("trigger: skipping hook. found skip directive")
// return nil, nil
// }
// if base.Event == core.EventPullRequest {
// if repo.IgnorePulls {
// logger.Infoln("trigger: skipping hook. project ignores pull requests")
// return nil, nil
// }
// if repo.IgnoreForks && !strings.EqualFold(base.Fork, repo.Slug) {
// logger.Infoln("trigger: skipping hook. project ignores forks")
// return nil, nil
// }
// }
// user, err := t.users.Find(ctx, repo.UserID)
// if err != nil {
// logger = logger.WithError(err)
// logger.Warnln("trigger: cannot find repository owner")
// return nil, err
// }
// if user.Active == false {
// logger.Infoln("trigger: skipping hook. repository owner is inactive")
// return nil, nil
// }
// if the commit message is not included we should
// make an optional API call to the version control
// system to augment the available information.
// if base.Message == "" && base.After != "" {
// commit, err := t.commits.Find(ctx, user, repo.Slug, base.After)
// if err == nil && commit != nil {
// base.Message = commit.Message
// if base.AuthorEmail == "" {
// base.AuthorEmail = commit.Author.Email
// }
// if base.AuthorName == "" {
// base.AuthorName = commit.Author.Name
// }
// if base.AuthorAvatar == "" {
// base.AuthorAvatar = commit.Author.Avatar
// }
// }
// }
var ref string
if base.After != "" {
ref = base.After
} else if base.Ref != "" {
ref = base.Ref
} else {
ref = pipeline.DefaultBranch
}
file, err := t.fileService.Get(ctx, repo, pipeline.ConfigPath, ref)
if err != nil {
log.Error().Err(err).Msg("trigger: could not find yaml")
return nil, err
}
// // this code is temporarily in place to detect and convert
// // the legacy yaml configuration file to the new format.
// raw.Data, err = converter.ConvertString(raw.Data, converter.Metadata{
// Filename: repo.Config,
// URL: repo.Link,
// Ref: base.Ref,
// })
// if err != nil {
// logger = logger.WithError(err)
// logger.Warnln("trigger: cannot convert yaml")
// return t.createExecutionError(ctx, repo, base, err.Error())
// }
manifest, err := yaml.ParseString(string(file.Data))
if err != nil {
log.Warn().Err(err).Msg("trigger: cannot parse yaml")
return t.createExecutionError(ctx, pipeline, base, err.Error())
}
// verr := t.validate.Validate(ctx, &core.ValidateArgs{
// User: user,
// Repo: repo,
// Execution: tmpExecution,
// Config: raw,
// })
// switch verr {
// case core.ErrValidatorBlock:
// logger.Debugln("trigger: yaml validation error: block pipeline")
// case core.ErrValidatorSkip:
// logger.Debugln("trigger: yaml validation error: skip pipeline")
// return nil, nil
// default:
// if verr != nil {
// logger = logger.WithError(err)
// logger.Warnln("trigger: yaml validation error")
// return t.createExecutionError(ctx, repo, base, verr.Error())
// }
// }
err = linter.Manifest(manifest, true)
if err != nil {
log.Warn().Err(err).Msg("trigger: yaml linting error")
return t.createExecutionError(ctx, pipeline, base, err.Error())
}
verified := true
// if repo.Protected && base.Trigger == core.TriggerHook {
// key := signer.KeyString(repo.Secret)
// val := []byte(raw.Data)
// verified, _ = signer.Verify(val, key)
// }
// // if pipeline validation failed with a block error, the
// // pipeline verification should be set to false, which will
// // force manual review and approval.
// if verr == core.ErrValidatorBlock {
// verified = false
// }
var matched []*yaml.Pipeline
var dag = dag.New()
for _, document := range manifest.Resources {
pipeline, ok := document.(*yaml.Pipeline)
if !ok {
continue
}
// TODO add repo
// TODO add instance
// TODO add target
// TODO add ref
name := pipeline.Name
if name == "" {
name = "default"
}
node := dag.Add(pipeline.Name, pipeline.DependsOn...)
node.Skip = true
if skipBranch(pipeline, base.Target) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match branch")
} else if skipEvent(pipeline, base.Event) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match event")
} else if skipAction(pipeline, base.Action) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match action")
} else if skipRef(pipeline, base.Ref) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match ref")
} else if skipRepo(pipeline, repo.Path) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match repo")
} else if skipTarget(pipeline, base.Deployment) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match deploy target")
} else if skipCron(pipeline, base.Cron) {
log.Info().Str("pipeline", pipeline.Name).Msg("trigger: skipping pipeline, does not match cron job")
} else {
matched = append(matched, pipeline)
node.Skip = false
}
}
if dag.DetectCycles() {
return t.createExecutionError(ctx, pipeline, base, "Error: Dependency cycle detected in Pipeline")
}
if len(matched) == 0 {
log.Info().Msg("trigger: skipping execution, no matching pipelines")
return nil, nil
}
pipeline, err = t.pipelineStore.IncrementSeqNum(ctx, pipeline)
if err != nil {
log.Error().Err(err).Msg("trigger: cannot increment execution sequence number")
return nil, err
}
execution := &types.Execution{
RepoID: repo.ID,
PipelineID: pipeline.ID,
Trigger: base.Trigger,
Number: pipeline.Seq,
Parent: base.Parent,
Status: types.StatusPending,
Event: base.Event,
Action: base.Action,
Link: base.Link,
// Timestamp: base.Timestamp,
Title: trunc(base.Title, 2000),
Message: trunc(base.Message, 2000),
Before: base.Before,
After: base.After,
Ref: base.Ref,
Fork: base.Fork,
Source: base.Source,
Target: base.Target,
Author: base.Author,
AuthorName: base.AuthorName,
AuthorEmail: base.AuthorEmail,
AuthorAvatar: base.AuthorAvatar,
Params: base.Params,
Deploy: base.Deployment,
DeployID: base.DeploymentID,
Debug: base.Debug,
Sender: base.Sender,
Cron: base.Cron,
Created: time.Now().Unix(),
Updated: time.Now().Unix(),
}
stages := make([]*types.Stage, len(matched))
for i, match := range matched {
onSuccess := match.Trigger.Status.Match(types.StatusPassing)
onFailure := match.Trigger.Status.Match(types.StatusFailing)
if len(match.Trigger.Status.Include)+len(match.Trigger.Status.Exclude) == 0 {
onFailure = false
}
stage := &types.Stage{
RepoID: repo.ID,
Number: int64(i + 1),
Name: match.Name,
Kind: match.Kind,
Type: match.Type,
OS: match.Platform.OS,
Arch: match.Platform.Arch,
Variant: match.Platform.Variant,
Kernel: match.Platform.Version,
Limit: match.Concurrency.Limit,
Status: types.StatusWaiting,
DependsOn: match.DependsOn,
OnSuccess: onSuccess,
OnFailure: onFailure,
Labels: match.Node,
Created: time.Now().Unix(),
Updated: time.Now().Unix(),
}
if stage.Kind == "pipeline" && stage.Type == "" {
stage.Type = "docker"
}
if stage.OS == "" {
stage.OS = "linux"
}
if stage.Arch == "" {
stage.Arch = "amd64"
}
if stage.Name == "" {
stage.Name = "default"
}
if verified == false {
stage.Status = types.StatusBlocked
} else if len(stage.DependsOn) == 0 {
stage.Status = types.StatusPending
}
stages[i] = stage
}
for _, stage := range stages {
// here we re-work the dependencies for the stage to
// account for the fact that some steps may be skipped
// and may otherwise break the dependency chain.
stage.DependsOn = dag.Dependencies(stage.Name)
// if the stage is pending dependencies, but those
// dependencies are skipped, the stage can be executed
// immediately.
if stage.Status == types.StatusWaiting &&
len(stage.DependsOn) == 0 {
stage.Status = types.StatusPending
}
}
err = t.createExecutionWithStages(ctx, execution, stages)
if err != nil {
log.Error().Err(err).Msg("trigger: cannot create execution")
return nil, err
}
// err = t.status.Send(ctx, user, &core.StatusInput{
// Repo: repo,
// Execution: execution,
// })
// if err != nil {
// logger = logger.WithError(err)
// logger.Warnln("trigger: cannot create status")
// }
for _, stage := range stages {
if stage.Status != types.StatusPending {
continue
}
err = t.scheduler.Schedule(ctx, stage)
if err != nil {
log.Error().Err(err).Msg("trigger: cannot enqueue execution")
return nil, err
}
}
return execution, nil
}
func trunc(s string, i int) string {
runes := []rune(s)
if len(runes) > i {
return string(runes[:i])
}
return s
}
// createExecutionWithStages writes an execution along with its stages in a single transaction.
func (t *triggerer) createExecutionWithStages(
ctx context.Context,
execution *types.Execution,
stages []*types.Stage,
) error {
return dbtx.New(t.db).WithTx(ctx, func(ctx context.Context) error {
err := t.executionStore.Create(ctx, execution)
if err != nil {
return err
}
for _, stage := range stages {
stage.ExecutionID = execution.ID
err := t.stageStore.Create(ctx, stage)
if err != nil {
return err
}
}
return nil
})
}
// createExecutionError creates an execution with an error message.
func (t *triggerer) createExecutionError(
ctx context.Context,
pipeline *types.Pipeline,
base *Hook,
message string,
) (*types.Execution, error) {
log := log.With().
Str("ref", base.Ref).
Str("commit", base.After).
Logger()
pipeline, err := t.pipelineStore.IncrementSeqNum(ctx, pipeline)
if err != nil {
return nil, err
}
execution := &types.Execution{
RepoID: pipeline.RepoID,
Number: pipeline.Seq,
Parent: base.Parent,
Status: types.StatusError,
Error: message,
Event: base.Event,
Action: base.Action,
Link: base.Link,
Title: base.Title,
Message: base.Message,
Before: base.Before,
After: base.After,
Ref: base.Ref,
Fork: base.Fork,
Source: base.Source,
Target: base.Target,
Author: base.Author,
AuthorName: base.AuthorName,
AuthorEmail: base.AuthorEmail,
AuthorAvatar: base.AuthorAvatar,
Deploy: base.Deployment,
DeployID: base.DeploymentID,
Debug: base.Debug,
Sender: base.Sender,
Created: time.Now().Unix(),
Updated: time.Now().Unix(),
Started: time.Now().Unix(),
Finished: time.Now().Unix(),
}
err = t.executionStore.Create(ctx, execution)
if err != nil {
log.Error().Err(err).Msg("trigger: cannot create execution error")
return nil, err
}
return execution, err
}