refactor: pipeline runtime add description and prevent race (#6167)

This commit is contained in:
6543
2026-03-08 09:50:11 +01:00
committed by GitHub
parent 0ca7fb515f
commit b6c8c8b885
5 changed files with 103 additions and 31 deletions

View File

@@ -31,7 +31,7 @@ import (
// Run starts the execution of a workflow and waits for it to complete.
func (r *Runtime) Run(runnerCtx context.Context) error {
logger := r.MakeLogger()
logger := r.makeLogger()
logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
for stagePos, stage := range r.spec.Stages {
stepNames := []string{}
@@ -85,12 +85,12 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
return pipeline_errors.ErrCancel
case err := <-r.execAll(runnerCtx, stage.Steps):
if err != nil {
r.err = err
r.err.Set(err)
}
}
}
return r.err
return r.err.Get()
}
// Updates the current status of a step.
@@ -105,7 +105,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
state := new(state.State)
state.Pipeline.Started = r.started
state.Pipeline.Step = step
state.Pipeline.Error = r.err
state.Pipeline.Error = r.err.Get()
// We have an error while starting the step
if processState == nil && err != nil {
@@ -128,7 +128,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.MakeLogger()
logger := r.makeLogger()
for _, step := range steps {
// Required since otherwise the loop variable
@@ -141,14 +141,14 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
Str("step", step.Name).
Msg("prepare")
switch {
case r.err != nil && !step.OnFailure:
switch rErr := r.err.Get(); {
case rErr != nil && !step.OnFailure:
logger.Debug().
Str("step", step.Name).
Err(r.err).
Err(rErr).
Msgf("skipped due to OnFailure=%t", step.OnFailure)
return nil
case r.err == nil && !step.OnSuccess:
case rErr == nil && !step.OnSuccess:
logger.Debug().
Str("step", step.Name).
Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
@@ -228,7 +228,7 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *s
return nil, err
}
startTime := time.Now().Unix()
logger := r.MakeLogger()
logger := r.makeLogger()
var wg sync.WaitGroup
if r.logger != nil {

View File

@@ -22,43 +22,45 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing"
)
// Option configures a runtime option.
// Option configures a Runtime.
type Option func(*Runtime)
// WithBackend returns an option configured with a runtime engine.
// WithBackend sets the backend engine used to run steps.
func WithBackend(backend backend.Backend) Option {
return func(r *Runtime) {
r.engine = backend
}
}
// WithLogger returns an option configured with a runtime logger.
// WithLogger sets the function used to stream step logs.
func WithLogger(logger logging.Logger) Option {
return func(r *Runtime) {
r.logger = logger
}
}
// WithTracer returns an option configured with a runtime tracer.
// WithTracer sets the tracer used to report step state changes.
func WithTracer(tracer tracing.Tracer) Option {
return func(r *Runtime) {
r.tracer = tracer
}
}
// WithContext returns an option configured with a context.
// WithContext sets the workflow execution context.
func WithContext(ctx context.Context) Option {
return func(r *Runtime) {
r.ctx = ctx
}
}
// WithDescription sets the descriptive key-value pairs attached to every log line.
func WithDescription(desc map[string]string) Option {
return func(r *Runtime) {
r.Description = desc
r.description = desc
}
}
// WithTaskUUID sets a specific task UUID instead of the auto-generated one.
func WithTaskUUID(uuid string) Option {
return func(r *Runtime) {
r.taskUUID = uuid

View File

@@ -24,46 +24,49 @@ import (
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/logging"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing"
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
)
// Runtime represents a workflow state executed by a specific backend.
// Each workflow gets its own state configuration at runtime.
// Each workflow gets its own Runtime instance.
type Runtime struct {
err error
// err holds the first error that occurred in the workflow.
err utils.Protected[error]
spec *backend.Config
engine backend.Backend
started int64
// The context a workflow is being executed with.
// All normal (non cleanup) operations must use this.
// Cleanup operations should use the runnerCtx passed to Run()
// ctx is the context for the current workflow execution.
// All normal (non-cleanup) step operations must use this context.
// Cleanup operations should use the runnerCtx passed to Run().
ctx context.Context
tracer tracing.Tracer
logger logging.Logger
taskUUID string
Description map[string]string // The runtime descriptors.
taskUUID string
description map[string]string
}
// New returns a new runtime using the specified runtime
// configuration and runtime engine.
// New returns a new Runtime for the given workflow spec and options.
func New(spec *backend.Config, opts ...Option) *Runtime {
r := new(Runtime)
r.Description = map[string]string{}
r.err = utils.NewProtected[error](nil)
r.description = map[string]string{}
r.spec = spec
r.ctx = context.Background()
r.taskUUID = ulid.Make().String()
for _, opts := range opts {
opts(r)
for _, opt := range opts {
opt(r)
}
return r
}
func (r *Runtime) MakeLogger() zerolog.Logger {
// makeLogger returns a logger enriched with all runtime description fields.
func (r *Runtime) makeLogger() zerolog.Logger {
logCtx := log.With()
for key, val := range r.Description {
for key, val := range r.description {
logCtx = logCtx.Str(key, val)
}
return logCtx.Logger()

View File

@@ -27,6 +27,9 @@ var (
shutdownCtxLock sync.Mutex
)
// GetShutdownCtx returns a context that is valid for shutdownTimeout after the
// first call. It is used as a fallback cleanup context when the runner context
// is already canceled.
func GetShutdownCtx() context.Context {
shutdownCtxLock.Lock()
defer shutdownCtxLock.Unlock()

64
shared/utils/protected.go Normal file
View File

@@ -0,0 +1,64 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"sync"
)
// Protected provides thread-safe read and write access to a value of type T.
type Protected[T any] interface {
// Get returns the current value using a read lock, allowing multiple concurrent
// readers. Safe to call from multiple goroutines simultaneously.
Get() T
// Set replaces the current value using an exclusive write lock.
// Blocks until all ongoing reads/writes complete.
Set(v T)
// Update performs an atomic read-modify-write operation under a single exclusive
// lock. The provided function receives the current value and returns the new value,
// eliminating the race condition that would occur with a separate Get + Set.
Update(fn func(T) T)
}
type protected[T any] struct {
mu sync.RWMutex
value T
}
// NewProtected creates and returns a new Protected wrapper initialized with the
// given value. Use this as the constructor instead of creating a protected struct directly.
func NewProtected[T any](initial T) Protected[T] {
return &protected[T]{value: initial}
}
func (p *protected[T]) Get() T {
p.mu.RLock()
defer p.mu.RUnlock()
return p.value
}
func (p *protected[T]) Set(v T) {
p.mu.Lock()
defer p.mu.Unlock()
p.value = v
}
func (p *protected[T]) Update(fn func(T) T) {
p.mu.Lock()
defer p.mu.Unlock()
p.value = fn(p.value)
}