// Package worker runs named tasks repeatedly, one goroutine per task, pausing
// a fixed interval between runs until the context ends or a task fails.
package worker

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"golang.org/x/sync/errgroup"
)

type (
	// Task is a unit of work that a TaskPool invokes repeatedly.
	Task interface {
		// Start performs a single run of the task. A non-nil error that is
		// not context.Canceled stops the task's loop.
		Start(context.Context) error
	}

	// Work pairs a Task with the name used in log output and the pause
	// inserted between the end of one run and the start of the next.
	Work struct {
		Name string
		Task Task
		wait time.Duration
	}

	// TaskPool is the list of Work items launched together by Start.
	TaskPool struct {
		tasks []*Work
	}
)

const (
	// format is the layout of the "time" attribute attached to log records.
	format = "2006.01.02 15:04:05"
)

// NewTaskPool returns an empty TaskPool.
func NewTaskPool() *TaskPool {
	return &TaskPool{}
}

// run calls w.Task.Start in a loop, waiting w.wait after each run, until ctx
// is done or Start returns an error other than context.Canceled.
//
// It returns ctx.Err() when the context ends, or the task's error otherwise.
// A context.Canceled error from Start is not returned; the loop continues and
// leaves through the ctx.Done branch once the context is actually done.
func (w *Work) run(ctx context.Context) error {
	// A near-zero initial delay makes the first run fire right away.
	timer := time.NewTimer(time.Nanosecond)
	// Release the timer on every exit path instead of leaving it pending.
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
				return err
			}
		}
		// Only reached after timer.C was received, so the channel is
		// drained and Reset can be called directly.
		timer.Reset(w.wait)
	}
}

// runAndLog wraps run with start/end log records that carry the task name and
// the total time spent in the loop.
//
// Cancellation (context.Canceled) is reported as a normal end and yields nil;
// any other error is logged at error level and returned.
func (w *Work) runAndLog(ctx context.Context) error {
	slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
	started := time.Now()

	err := w.run(ctx)
	elapsed := time.Since(started)

	// errors.Is takes the error under test first and the target second.
	if err != nil && !errors.Is(err, context.Canceled) {
		slog.Error(
			"Process erred",
			"time", time.Now().Format(format),
			"name", w.Name,
			"error", err,
			"duration", elapsed,
		)
		return err
	}

	slog.Info(
		"Process ended",
		"time", time.Now().Format(format),
		"name", w.Name,
		"duration", elapsed,
	)
	return nil
}

// AddTask appends a task to the pool under the given name; wait is the pause
// between consecutive runs of that task.
//
// AddTask does no locking, so it must not be called concurrently with itself
// or with Start.
func (p *TaskPool) AddTask(name string, wait time.Duration, task Task) {
	p.tasks = append(p.tasks, &Work{
		Name: name,
		Task: task,
		wait: wait,
	})
}

// Start launches every registered task in its own goroutine and blocks until
// all of them have returned. It returns the first non-nil error reported by
// any task, or nil if every task ended through cancellation.
//
// The errgroup is not bound to a derived context, so one task failing does
// not stop the others; they keep running until ctx itself is done.
func (p *TaskPool) Start(ctx context.Context) error {
	var g errgroup.Group
	for _, w := range p.tasks {
		w := w // per-iteration copy; required by toolchains older than Go 1.22
		g.Go(func() error {
			return w.runAndLog(ctx)
		})
	}
	return g.Wait()
}