mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-03-15 17:13:46 +01:00
Add own workflow model (#1784)
Closes #1287 --------- Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
@@ -105,24 +105,24 @@ func (s *RPC) Extend(c context.Context, id string) error {
|
||||
|
||||
// Update implements the rpc.Update function
|
||||
func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
|
||||
stepID, err := strconv.ParseInt(id, 10, 64)
|
||||
workflowID, err := strconv.ParseInt(id, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pstep, err := s.store.StepLoad(stepID)
|
||||
workflow, err := s.store.WorkflowLoad(workflowID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: rpc.update: cannot find step with id %d: %s", stepID, err)
|
||||
log.Error().Msgf("error: rpc.update: cannot find workflow with id %d: %s", workflowID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
currentPipeline, err := s.store.GetPipeline(pstep.PipelineID)
|
||||
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find pipeline with id %d: %s", pstep.PipelineID, err)
|
||||
log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
step, err := s.store.StepChild(currentPipeline, pstep.PID, state.Step)
|
||||
step, err := s.store.StepChild(currentPipeline, workflow.PID, state.Step)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find step with name %s: %s", state.Step, err)
|
||||
return err
|
||||
@@ -134,17 +134,11 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if step, err = pipeline.UpdateStepStatus(s.store, *step, state, currentPipeline.Started); err != nil {
|
||||
if err := pipeline.UpdateStepStatus(s.store, step, state, currentPipeline.Started); err != nil {
|
||||
log.Error().Err(err).Msg("rpc.update: cannot update step")
|
||||
}
|
||||
|
||||
s.updateForgeStatus(c, repo, currentPipeline, step)
|
||||
|
||||
currentPipeline.Steps, err = s.store.StepList(currentPipeline)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("can not get step list from store")
|
||||
}
|
||||
if currentPipeline.Steps, err = model.Tree(currentPipeline.Steps); err != nil {
|
||||
if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil {
|
||||
log.Error().Err(err).Msg("can not build tree from step list")
|
||||
return err
|
||||
}
|
||||
@@ -172,7 +166,7 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
|
||||
return err
|
||||
}
|
||||
|
||||
step, err := s.store.StepLoad(stepID)
|
||||
workflow, err := s.store.WorkflowLoad(stepID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err)
|
||||
return err
|
||||
@@ -182,11 +176,11 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
step.AgentID = agent.ID
|
||||
workflow.AgentID = agent.ID
|
||||
|
||||
currentPipeline, err := s.store.GetPipeline(step.PipelineID)
|
||||
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find pipeline with id %d: %s", step.PipelineID, err)
|
||||
log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -202,10 +196,10 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
|
||||
}
|
||||
}
|
||||
|
||||
s.updateForgeStatus(c, repo, currentPipeline, step)
|
||||
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
||||
|
||||
defer func() {
|
||||
currentPipeline.Steps, _ = s.store.StepList(currentPipeline)
|
||||
currentPipeline.Workflows, _ = s.store.WorkflowGetTree(currentPipeline)
|
||||
message := pubsub.Message{
|
||||
Labels: map[string]string{
|
||||
"repo": repo.FullName,
|
||||
@@ -221,11 +215,11 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
|
||||
}
|
||||
}()
|
||||
|
||||
step, err = pipeline.UpdateStepToStatusStarted(s.store, *step, state)
|
||||
workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.updateForgeStatus(c, repo, currentPipeline, step)
|
||||
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -236,12 +230,17 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
return err
|
||||
}
|
||||
|
||||
workflow, err := s.store.StepLoad(workflowID)
|
||||
workflow, err := s.store.WorkflowLoad(workflowID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("cannot find step with id %d", workflowID)
|
||||
return err
|
||||
}
|
||||
|
||||
workflow.Children, err = s.store.StepListFromWorkflowFind(workflow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
|
||||
@@ -261,7 +260,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
|
||||
logger.Trace().Msgf("gRPC Done with state: %#v", state)
|
||||
|
||||
if workflow, err = pipeline.UpdateStepStatusToDone(s.store, *workflow, state); err != nil {
|
||||
if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil {
|
||||
logger.Error().Err(err).Msgf("pipeline.UpdateStepStatusToDone: cannot update workflow state: %s", err)
|
||||
}
|
||||
|
||||
@@ -275,14 +274,14 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
|
||||
}
|
||||
|
||||
steps, err := s.store.StepList(currentPipeline)
|
||||
currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.completeChildrenIfParentCompleted(steps, workflow)
|
||||
s.completeChildrenIfParentCompleted(workflow)
|
||||
|
||||
if !model.IsThereRunningStage(steps) {
|
||||
if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(steps), workflow.Stopped); err != nil {
|
||||
if !model.IsThereRunningStage(currentPipeline.Workflows) {
|
||||
if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(currentPipeline.Workflows), workflow.Stopped); err != nil {
|
||||
logger.Error().Err(err).Msgf("pipeline.UpdateStatusToDone: cannot update workflow final state")
|
||||
}
|
||||
}
|
||||
@@ -291,14 +290,16 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
|
||||
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
|
||||
go func() {
|
||||
for _, step := range steps {
|
||||
if err := s.logger.Close(c, step.ID); err != nil {
|
||||
logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID)
|
||||
for _, wf := range currentPipeline.Workflows {
|
||||
for _, step := range wf.Children {
|
||||
if err := s.logger.Close(c, step.ID); err != nil {
|
||||
logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err := s.notify(c, repo, currentPipeline, steps); err != nil {
|
||||
if err := s.notify(c, repo, currentPipeline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -306,7 +307,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
s.pipelineCount.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Inc()
|
||||
s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Set(float64(currentPipeline.Finished - currentPipeline.Started))
|
||||
}
|
||||
if model.IsMultiPipeline(steps) {
|
||||
if currentPipeline.IsMultiPipeline() {
|
||||
s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(workflow.State), workflow.Name).Set(float64(workflow.Stopped - workflow.Started))
|
||||
}
|
||||
|
||||
@@ -372,17 +373,17 @@ func (s *RPC) ReportHealth(ctx context.Context, status string) error {
|
||||
return s.store.AgentUpdate(agent)
|
||||
}
|
||||
|
||||
func (s *RPC) completeChildrenIfParentCompleted(steps []*model.Step, completedWorkflow *model.Step) {
|
||||
for _, p := range steps {
|
||||
if p.Running() && p.PPID == completedWorkflow.PID {
|
||||
if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *p, completedWorkflow.Stopped); err != nil {
|
||||
log.Error().Msgf("error: done: cannot update step_id %d child state: %s", p.ID, err)
|
||||
func (s *RPC) completeChildrenIfParentCompleted(completedWorkflow *model.Workflow) {
|
||||
for _, c := range completedWorkflow.Children {
|
||||
if c.Running() {
|
||||
if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *c, completedWorkflow.Stopped); err != nil {
|
||||
log.Error().Msgf("error: done: cannot update step_id %d child state: %s", c.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline *model.Pipeline, step *model.Step) {
|
||||
func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) {
|
||||
user, err := s.store.GetUser(repo.UserID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("can not get user with id '%d'", repo.UserID)
|
||||
@@ -401,18 +402,15 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline
|
||||
}
|
||||
|
||||
// only do status updates for parent steps
|
||||
if step != nil && step.IsParent() {
|
||||
err = s.forge.Status(ctx, user, repo, pipeline, step)
|
||||
if workflow != nil {
|
||||
err = s.forge.Status(ctx, user, repo, pipeline, workflow)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, pipeline.Number)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline, steps []*model.Step) (err error) {
|
||||
if pipeline.Steps, err = model.Tree(steps); err != nil {
|
||||
return err
|
||||
}
|
||||
func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) {
|
||||
message := pubsub.Message{
|
||||
Labels: map[string]string{
|
||||
"repo": repo.FullName,
|
||||
|
||||
Reference in New Issue
Block a user