diff options
Diffstat (limited to 'pkg/worker/worker.go')
-rw-r--r-- | pkg/worker/worker.go | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6b5c21c..fc97c97 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -10,31 +10,42 @@ import ( ) type ( + + // Task defines a long running core component of the application. Task interface { + // Start defines when the component will be started. + // The task MUST watch for the context and return when the context is + // canceled. + // + // Start MUST only error on unhandled catastrophic errors, since + // returning an error will cause the application to halt. + // + // Context canceled error is ignored and not reported as error as they + // will be trigger by OS signals or some other component erring. But in + // any case the task SHOULD handle and return context cancellation + // error. Start(context.Context) error } - Work struct { + // TaskPool manages the life-cycle of a pool of tasks. + TaskPool struct { + tasks []*work + } + + // work is wrapper around task to add metadata to it. + work struct { Name string Task Task wait time.Duration } - - TaskPool struct { - tasks []*Work - } -) - -const ( - format = "2006.01.02 15:04:05" ) func NewTaskPool() *TaskPool { return &TaskPool{} } -func (w *Work) run(ctx context.Context) error { - // first time fire from the get go +func (w *work) run(ctx context.Context) error { + // first time fire the task from the get go timer := time.NewTimer(time.Nanosecond) for { @@ -42,7 +53,7 @@ func (w *Work) run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-timer.C: - if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { + if err := w.Task.Start(ctx); err != nil { return err } } @@ -51,7 +62,7 @@ func (w *Work) run(ctx context.Context) error { } func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { - self.tasks = append(self.tasks, &Work{ + self.tasks = append(self.tasks, &work{ Name: name, Task: task, wait: wait, @@ -60,27 +71,28 @@ func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { func (self *TaskPool) Start(ctx context.Context) error { var g errgroup.Group + ctx, cancel := context.WithCancel(ctx) + defer cancel() for _, w := range self.tasks { - g.Go(func(w *Work) func() error { + g.Go(func(w *work) func() error { return func() error { - slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name) + slog.Info("Process starting", "name", w.Name) now := time.Now() if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) { since := time.Since(now) slog.Error( "Process erred", - "time", time.Now().Format(format), "name", w.Name, "error", err, "duration", since, ) + cancel() return err } else { since := time.Since(now) slog.Info( "Process ended", - "time", time.Now().Format(format), "name", w.Name, "duration", since, ) |