Files
woodpecker/pipeline/runtime/executor.go
6543 a63b93f5ee Refactor pipeline engine (#6073)
restructure pipeline/*.go to use submodules

<!-- https://claude.ai/chat/1b8965d7-5bca-42c7-86b4-48c2d645c362 -->

- pipeline/error.go -> pipeline/errors/...
- pipeline/pipeline.go#Runtime -> pipeline/runtime/runtime.go
- pipeline/pipeline.go#execAll -> pipeline/runtime/executor.go
- pipeline/shutdown.go -> pipeline/runtime/shutdown.go
- pipeline/logger.go -> pipeline/logging
- pipeline/tracer.go -> pipeline/tracing
- pipeline/pipeline.go#State -> state/state.go
2026-02-13 11:56:43 +01:00

298 lines
7.4 KiB
Go

// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"errors"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
)
// Run starts the execution of a workflow and waits for it to complete.
func (r *Runtime) Run(runnerCtx context.Context) error {
	logger := r.MakeLogger()

	logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
	for stagePos, stage := range r.spec.Stages {
		names := make([]string, 0, len(stage.Steps))
		for _, step := range stage.Steps {
			names = append(names, step.Name)
		}
		logger.Debug().
			Int("StagePos", stagePos).
			Str("Steps", strings.Join(names, ",")).
			Msg("stage")
	}

	// Always tear the workflow down again; fall back to the shutdown
	// context when the runner context was already canceled.
	defer func() {
		ctx := runnerCtx //nolint:contextcheck
		if ctx.Err() != nil {
			ctx = GetShutdownCtx()
		}
		if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
			logger.Error().Err(err).Msg("could not destroy engine")
		}
	}()

	r.started = time.Now().Unix()
	if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
		var setupErr *pipeline_errors.ErrInvalidWorkflowSetup
		if errors.As(err, &setupErr) {
			// Report the failing step to the tracer so it becomes visible.
			wfState := new(state.State)
			wfState.Pipeline.Step = setupErr.Step
			wfState.Pipeline.Error = setupErr.Err
			wfState.Process = backend.State{
				Error:    setupErr.Err,
				Exited:   true,
				ExitCode: 1,
			}
			if r.tracer != nil {
				if traceErr := r.tracer.Trace(wfState); traceErr != nil {
					logger.Error().Err(traceErr).Msg("failed to trace step error")
				}
			}
		}
		return err
	}

	// Stages run sequentially; the steps within a stage run in parallel.
	for _, stage := range r.spec.Stages {
		select {
		case <-r.ctx.Done():
			return pipeline_errors.ErrCancel
		case err := <-r.execAll(runnerCtx, stage.Steps):
			if err != nil {
				r.err = err
			}
		}
	}
	return r.err
}
// traceStep reports the current status of a step to the tracer.
// A nil processState means the step produced no backend state yet; combined
// with a non-nil err this indicates the step failed to start (and is done).
// The incoming err is passed through unless tracing itself fails.
func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error {
	if r.tracer == nil {
		// Without a tracer there is nothing to report.
		return nil
	}

	s := new(state.State)
	s.Pipeline.Started = r.started
	s.Pipeline.Step = step
	s.Pipeline.Error = r.err

	switch {
	case processState != nil:
		s.Process = *processState
	case err != nil:
		// The step could not be started at all.
		s.Process = backend.State{
			Error:     err,
			Exited:    true,
			OOMKilled: false,
		}
	}

	if traceErr := r.tracer.Trace(s); traceErr != nil {
		return traceErr
	}
	return err
}
// execAll executes a set of parallel steps and returns a channel that
// receives the combined result once all of them have finished.
func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error {
	var g errgroup.Group
	// Buffered so the final send below cannot block forever (leaking the
	// goroutine) if the caller stopped receiving, e.g. because the workflow
	// context was canceled and Run already returned via its ctx.Done case.
	done := make(chan error, 1)
	logger := r.MakeLogger()

	for _, step := range steps {
		// Required since otherwise the loop variable
		// will be captured by the function. This will
		// recreate the step "variable"
		step := step
		g.Go(func() error {
			logger.Debug().
				Str("step", step.Name).
				Msg("prepare")

			// Skip the step when the pipeline outcome so far rules it out.
			switch {
			case r.err != nil && !step.OnFailure:
				logger.Debug().
					Str("step", step.Name).
					Err(r.err).
					Msgf("skipped due to OnFailure=%t", step.OnFailure)
				return nil
			case r.err == nil && !step.OnSuccess:
				logger.Debug().
					Str("step", step.Name).
					Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
				return nil
			}

			// Trace started.
			err := r.traceStep(nil, nil, step)
			if err != nil {
				return err
			}

			// add compatibility for drone-ci plugins
			metadata.SetDroneEnviron(step.Environment)

			logger.Debug().
				Str("step", step.Name).
				Msg("executing")

			// setup exec func in a way it can be detached if needed
			// wg will signal once the setup phase is over
			execAndTrace := func(wg *sync.WaitGroup) error {
				processState, err := r.exec(runnerCtx, step, wg)

				logger.Debug().
					Str("step", step.Name).
					Msg("complete")

				// normalize context cancel error
				if errors.Is(err, context.Canceled) {
					err = pipeline_errors.ErrCancel
				}

				// Return the error after tracing it.
				err = r.traceStep(processState, err, step)
				if err != nil && step.Failure == metadata.FailureIgnore {
					return nil
				}
				return err
			}

			// Detached steps only block until their setup happened; errors
			// up to that point are reported (best effort), later ones are
			// dropped.
			if step.Detached {
				var wg sync.WaitGroup
				wg.Add(1)
				// A buffered channel is used instead of a shared variable:
				// assigning a plain variable in the goroutine and reading it
				// after wg.Wait() would be a data race, since exec signals
				// the WaitGroup before execAndTrace returns.
				errCh := make(chan error, 1)
				go func() {
					errCh <- execAndTrace(&wg)
				}()
				wg.Wait()
				select {
				case err := <-errCh:
					// The step already finished; report its result.
					return err
				default:
					// Setup succeeded; the step keeps running detached.
					return nil
				}
			}

			// run blocking
			return execAndTrace(nil)
		})
	}

	go func() {
		done <- g.Wait()
		close(done)
	}()
	return done
}
// Executes the step and returns the state and error.
// If setupWg is non-nil it is signaled exactly once as soon as the setup
// phase (start + log tailing) is over — even on early error returns, via
// the defer below. Detached steps rely on this to return before the step
// itself completes.
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) {
	// Guarantee the setup signal fires even when we return with an error
	// before reaching the explicit Done further down.
	defer func() {
		if setupWg != nil {
			setupWg.Done()
		}
	}()

	// r.ctx (not runnerCtx) is used for start/tail/wait so that canceling
	// the workflow also aborts the step.
	if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck
		return nil, err
	}

	startTime := time.Now().Unix()

	logger := r.MakeLogger()

	var wg sync.WaitGroup
	if r.logger != nil {
		// Tail the step output and feed it to the configured log handler
		// in the background; wg tracks when all output has been consumed.
		rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
		if err != nil {
			return nil, err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := r.logger(step, rc); err != nil {
				logger.Error().Err(err).Msg("process logging failed")
			}
			_ = rc.Close()
		}()
	}

	// nothing else to block for detached process.
	if setupWg != nil {
		setupWg.Done()
		// set to nil so the setupWg.Done in defer does not call it a second time
		setupWg = nil
	}

	// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
	wg.Wait()

	waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
	if err != nil {
		if errors.Is(err, context.Canceled) {
			// NOTE(review): assumes backends return a non-nil state
			// alongside a context.Canceled error — verify per backend,
			// otherwise this line would panic on a nil waitState.
			waitState.Error = pipeline_errors.ErrCancel
		} else {
			return nil, err
		}
	}

	// It is important to use the runnerCtx here because
	// in case the workflow was canceled we still have the docker daemon to stop the container.
	if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil {
		return nil, err
	}

	// we update with our start time here
	waitState.Started = startTime

	// we handle cancel case
	if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
		waitState.Error = pipeline_errors.ErrCancel
	}

	// Map abnormal results onto typed errors so callers can distinguish
	// OOM kills from ordinary non-zero exits via errors.As.
	if waitState.OOMKilled {
		return waitState, &pipeline_errors.OomError{
			UUID: step.UUID,
			Code: waitState.ExitCode,
		}
	} else if waitState.ExitCode != 0 {
		return waitState, &pipeline_errors.ExitError{
			UUID: step.UUID,
			Code: waitState.ExitCode,
		}
	}

	return waitState, nil
}