mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-03-15 17:13:46 +01:00
Drop error only on purpose or else report back or log (#514)
- Remove Deadcode - Simplify Code - Drop error only on purpose
This commit is contained in:
@@ -6,6 +6,8 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var noContext = context.Background()
|
||||
@@ -14,7 +16,7 @@ func TestFifo(t *testing.T) {
|
||||
want := &Task{ID: "1"}
|
||||
|
||||
q := New()
|
||||
q.Push(noContext, want)
|
||||
assert.NoError(t, q.Push(noContext, want))
|
||||
info := q.Info(noContext)
|
||||
if len(info.Pending) != 1 {
|
||||
t.Errorf("expect task in pending queue")
|
||||
@@ -37,7 +39,7 @@ func TestFifo(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
q.Done(noContext, got.ID, StatusSuccess)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess))
|
||||
info = q.Info(noContext)
|
||||
if len(info.Pending) != 0 {
|
||||
t.Errorf("expect task removed from pending queue")
|
||||
@@ -54,7 +56,7 @@ func TestFifoExpire(t *testing.T) {
|
||||
|
||||
q := New().(*fifo)
|
||||
q.extension = 0
|
||||
q.Push(noContext, want)
|
||||
assert.NoError(t, q.Push(noContext, want))
|
||||
info := q.Info(noContext)
|
||||
if len(info.Pending) != 1 {
|
||||
t.Errorf("expect task in pending queue")
|
||||
@@ -78,7 +80,7 @@ func TestFifoWait(t *testing.T) {
|
||||
want := &Task{ID: "1"}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.Push(noContext, want)
|
||||
assert.NoError(t, q.Push(noContext, want))
|
||||
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != want {
|
||||
@@ -89,12 +91,12 @@ func TestFifoWait(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
q.Wait(noContext, got.ID)
|
||||
assert.NoError(t, q.Wait(noContext, got.ID))
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
<-time.After(time.Millisecond)
|
||||
q.Done(noContext, got.ID, StatusSuccess)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess))
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
@@ -102,7 +104,7 @@ func TestFifoEvict(t *testing.T) {
|
||||
t1 := &Task{ID: "1"}
|
||||
|
||||
q := New()
|
||||
q.Push(noContext, t1)
|
||||
assert.NoError(t, q.Push(noContext, t1))
|
||||
info := q.Info(noContext)
|
||||
if len(info.Pending) != 1 {
|
||||
t.Errorf("expect task in pending queue")
|
||||
@@ -131,7 +133,7 @@ func TestFifoDependencies(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task1}))
|
||||
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task1 {
|
||||
@@ -139,7 +141,7 @@ func TestFifoDependencies(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
q.Done(noContext, got.ID, StatusSuccess)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess))
|
||||
|
||||
got, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task2 {
|
||||
@@ -167,7 +169,7 @@ func TestFifoErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task1 {
|
||||
@@ -175,7 +177,7 @@ func TestFifoErrors(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
|
||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||
|
||||
got, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task2 {
|
||||
@@ -216,7 +218,7 @@ func TestFifoErrors2(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
@@ -226,10 +228,10 @@ func TestFifoErrors2(t *testing.T) {
|
||||
}
|
||||
|
||||
if got != task1 {
|
||||
q.Done(noContext, got.ID, StatusSuccess)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess))
|
||||
}
|
||||
if got != task2 {
|
||||
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
|
||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -263,7 +265,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
obtainedWorkCh := make(chan *Task)
|
||||
|
||||
@@ -291,7 +293,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||
return
|
||||
} else {
|
||||
task1Processed = true
|
||||
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
|
||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||
go func() {
|
||||
for {
|
||||
fmt.Printf("Worker spawned\n")
|
||||
@@ -306,7 +308,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||
return
|
||||
} else {
|
||||
task2Processed = true
|
||||
q.Done(noContext, got.ID, StatusSuccess)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess))
|
||||
go func() {
|
||||
for {
|
||||
fmt.Printf("Worker spawned\n")
|
||||
@@ -356,14 +358,14 @@ func TestFifoTransitiveErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task1 {
|
||||
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
||||
return
|
||||
}
|
||||
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
|
||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||
|
||||
got, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task2 {
|
||||
@@ -374,7 +376,7 @@ func TestFifoTransitiveErrors(t *testing.T) {
|
||||
t.Errorf("expect task2 should not run, since task1 failed")
|
||||
return
|
||||
}
|
||||
q.Done(noContext, got.ID, StatusSkipped)
|
||||
assert.NoError(t, q.Done(noContext, got.ID, StatusSkipped))
|
||||
|
||||
got, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
if got != task3 {
|
||||
@@ -406,12 +408,12 @@ func TestFifoCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
_, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
q.Error(noContext, task1.ID, fmt.Errorf("cancelled"))
|
||||
q.Error(noContext, task2.ID, fmt.Errorf("cancelled"))
|
||||
q.Error(noContext, task3.ID, fmt.Errorf("cancelled"))
|
||||
assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("cancelled")))
|
||||
assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("cancelled")))
|
||||
assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("cancelled")))
|
||||
|
||||
info := q.Info(noContext)
|
||||
if len(info.Pending) != 0 {
|
||||
@@ -435,7 +437,7 @@ func TestFifoPause(t *testing.T) {
|
||||
|
||||
q.Pause()
|
||||
t0 := time.Now()
|
||||
q.Push(noContext, task1)
|
||||
assert.NoError(t, q.Push(noContext, task1))
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
q.Resume()
|
||||
|
||||
@@ -447,7 +449,7 @@ func TestFifoPause(t *testing.T) {
|
||||
}
|
||||
|
||||
q.Pause()
|
||||
q.Push(noContext, task1)
|
||||
assert.NoError(t, q.Push(noContext, task1))
|
||||
q.Resume()
|
||||
_, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
}
|
||||
@@ -459,7 +461,7 @@ func TestFifoPauseResume(t *testing.T) {
|
||||
|
||||
q := New().(*fifo)
|
||||
q.Pause()
|
||||
q.Push(noContext, task1)
|
||||
assert.NoError(t, q.Push(noContext, task1))
|
||||
q.Resume()
|
||||
|
||||
_, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
@@ -484,7 +486,7 @@ func TestWaitingVsPending(t *testing.T) {
|
||||
}
|
||||
|
||||
q := New().(*fifo)
|
||||
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
|
||||
assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1}))
|
||||
|
||||
got, _ := q.Poll(noContext, func(*Task) bool { return true })
|
||||
|
||||
@@ -493,7 +495,7 @@ func TestWaitingVsPending(t *testing.T) {
|
||||
t.Errorf("2 should wait on deps")
|
||||
}
|
||||
|
||||
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
|
||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||
got, _ = q.Poll(noContext, func(*Task) bool { return true })
|
||||
|
||||
info = q.Info(noContext)
|
||||
|
||||
@@ -18,6 +18,7 @@ package queue
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
@@ -38,7 +39,9 @@ func WithTaskStore(q Queue, s model.TaskStore) Queue {
|
||||
DepStatus: make(map[string]string),
|
||||
})
|
||||
}
|
||||
q.PushAtOnce(context.Background(), toEnqueue)
|
||||
if err := q.PushAtOnce(context.Background(), toEnqueue); err != nil {
|
||||
log.Error().Err(err).Msg("PushAtOnce failed")
|
||||
}
|
||||
return &persistentQueue{q, s}
|
||||
}
|
||||
|
||||
@@ -49,35 +52,44 @@ type persistentQueue struct {
|
||||
|
||||
// Push pushes a task to the tail of this queue.
|
||||
func (q *persistentQueue) Push(c context.Context, task *Task) error {
|
||||
q.store.TaskInsert(&model.Task{
|
||||
if err := q.store.TaskInsert(&model.Task{
|
||||
ID: task.ID,
|
||||
Data: task.Data,
|
||||
Labels: task.Labels,
|
||||
Dependencies: task.Dependencies,
|
||||
RunOn: task.RunOn,
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
err := q.Queue.Push(c, task)
|
||||
if err != nil {
|
||||
q.store.TaskDelete(task.ID)
|
||||
if err2 := q.store.TaskDelete(task.ID); err2 != nil {
|
||||
err = errors.Wrapf(err, "delete task '%s' failed: %v", task.ID, err2)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// PushAtOnce pushes multiple tasks to the tail of this queue.
|
||||
func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*Task) error {
|
||||
// TODO: invent store.NewSession who return context including a session and make TaskInsert & TaskDelete use it
|
||||
for _, task := range tasks {
|
||||
q.store.TaskInsert(&model.Task{
|
||||
if err := q.store.TaskInsert(&model.Task{
|
||||
ID: task.ID,
|
||||
Data: task.Data,
|
||||
Labels: task.Labels,
|
||||
Dependencies: task.Dependencies,
|
||||
RunOn: task.RunOn,
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := q.Queue.PushAtOnce(c, tasks)
|
||||
if err != nil {
|
||||
for _, task := range tasks {
|
||||
q.store.TaskDelete(task.ID)
|
||||
if err := q.store.TaskDelete(task.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
@@ -101,18 +113,20 @@ func (q *persistentQueue) Poll(c context.Context, f Filter) (*Task, error) {
|
||||
func (q *persistentQueue) Evict(c context.Context, id string) error {
|
||||
err := q.Queue.Evict(c, id)
|
||||
if err == nil {
|
||||
q.store.TaskDelete(id)
|
||||
return q.store.TaskDelete(id)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// EvictAtOnce removes a pending task from the queue.
|
||||
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
|
||||
err := q.Queue.EvictAtOnce(c, ids)
|
||||
if err == nil {
|
||||
for _, id := range ids {
|
||||
q.store.TaskDelete(id)
|
||||
if err := q.Queue.EvictAtOnce(c, ids); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
if err := q.store.TaskDelete(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user