From f13ffc2c8f668b945c0c72451568625c599aaca3 Mon Sep 17 00:00:00 2001 From: Anbraten Date: Tue, 21 Mar 2023 14:10:43 +0100 Subject: [PATCH] Save agent-id for tasks and add endpoint to get agent tasks (#1631) Save which agent is running a task. This is now visible in the admin UI in the queue and in the agent details screen. # changes - [x] save id of agent executing a task - [x] add endpoint to get tasks of an agent for #999 - [x] show assigned agent-id in queue - [x] (offtopic) use same colors for queue stats and icons (similar to the ones used by pipelines) - [x] (offtopic) use badges for queue labels & dependencies ![image](https://user-images.githubusercontent.com/6918444/226541271-23f3b7b2-7a08-45c2-a2e6-1c7fc31b6f1d.png) --- server/api/agent.go | 25 +++++++++++ server/grpc/rpc.go | 21 +++------ server/model/step.go | 4 +- server/model/task.go | 9 +--- server/queue/fifo.go | 5 ++- server/queue/fifo_test.go | 44 +++++++++---------- server/queue/persistent.go | 4 +- server/queue/queue.go | 2 +- server/router/api.go | 1 + server/store/datastore/step_test.go | 4 +- web/components.d.ts | 2 +- web/src/assets/locales/en.json | 6 ++- .../admin/settings/AdminAgentsTab.vue | 40 +++++++++++------ .../admin/settings/AdminQueueTab.vue | 19 ++++++-- .../admin/settings/queue/AdminQueueStats.vue | 8 ++-- web/src/components/atomic/Badge.vue | 7 ++- web/src/lib/api/types/queue.ts | 1 + 17 files changed, 124 insertions(+), 78 deletions(-) diff --git a/server/api/agent.go b/server/api/agent.go index 3582fd917..7b1e9d4cb 100644 --- a/server/api/agent.go +++ b/server/api/agent.go @@ -22,6 +22,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gorilla/securecookie" + "github.com/woodpecker-ci/woodpecker/server" "github.com/woodpecker-ci/woodpecker/server/model" "github.com/woodpecker-ci/woodpecker/server/router/middleware/session" "github.com/woodpecker-ci/woodpecker/server/store" @@ -51,6 +52,30 @@ func GetAgent(c *gin.Context) { c.JSON(http.StatusOK, agent) } +func GetAgentTasks(c *gin.Context) { + agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64) + if err != nil { + _ = c.AbortWithError(http.StatusBadRequest, err) + return + } + + agent, err := store.FromContext(c).AgentFind(agentID) + if err != nil { + c.String(http.StatusNotFound, "Cannot find agent. %s", err) + return + } + + tasks := []*model.Task{} + info := server.Config.Services.Queue.Info(c) + for _, task := range info.Running { + if task.AgentID == agent.ID { + tasks = append(tasks, task) + } + } + + c.JSON(http.StatusOK, tasks) +} + func PatchAgent(c *gin.Context) { _store := store.FromContext(c) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 665785186..2ce718715 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -77,7 +77,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er return nil, nil } - task, err := s.queue.Poll(c, fn) + task, err := s.queue.Poll(c, agent.ID, fn) if err != nil { return nil, err } else if task == nil { @@ -131,14 +131,6 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } - metadata, ok := grpcMetadata.FromIncomingContext(c) - if ok { - hostname, ok := metadata["hostname"] - if ok && len(hostname) != 0 { - step.Machine = hostname[0] - } - } - repo, err := s.store.GetRepo(currentPipeline.RepoID) if err != nil { log.Error().Msgf("error: cannot find repo with id %d: %s", currentPipeline.RepoID, err) @@ -258,13 +250,12 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err) return err } - metadata, ok := grpcMetadata.FromIncomingContext(c) - if ok { - hostname, ok := metadata["hostname"] - if ok && len(hostname) != 0 { - step.Machine = hostname[0] - } + + agent, err := s.getAgentFromContext(c) + if err != nil { + return err } + step.AgentID = agent.ID currentPipeline, err := s.store.GetPipeline(step.PipelineID) if err != nil { diff --git a/server/model/step.go b/server/model/step.go index fe4e68d36..05fcc70ff 100644 --- a/server/model/step.go +++ b/server/model/step.go @@ -32,7 +32,7 @@ type StepStore interface { // swagger:model step type Step struct { ID int64 `json:"id" xorm:"pk autoincr 'step_id'"` - PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"` + PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"` PID int `json:"pid" xorm:"UNIQUE(s) 'step_pid'"` PPID int `json:"ppid" xorm:"step_ppid"` PGID int `json:"pgid" xorm:"step_pgid"` @@ -42,7 +42,7 @@ type Step struct { ExitCode int `json:"exit_code" xorm:"step_exit_code"` Started int64 `json:"start_time,omitempty" xorm:"step_started"` Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"` - Machine string `json:"machine,omitempty" xorm:"step_machine"` + AgentID int64 `json:"agent_id,omitempty" xorm:"step_agent_id"` Platform string `json:"platform,omitempty" xorm:"step_platform"` Environ map[string]string `json:"environ,omitempty" xorm:"json 'step_environ'"` Children []*Step `json:"children,omitempty" xorm:"-"` diff --git a/server/model/task.go b/server/model/task.go index 0ec4ae3e1..0365ff3f6 100644 --- a/server/model/task.go +++ b/server/model/task.go @@ -26,14 +26,6 @@ type TaskStore interface { TaskDelete(string) error } -type TaskStatusValue string - -const ( - TaskStatusSkipped TaskStatusValue = "skipped" - TaskStatusSuccess TaskStatusValue = "success" - TaskStatusFailure TaskStatusValue = "failure" -) - // Task defines scheduled pipeline Task. type Task struct { ID string `json:"id" xorm:"PK UNIQUE 'task_id'"` @@ -42,6 +34,7 @@ type Task struct { Dependencies []string `json:"dependencies" xorm:"json 'task_dependencies'"` RunOn []string `json:"run_on" xorm:"json 'task_run_on'"` DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'task_dep_status'"` + AgentID int64 `json:"agent_id" xorm:"'agent_id'"` } // TableName return database table name for xorm diff --git a/server/queue/fifo.go b/server/queue/fifo.go index e38425678..e79173d1a 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -34,6 +34,7 @@ type entry struct { } type worker struct { + agentID int64 filter FilterFn channel chan *model.Task } @@ -82,9 +83,10 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { } // Poll retrieves and removes the head of this queue. -func (q *fifo) Poll(c context.Context, f FilterFn) (*model.Task, error) { +func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { q.Lock() w := &worker{ + agentID: agentID, channel: make(chan *model.Task, 1), filter: f, } @@ -254,6 +256,7 @@ func (q *fifo) process() { q.filterWaiting() for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { task := pending.Value.(*model.Task) + task.AgentID = worker.agentID delete(q.workers, worker) q.pending.Remove(pending) q.running[task.ID] = &entry{ diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 595238f58..8ad288b04 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -25,7 +25,7 @@ func TestFifo(t *testing.T) { return } - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -65,7 +65,7 @@ func TestFifoExpire(t *testing.T) { return } - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -84,7 +84,7 @@ func TestFifoWait(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.Push(noContext, want)) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -137,7 +137,7 @@ func TestFifoDependencies(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1})) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return @@ -145,7 +145,7 @@ func TestFifoDependencies(t *testing.T) { assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) - got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -173,7 +173,7 @@ func TestFifoErrors(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return @@ -181,7 +181,7 @@ func TestFifoErrors(t *testing.T) { assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -192,7 +192,7 @@ func TestFifoErrors(t *testing.T) { return } - got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -223,7 +223,7 @@ func TestFifoErrors2(t *testing.T) { assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) for i := 0; i < 2; i++ { - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task1 && got != task2 { t.Errorf("expect task1 or task2 returned from queue as task3 depends on them") return @@ -237,7 +237,7 @@ func TestFifoErrors2(t *testing.T) { } } - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -275,7 +275,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func(i int) { for { fmt.Printf("Worker %d started\n", i) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }(i) @@ -299,7 +299,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }() @@ -313,7 +313,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }() @@ -359,14 +359,14 @@ func TestFifoTransitiveErrors(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return } assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -377,7 +377,7 @@ func TestFifoTransitiveErrors(t *testing.T) { } assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped)) - got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -409,7 +409,7 @@ func TestFifoCancel(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled"))) @@ -430,7 +430,7 @@ func TestFifoPause(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) wg.Done() }() @@ -450,7 +450,7 @@ func TestFifoPause(t *testing.T) { q.Pause() assert.NoError(t, q.Push(noContext, task1)) q.Resume() - _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) } func TestFifoPauseResume(t *testing.T) { @@ -463,7 +463,7 @@ func TestFifoPauseResume(t *testing.T) { assert.NoError(t, q.Push(noContext, task1)) q.Resume() - _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) + _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) } func TestWaitingVsPending(t *testing.T) { @@ -487,7 +487,7 @@ func TestWaitingVsPending(t *testing.T) { q := New(context.Background()).(*fifo) assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) + got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) info := q.Info(noContext) if info.Stats.WaitingOnDeps != 2 { @@ -495,7 +495,7 @@ func TestWaitingVsPending(t *testing.T) { } assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, err := q.Poll(noContext, func(*model.Task) bool { return true }) + got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) assert.NoError(t, err) assert.EqualValues(t, task2, got) diff --git a/server/queue/persistent.go b/server/queue/persistent.go index c7530e518..f32db54f5 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -73,8 +73,8 @@ func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) err } // Poll retrieves and removes a task head of this queue. -func (q *persistentQueue) Poll(c context.Context, f FilterFn) (*model.Task, error) { - task, err := q.Queue.Poll(c, f) +func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { + task, err := q.Queue.Poll(c, agentID, f) if task != nil { log.Debug().Msgf("pull queue item: %s: remove from backup", task.ID) if derr := q.store.TaskDelete(task.ID); derr != nil { diff --git a/server/queue/queue.go b/server/queue/queue.go index 5ab73fd59..22667e3b3 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -63,7 +63,7 @@ type Queue interface { PushAtOnce(c context.Context, tasks []*model.Task) error // Poll retrieves and removes a task head of this queue. - Poll(c context.Context, f FilterFn) (*model.Task, error) + Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) // Extend extends the deadline for a task. Extend(c context.Context, id string) error diff --git a/server/router/api.go b/server/router/api.go index dcb7468bf..9f3f3ecb9 100644 --- a/server/router/api.go +++ b/server/router/api.go @@ -173,6 +173,7 @@ func apiRoutes(e *gin.Engine) { agentBase.GET("", api.GetAgents) agentBase.POST("", api.PostAgent) agentBase.GET("/:agent", api.GetAgent) + agentBase.GET("/:agent/tasks", api.GetAgentTasks) agentBase.PATCH("/:agent", api.PatchAgent) agentBase.DELETE("/:agent", api.DeleteAgent) } diff --git a/server/store/datastore/step_test.go b/server/store/datastore/step_test.go index c25772f14..d8f5f47b3 100644 --- a/server/store/datastore/step_test.go +++ b/server/store/datastore/step_test.go @@ -37,7 +37,7 @@ func TestStepFind(t *testing.T) { State: model.StatusSuccess, Error: "pc load letter", ExitCode: 255, - Machine: "localhost", + AgentID: 1, Platform: "linux/amd64", Environ: map[string]string{"GOLANG": "tip"}, }, @@ -147,7 +147,7 @@ func TestStepUpdate(t *testing.T) { State: "pending", Error: "pc load letter", ExitCode: 255, - Machine: "localhost", + AgentID: 1, Platform: "linux/amd64", Environ: map[string]string{"GOLANG": "tip"}, } diff --git a/web/components.d.ts b/web/components.d.ts index 4a3baf93d..cf610abf7 100644 --- a/web/components.d.ts +++ b/web/components.d.ts @@ -63,10 +63,10 @@ declare module '@vue/runtime-core' { IMdiGestureTap: typeof import('~icons/mdi/gesture-tap')['default'] IMdiGithub: typeof import('~icons/mdi/github')['default'] IMdiLoading: typeof import('~icons/mdi/loading')['default'] - IMdiSync: typeof import('~icons/mdi/sync')['default'] IMdiSourceBranch: typeof import('~icons/mdi/source-branch')['default'] IMdisourceCommit: typeof import('~icons/mdi/source-commit')['default'] IMdiSourcePull: typeof import('~icons/mdi/source-pull')['default'] + IMdiSync: typeof import('~icons/mdi/sync')['default'] IMdiTagOutline: typeof import('~icons/mdi/tag-outline')['default'] InputField: typeof import('./src/components/form/InputField.vue')['default'] IPhGitlabLogoSimpleFill: typeof import('~icons/ph/gitlab-logo-simple-fill')['default'] diff --git a/web/src/assets/locales/en.json b/web/src/assets/locales/en.json index 927a64bed..c4e7a383e 100644 --- a/web/src/assets/locales/en.json +++ b/web/src/assets/locales/en.json @@ -368,7 +368,9 @@ "version": "Version", "last_contact": "Last contact", "never": "Never", - "delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore." + "delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore.", + "edit_agent": "Edit agent", + "delete_agent": "Delete agent" }, "queue": { "queue": "Queue", @@ -381,6 +383,8 @@ "task_running": "Task is running", "task_pending": "Task is pending", "task_waiting_on_deps": "Task is waiting on dependencies", + "agent": "agent", + "waiting_for": "waiting for", "stats": { "completed_count": "Completed Tasks", "worker_count": "Free", diff --git a/web/src/components/admin/settings/AdminAgentsTab.vue b/web/src/components/admin/settings/AdminAgentsTab.vue index 2a4f7a60b..e2aaf2544 100644 --- a/web/src/components/admin/settings/AdminAgentsTab.vue +++ b/web/src/components/admin/settings/AdminAgentsTab.vue @@ -12,10 +12,7 @@ start-icon="back" @click="selectedAgent = undefined" /> - +