package worker import ( "context" "errors" "log/slog" "time" "golang.org/x/sync/errgroup" ) type ( // Task defines a long running core component of the application. Task interface { // Start defines when the component will be started. // The task MUST watch for the context and return when the context is // canceled. // // Start MUST only error on unhandled catastrophic errors, since // returning an error will cause the application to halt. // // Context canceled error is ignored and not reported as error as they // will be trigger by OS signals or some other component erring. But in // any case the task SHOULD handle and return context cancellation // error. Start(context.Context) error } // TaskPool manages the life-cycle of a pool of tasks. TaskPool struct { tasks []*work } // work is wrapper around task to add metadata to it. work struct { Name string Task Task wait time.Duration } ) func NewTaskPool() *TaskPool { return &TaskPool{} } func (w *work) run(ctx context.Context) error { // first time fire the task from the get go timer := time.NewTimer(time.Nanosecond) for { select { case <-ctx.Done(): return ctx.Err() case <-timer.C: if err := w.Task.Start(ctx); err != nil { return err } } timer.Reset(w.wait) } } func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { self.tasks = append(self.tasks, &work{ Name: name, Task: task, wait: wait, }) } func (self *TaskPool) Start(ctx context.Context) error { var g errgroup.Group ctx, cancel := context.WithCancel(ctx) defer cancel() for _, w := range self.tasks { g.Go(func(w *work) func() error { return func() error { slog.Info("Process starting", "name", w.Name) now := time.Now() if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) { since := time.Since(now) slog.Error( "Process erred", "name", w.Name, "error", err, "duration", since, ) cancel() return err } else { since := time.Since(now) slog.Info( "Process ended", "name", w.Name, "duration", since, ) } return nil } }(w)) } return g.Wait() }