Don't propagate workflow error from agent back to agent (#6056)

This commit is contained in:
6543
2026-02-06 12:22:32 +01:00
committed by GitHub
parent cf78d5dd7e
commit 06818ee6ad
3 changed files with 132 additions and 1 deletions

View File

@@ -134,6 +134,9 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e
q.Lock()
defer q.Unlock()
// it's an external error so we wrap it
err = NewErrExternal(err)
var errs []error
// we first process the tasks itself
for _, id := range ids {
@@ -166,7 +169,10 @@ func (q *fifo) Wait(ctx context.Context, taskID string) error {
select {
case <-ctx.Done():
case <-state.done:
return state.error
// only return queue errors and no workflow errors
if !errors.Is(state.error, new(ErrExternal)) {
return state.error
}
}
}
return nil

View File

@@ -114,6 +114,103 @@ func TestFifoBasicOperations(t *testing.T) {
}
})
t.Run("external error filtered by Wait", func(t *testing.T) {
// Test that external errors (from Error/ErrorAtOnce) are wrapped as ErrExternal
// and filtered out by Wait(), while internal errors like context cancellation
// are passed through
// Test 1: External error is filtered by Wait
task1 := &model.Task{ID: "wait-external-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1}))
waitForProcess()
got1, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err)
// Start waiting on the task
waitDone := make(chan error, 1)
go func() {
waitDone <- q.Wait(ctx, got1.ID)
}()
time.Sleep(10 * time.Millisecond)
// Report an external error (agent reported error)
externalErr := fmt.Errorf("agent reported error")
assert.NoError(t, q.Error(ctx, got1.ID, externalErr))
// Wait should return nil (external error filtered out)
select {
case err := <-waitDone:
assert.NoError(t, err, "Wait should filter ErrExternal and return nil")
case <-time.After(time.Second):
t.Fatal("Wait should have returned")
}
// Test 2: Internal error (context cancellation) passes through Wait
task2 := &model.Task{ID: "wait-internal-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2}))
waitForProcess()
got2, err := q.Poll(ctx, 2, filterFnTrue)
assert.NoError(t, err)
waitCtx, waitCancel := context.WithCancelCause(ctx)
waitDone2 := make(chan error, 1)
go func() {
waitDone2 <- q.Wait(waitCtx, got2.ID)
}()
time.Sleep(10 * time.Millisecond)
waitCancel(nil)
// Context cancellation should cause Wait to return (internal error handling)
select {
case err := <-waitDone2:
// Wait returns nil when context is canceled (normal behavior)
assert.NoError(t, err, "Wait should return nil when context is canceled")
case <-time.After(time.Second):
t.Fatal("Wait should return when context is canceled")
}
// Clean up
assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess))
waitForProcess()
// Test 3: Multiple waiters all get nil when external error occurs
task3 := &model.Task{ID: "wait-multi-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task3}))
waitForProcess()
got3, err := q.Poll(ctx, 3, filterFnTrue)
assert.NoError(t, err)
// Start multiple waiters
numWaiters := 3
waitResults := make(chan error, numWaiters)
for i := 0; i < numWaiters; i++ {
go func() {
waitResults <- q.Wait(ctx, got3.ID)
}()
}
time.Sleep(10 * time.Millisecond)
// Report an external error
batchErr := fmt.Errorf("external batch failure")
assert.NoError(t, q.ErrorAtOnce(ctx, []string{got3.ID}, batchErr))
// All waiters should return nil (external error filtered)
for i := 0; i < numWaiters; i++ {
select {
case err := <-waitResults:
assert.NoError(t, err, "All waiters should get nil when ErrExternal is filtered")
case <-time.After(time.Second):
t.Fatalf("Waiter %d didn't return in time", i)
}
}
})
t.Run("error at once", func(t *testing.T) {
task1 := &model.Task{ID: "batch-1"}
task2 := &model.Task{ID: "batch-2"}

View File

@@ -41,6 +41,34 @@ var (
ErrWorkerKicked = errors.New("worker was kicked")
)
// ErrExternal wraps an external error.
type ErrExternal struct {
err error
}
func (e *ErrExternal) Error() string {
return fmt.Sprintf("external error: %s", e.err)
}
// Unwrap allows errors.Is and errors.As to work with the wrapped error.
func (e *ErrExternal) Unwrap() error {
return e.err
}
// Is allows errors.Is to match against ErrExternal types.
func (e *ErrExternal) Is(target error) bool {
_, ok := target.(*ErrExternal)
return ok
}
// NewErrExternal wraps an error as external one so queue can filter it out if needed.
func NewErrExternal(err error) error {
if err == nil {
return nil
}
return &ErrExternal{err: err}
}
// InfoT provides runtime information.
type InfoT struct {
Pending []*model.Task `json:"pending"`