From fadc75c731368d39edc718246b6dedff40e097e3 Mon Sep 17 00:00:00 2001 From: "Gabriel A. Giovanini" Date: Sat, 4 May 2024 23:28:05 +0200 Subject: feat: Close worker on error If a worker error the application, as a whole, is terminadated. --- pkg/worker/http.go | 20 ++++++++++++++++---- pkg/worker/worker.go | 46 +++++++++++++++++++++++++++++----------------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/pkg/worker/http.go b/pkg/worker/http.go index 1d56f86..973775e 100644 --- a/pkg/worker/http.go +++ b/pkg/worker/http.go @@ -16,11 +16,23 @@ func NewServerTask(server *http.Server) *ServerTask { } func (self *ServerTask) Start(ctx context.Context) error { + done := make(chan error) + go func() { - // nolint: errcheck - self.server.ListenAndServe() + done <- self.server.ListenAndServe() }() - <-ctx.Done() - return self.server.Shutdown(ctx) + select { + // if ListenAndServe error for something other than context.Canceled + //(e.g.: address already in use) it trigger done to return sonner with + // the return error + case err := <-done: + return err + + // in case of context canceled it will manually trigger the server to + // shutdown, and return its error, which is most cases, but not limited, is + // context.Canceled. + case <-ctx.Done(): + return self.server.Shutdown(ctx) + } } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6b5c21c..fc97c97 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -10,31 +10,42 @@ import ( ) type ( + + // Task defines a long running core component of the application. Task interface { + // Start defines when the component will be started. + // The task MUST watch for the context and return when the context is + // canceled. + // + // Start MUST only error on unhandled catastrophic errors, since + // returning an error will cause the application to halt. + // + // Context canceled error is ignored and not reported as error as they + // will be trigger by OS signals or some other component erring. But in + // any case the task SHOULD handle and return context cancellation + // error. Start(context.Context) error } - Work struct { + // TaskPool manages the life-cycle of a pool of tasks. + TaskPool struct { + tasks []*work + } + + // work is wrapper around task to add metadata to it. + work struct { Name string Task Task wait time.Duration } - - TaskPool struct { - tasks []*Work - } -) - -const ( - format = "2006.01.02 15:04:05" ) func NewTaskPool() *TaskPool { return &TaskPool{} } -func (w *Work) run(ctx context.Context) error { - // first time fire from the get go +func (w *work) run(ctx context.Context) error { + // first time fire the task from the get go timer := time.NewTimer(time.Nanosecond) for { @@ -42,7 +53,7 @@ func (w *Work) run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-timer.C: - if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { + if err := w.Task.Start(ctx); err != nil { return err } } @@ -51,7 +62,7 @@ func (w *Work) run(ctx context.Context) error { } func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { - self.tasks = append(self.tasks, &Work{ + self.tasks = append(self.tasks, &work{ Name: name, Task: task, wait: wait, @@ -60,27 +71,28 @@ func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { func (self *TaskPool) Start(ctx context.Context) error { var g errgroup.Group + ctx, cancel := context.WithCancel(ctx) + defer cancel() for _, w := range self.tasks { - g.Go(func(w *Work) func() error { + g.Go(func(w *work) func() error { return func() error { - slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name) + slog.Info("Process starting", "name", w.Name) now := time.Now() if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) { since := time.Since(now) slog.Error( "Process erred", - "time", time.Now().Format(format), "name", w.Name, "error", err, "duration", since, ) + cancel() return err } else { since := time.Since(now) slog.Info( "Process ended", - "time", time.Now().Format(format), "name", w.Name, "duration", since, ) -- cgit v1.2.3