mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-03-11 21:55:01 +01:00
feat: enhance service starting
This commit is contained in:
@@ -19,7 +19,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v3"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -71,9 +70,7 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro
|
||||
if grpcServer == nil {
|
||||
return
|
||||
}
|
||||
log.Info().Msg("terminating grpc service gracefully")
|
||||
grpcServer.GracefulStop()
|
||||
log.Info().Msg("grpc service stopped")
|
||||
}()
|
||||
|
||||
if err := grpcServer.Serve(lis); err != nil {
|
||||
|
||||
65
cmd/server/http_server.go
Normal file
65
cmd/server/http_server.go
Normal file
@@ -0,0 +1,65 @@
|
||||
// Copyright 2024 Woodpecker Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v3"
|
||||
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/router"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/router/middleware"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/store"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/web"
|
||||
)
|
||||
|
||||
func getHTTPHandler(c *cli.Command, _store store.Store) (http.Handler, error) {
|
||||
proxyWebUI := c.String("www-proxy")
|
||||
var webUIServe func(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
if proxyWebUI == "" {
|
||||
webEngine, err := web.New()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create webEngine: %w", err)
|
||||
}
|
||||
webUIServe = webEngine.ServeHTTP
|
||||
} else {
|
||||
origin, _ := url.Parse(proxyWebUI)
|
||||
|
||||
director := func(req *http.Request) {
|
||||
req.Header.Add("X-Forwarded-Host", req.Host)
|
||||
req.Header.Add("X-Origin-Host", origin.Host)
|
||||
req.URL.Scheme = origin.Scheme
|
||||
req.URL.Host = origin.Host
|
||||
}
|
||||
|
||||
proxy := &httputil.ReverseProxy{Director: director}
|
||||
webUIServe = proxy.ServeHTTP
|
||||
}
|
||||
|
||||
// setup the server and start the listener
|
||||
handler := router.Load(
|
||||
webUIServe,
|
||||
middleware.Logger(time.RFC3339, true),
|
||||
middleware.Version,
|
||||
middleware.Store(_store),
|
||||
)
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
@@ -65,8 +65,6 @@ func startMetricsCollector(ctx context.Context, _store store.Store) {
|
||||
})
|
||||
|
||||
go func() {
|
||||
log.Info().Msg("queue metric collector started")
|
||||
|
||||
for {
|
||||
stats := server.Config.Services.Queue.Info(ctx)
|
||||
pendingSteps.Set(float64(stats.Stats.Pending))
|
||||
@@ -76,15 +74,12 @@ func startMetricsCollector(ctx context.Context, _store store.Store) {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info().Msg("queue metric collector stopped")
|
||||
return
|
||||
case <-time.After(queueInfoRefreshInterval):
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
log.Info().Msg("store metric collector started")
|
||||
|
||||
for {
|
||||
repoCount, repoErr := _store.GetRepoCount()
|
||||
userCount, userErr := _store.GetUserCount()
|
||||
@@ -99,7 +94,6 @@ func startMetricsCollector(ctx context.Context, _store store.Store) {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info().Msg("store metric collector stopped")
|
||||
return
|
||||
case <-time.After(storeInfoRefreshInterval):
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -35,40 +34,20 @@ import (
|
||||
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/cron"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/router"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/router/middleware"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/store"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/server/web"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/shared/logger"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/version"
|
||||
)
|
||||
|
||||
const (
|
||||
shutdownTimeout = time.Second * 5
|
||||
)
|
||||
|
||||
var (
|
||||
stopServerFunc context.CancelCauseFunc = func(error) {}
|
||||
shutdownCancelFunc context.CancelFunc = func() {}
|
||||
shutdownCtx = context.Background()
|
||||
)
|
||||
|
||||
func run(ctx context.Context, c *cli.Command) error {
|
||||
if err := logger.SetupGlobalLogger(ctx, c, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
shutdownCtx := context.TODO()
|
||||
|
||||
ctx, ctxCancel := context.WithCancelCause(ctx)
|
||||
stopServerFunc = func(err error) {
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("shutdown of whole server")
|
||||
}
|
||||
stopServerFunc = func(error) {}
|
||||
shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout)
|
||||
ctxCancel(err)
|
||||
}
|
||||
defer stopServerFunc(nil)
|
||||
defer shutdownCancelFunc()
|
||||
defer ctxCancel(nil)
|
||||
|
||||
// set gin mode based on log level
|
||||
if zerolog.GlobalLevel() > zerolog.DebugLevel {
|
||||
@@ -93,6 +72,44 @@ func run(ctx context.Context, c *cli.Command) error {
|
||||
)
|
||||
}
|
||||
|
||||
// wait for all services until one does stop with an error
|
||||
serviceWaitGroup := errgroup.Group{}
|
||||
|
||||
startService := func(name string, startFnc func(context.Context) error, stopFnc func(context.Context) error, stopOnErr bool) {
|
||||
if strings.Contains(name, " ") {
|
||||
name = fmt.Sprintf("'%s'", name)
|
||||
}
|
||||
|
||||
serviceWaitGroup.Go(func() error {
|
||||
log.Debug().Msgf("starting %s service ...", name)
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
|
||||
log.Debug().Msgf("stopping %s service ...", name)
|
||||
|
||||
if stopFnc != nil {
|
||||
if err := stopFnc(shutdownCtx); err != nil {
|
||||
log.Error().Err(err).Msgf("failed to stop %s service", name)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("%s service stopped", name)
|
||||
}()
|
||||
|
||||
if err := startFnc(ctx); err != nil {
|
||||
if stopOnErr {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Error().Err(err).Msgf("could not start %s service", name)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
_store, err := backoff.Retry(ctx,
|
||||
func() (store.Store, error) {
|
||||
return setupStore(ctx, c)
|
||||
@@ -117,96 +134,58 @@ func run(ctx context.Context, c *cli.Command) error {
|
||||
return fmt.Errorf("can't setup globals: %w", err)
|
||||
}
|
||||
|
||||
// wait for all services until one do stops with an error
|
||||
serviceWaitingGroup := errgroup.Group{}
|
||||
|
||||
log.Info().Msgf("starting Woodpecker server with version '%s'", version.String())
|
||||
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
log.Info().Msg("starting cron service ...")
|
||||
if err := cron.Run(ctx, _store); err != nil {
|
||||
go stopServerFunc(err)
|
||||
return err
|
||||
}
|
||||
log.Info().Msg("cron service stopped")
|
||||
return nil
|
||||
})
|
||||
|
||||
// start the grpc server
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
log.Info().Msg("starting grpc server ...")
|
||||
if err := runGrpcServer(ctx, c, _store); err != nil {
|
||||
// stop whole server as grpc is essential
|
||||
go stopServerFunc(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
proxyWebUI := c.String("www-proxy")
|
||||
var webUIServe func(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
if proxyWebUI == "" {
|
||||
webEngine, err := web.New()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to create web engine")
|
||||
return err
|
||||
}
|
||||
webUIServe = webEngine.ServeHTTP
|
||||
} else {
|
||||
origin, _ := url.Parse(proxyWebUI)
|
||||
|
||||
director := func(req *http.Request) {
|
||||
req.Header.Add("X-Forwarded-Host", req.Host)
|
||||
req.Header.Add("X-Origin-Host", origin.Host)
|
||||
req.URL.Scheme = origin.Scheme
|
||||
req.URL.Host = origin.Host
|
||||
}
|
||||
|
||||
proxy := &httputil.ReverseProxy{Director: director}
|
||||
webUIServe = proxy.ServeHTTP
|
||||
}
|
||||
|
||||
// setup the server and start the listener
|
||||
handler := router.Load(
|
||||
webUIServe,
|
||||
middleware.Logger(time.RFC3339, true),
|
||||
middleware.Version,
|
||||
middleware.Store(_store),
|
||||
startService("cron",
|
||||
func(ctx context.Context) error {
|
||||
return cron.Run(ctx, _store)
|
||||
},
|
||||
nil,
|
||||
true,
|
||||
)
|
||||
|
||||
startService("grpc",
|
||||
func(ctx context.Context) error {
|
||||
return runGrpcServer(ctx, c, _store)
|
||||
},
|
||||
nil,
|
||||
true,
|
||||
)
|
||||
|
||||
httpHandler, err := getHTTPHandler(c, _store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.String("server-cert") != "" {
|
||||
// start the server with tls enabled
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
tlsServer := &http.Server{
|
||||
Addr: server.Config.Server.PortTLS,
|
||||
Handler: handler,
|
||||
TLSConfig: &tls.Config{
|
||||
NextProtos: []string{"h2", "http/1.1"},
|
||||
},
|
||||
}
|
||||
tlsServer := &http.Server{
|
||||
Addr: server.Config.Server.PortTLS,
|
||||
Handler: httpHandler,
|
||||
TLSConfig: &tls.Config{
|
||||
NextProtos: []string{"h2", "http/1.1"},
|
||||
},
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Info().Msg("shutdown tls server ...")
|
||||
if err := tlsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
|
||||
log.Error().Err(err).Msg("shutdown tls server failed")
|
||||
} else {
|
||||
log.Info().Msg("tls server stopped")
|
||||
startService("http with tls",
|
||||
func(_ context.Context) error {
|
||||
log.Info().Msgf("access ui at %s or http://localhost%s", server.Config.Server.Host, server.Config.Server.PortTLS)
|
||||
|
||||
err := tlsServer.ListenAndServeTLS(
|
||||
c.String("server-cert"),
|
||||
c.String("server-key"),
|
||||
)
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info().Msg("starting tls server ...")
|
||||
err := tlsServer.ListenAndServeTLS(
|
||||
c.String("server-cert"),
|
||||
c.String("server-key"),
|
||||
)
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Error().Err(err).Msg("TLS server failed")
|
||||
stopServerFunc(fmt.Errorf("TLS server failed: %w", err))
|
||||
}
|
||||
return err
|
||||
})
|
||||
return nil
|
||||
},
|
||||
func(ctx context.Context) error {
|
||||
return tlsServer.Shutdown(ctx)
|
||||
},
|
||||
true,
|
||||
)
|
||||
|
||||
// http to https redirect
|
||||
redirect := func(w http.ResponseWriter, req *http.Request) {
|
||||
@@ -219,85 +198,79 @@ func run(ctx context.Context, c *cli.Command) error {
|
||||
http.Redirect(w, req, req.URL.String(), http.StatusMovedPermanently)
|
||||
}
|
||||
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
redirectServer := &http.Server{
|
||||
Addr: server.Config.Server.Port,
|
||||
Handler: http.HandlerFunc(redirect),
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Info().Msg("shutdown redirect server ...")
|
||||
if err := redirectServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
|
||||
log.Error().Err(err).Msg("shutdown redirect server failed")
|
||||
} else {
|
||||
log.Info().Msg("redirect server stopped")
|
||||
}
|
||||
}()
|
||||
redirectServer := &http.Server{
|
||||
Addr: server.Config.Server.Port,
|
||||
Handler: http.HandlerFunc(redirect),
|
||||
}
|
||||
|
||||
log.Info().Msg("starting redirect server ...")
|
||||
if err := redirectServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Error().Err(err).Msg("redirect server failed")
|
||||
stopServerFunc(fmt.Errorf("redirect server failed: %w", err))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
startService("http redirect",
|
||||
func(_ context.Context) error {
|
||||
err := redirectServer.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
func(ctx context.Context) error {
|
||||
return redirectServer.Shutdown(ctx)
|
||||
},
|
||||
true,
|
||||
)
|
||||
} else {
|
||||
// start the server without tls
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
httpServer := &http.Server{
|
||||
Addr: c.String("server-addr"),
|
||||
Handler: handler,
|
||||
}
|
||||
httpServer := &http.Server{
|
||||
Addr: c.String("server-addr"),
|
||||
Handler: httpHandler,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Info().Msg("shutdown http server ...")
|
||||
if err := httpServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
|
||||
log.Error().Err(err).Msg("shutdown http server failed")
|
||||
} else {
|
||||
log.Info().Msg("http server stopped")
|
||||
startService("http",
|
||||
func(_ context.Context) error {
|
||||
log.Info().Msgf("access ui at %s or http://localhost%s", server.Config.Server.Host, server.Config.Server.Port)
|
||||
|
||||
err := httpServer.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info().Msg("starting http server ...")
|
||||
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Error().Err(err).Msg("http server failed")
|
||||
stopServerFunc(fmt.Errorf("http server failed: %w", err))
|
||||
}
|
||||
return err
|
||||
})
|
||||
return nil
|
||||
},
|
||||
func(ctx context.Context) error {
|
||||
return httpServer.Shutdown(ctx)
|
||||
},
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
if metricsServerAddr := c.String("metrics-server-addr"); metricsServerAddr != "" {
|
||||
startMetricsCollector(ctx, _store)
|
||||
startService("metrics collector", func(ctx context.Context) error {
|
||||
startMetricsCollector(ctx, _store)
|
||||
return nil
|
||||
}, nil, false)
|
||||
|
||||
serviceWaitingGroup.Go(func() error {
|
||||
metricsRouter := gin.New()
|
||||
metricsRouter.GET("/metrics", gin.WrapH(prometheus_http.Handler()))
|
||||
metricsRouter := gin.New()
|
||||
metricsRouter.GET("/metrics", gin.WrapH(prometheus_http.Handler()))
|
||||
|
||||
metricsServer := &http.Server{
|
||||
Addr: metricsServerAddr,
|
||||
Handler: metricsRouter,
|
||||
}
|
||||
metricsServer := &http.Server{
|
||||
Addr: metricsServerAddr,
|
||||
Handler: metricsRouter,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Info().Msg("shutdown metrics server ...")
|
||||
if err := metricsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
|
||||
log.Error().Err(err).Msg("shutdown metrics server failed")
|
||||
} else {
|
||||
log.Info().Msg("metrics server stopped")
|
||||
startService("metrics",
|
||||
func(_ context.Context) error {
|
||||
err := metricsServer.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info().Msg("starting metrics server ...")
|
||||
if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Error().Err(err).Msg("metrics server failed")
|
||||
stopServerFunc(fmt.Errorf("metrics server failed: %w", err))
|
||||
}
|
||||
return err
|
||||
})
|
||||
return nil
|
||||
},
|
||||
func(ctx context.Context) error {
|
||||
return metricsServer.Shutdown(ctx)
|
||||
},
|
||||
false,
|
||||
)
|
||||
}
|
||||
|
||||
return serviceWaitingGroup.Wait()
|
||||
return serviceWaitGroup.Wait()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user