diff --git a/pipeline/runtime/executor.go b/pipeline/runtime/executor.go index 9c1f94ad6..dfb7b0aee 100644 --- a/pipeline/runtime/executor.go +++ b/pipeline/runtime/executor.go @@ -31,7 +31,7 @@ import ( // Run starts the execution of a workflow and waits for it to complete. func (r *Runtime) Run(runnerCtx context.Context) error { - logger := r.MakeLogger() + logger := r.makeLogger() logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages)) for stagePos, stage := range r.spec.Stages { stepNames := []string{} @@ -85,12 +85,12 @@ func (r *Runtime) Run(runnerCtx context.Context) error { return pipeline_errors.ErrCancel case err := <-r.execAll(runnerCtx, stage.Steps): if err != nil { - r.err = err + r.err.Set(err) } } } - return r.err + return r.err.Get() } // Updates the current status of a step. @@ -105,7 +105,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen state := new(state.State) state.Pipeline.Started = r.started state.Pipeline.Step = step - state.Pipeline.Error = r.err + state.Pipeline.Error = r.err.Get() // We have an error while starting the step if processState == nil && err != nil { @@ -128,7 +128,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error { var g errgroup.Group done := make(chan error) - logger := r.MakeLogger() + logger := r.makeLogger() for _, step := range steps { // Required since otherwise the loop variable @@ -141,14 +141,14 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch Str("step", step.Name). Msg("prepare") - switch { - case r.err != nil && !step.OnFailure: + switch rErr := r.err.Get(); { + case rErr != nil && !step.OnFailure: logger.Debug(). Str("step", step.Name). - Err(r.err). + Err(rErr). Msgf("skipped due to OnFailure=%t", step.OnFailure) return nil - case r.err == nil && !step.OnSuccess: + case rErr == nil && !step.OnSuccess: logger.Debug(). Str("step", step.Name). Msgf("skipped due to OnSuccess=%t", step.OnSuccess) @@ -228,7 +228,7 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *s return nil, err } startTime := time.Now().Unix() - logger := r.MakeLogger() + logger := r.makeLogger() var wg sync.WaitGroup if r.logger != nil { diff --git a/pipeline/runtime/option.go b/pipeline/runtime/option.go index f87d4a108..28cd3009e 100644 --- a/pipeline/runtime/option.go +++ b/pipeline/runtime/option.go @@ -22,43 +22,45 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing" ) -// Option configures a runtime option. +// Option configures a Runtime. type Option func(*Runtime) -// WithBackend returns an option configured with a runtime engine. +// WithBackend sets the backend engine used to run steps. func WithBackend(backend backend.Backend) Option { return func(r *Runtime) { r.engine = backend } } -// WithLogger returns an option configured with a runtime logger. +// WithLogger sets the function used to stream step logs. func WithLogger(logger logging.Logger) Option { return func(r *Runtime) { r.logger = logger } } -// WithTracer returns an option configured with a runtime tracer. +// WithTracer sets the tracer used to report step state changes. func WithTracer(tracer tracing.Tracer) Option { return func(r *Runtime) { r.tracer = tracer } } -// WithContext returns an option configured with a context. +// WithContext sets the workflow execution context. func WithContext(ctx context.Context) Option { return func(r *Runtime) { r.ctx = ctx } } +// WithDescription sets the descriptive key-value pairs attached to every log line. func WithDescription(desc map[string]string) Option { return func(r *Runtime) { - r.Description = desc + r.description = desc } } +// WithTaskUUID sets a specific task UUID instead of the auto-generated one. func WithTaskUUID(uuid string) Option { return func(r *Runtime) { r.taskUUID = uuid diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 5fb197b4a..7cf4cb4a8 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -24,46 +24,49 @@ import ( backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" "go.woodpecker-ci.org/woodpecker/v3/pipeline/logging" "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing" + "go.woodpecker-ci.org/woodpecker/v3/shared/utils" ) // Runtime represents a workflow state executed by a specific backend. -// Each workflow gets its own state configuration at runtime. +// Each workflow gets its own Runtime instance. type Runtime struct { - err error + // err holds the first error that occurred in the workflow. + err utils.Protected[error] + spec *backend.Config engine backend.Backend started int64 - // The context a workflow is being executed with. - // All normal (non cleanup) operations must use this. - // Cleanup operations should use the runnerCtx passed to Run() + // ctx is the context for the current workflow execution. + // All normal (non-cleanup) step operations must use this context. + // Cleanup operations should use the runnerCtx passed to Run(). ctx context.Context tracer tracing.Tracer logger logging.Logger - taskUUID string - - Description map[string]string // The runtime descriptors. + taskUUID string + description map[string]string } -// New returns a new runtime using the specified runtime -// configuration and runtime engine. +// New returns a new Runtime for the given workflow spec and options. func New(spec *backend.Config, opts ...Option) *Runtime { r := new(Runtime) - r.Description = map[string]string{} + r.err = utils.NewProtected[error](nil) + r.description = map[string]string{} r.spec = spec r.ctx = context.Background() r.taskUUID = ulid.Make().String() - for _, opts := range opts { - opts(r) + for _, opt := range opts { + opt(r) } return r } -func (r *Runtime) MakeLogger() zerolog.Logger { +// makeLogger returns a logger enriched with all runtime description fields. +func (r *Runtime) makeLogger() zerolog.Logger { logCtx := log.With() - for key, val := range r.Description { + for key, val := range r.description { logCtx = logCtx.Str(key, val) } return logCtx.Logger() diff --git a/pipeline/runtime/shutdown.go b/pipeline/runtime/shutdown.go index d53cf1c0d..c6896cc8a 100644 --- a/pipeline/runtime/shutdown.go +++ b/pipeline/runtime/shutdown.go @@ -27,6 +27,9 @@ var ( shutdownCtxLock sync.Mutex ) +// GetShutdownCtx returns a context that is valid for shutdownTimeout after the +// first call. It is used as a fallback cleanup context when the runner context +// is already canceled. func GetShutdownCtx() context.Context { shutdownCtxLock.Lock() defer shutdownCtxLock.Unlock() diff --git a/shared/utils/protected.go b/shared/utils/protected.go new file mode 100644 index 000000000..0a01ca99c --- /dev/null +++ b/shared/utils/protected.go @@ -0,0 +1,64 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "sync" +) + +// Protected provides thread-safe read and write access to a value of type T. +type Protected[T any] interface { + // Get returns the current value using a read lock, allowing multiple concurrent + // readers. Safe to call from multiple goroutines simultaneously. + Get() T + + // Set replaces the current value using an exclusive write lock. + // Blocks until all ongoing reads/writes complete. + Set(v T) + + // Update performs an atomic read-modify-write operation under a single exclusive + // lock. The provided function receives the current value and returns the new value, + // eliminating the race condition that would occur with a separate Get + Set. + Update(fn func(T) T) +} + +type protected[T any] struct { + mu sync.RWMutex + value T +} + +// NewProtected creates and returns a new Protected wrapper initialized with the +// given value. Use this as the constructor instead of creating a protected struct directly. +func NewProtected[T any](initial T) Protected[T] { + return &protected[T]{value: initial} +} + +func (p *protected[T]) Get() T { + p.mu.RLock() + defer p.mu.RUnlock() + return p.value +} + +func (p *protected[T]) Set(v T) { + p.mu.Lock() + defer p.mu.Unlock() + p.value = v +} + +func (p *protected[T]) Update(fn func(T) T) { + p.mu.Lock() + defer p.mu.Unlock() + p.value = fn(p.value) +}