aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/worker.go')
-rw-r--r--pkg/worker/worker.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
new file mode 100644
index 0000000..6b5c21c
--- /dev/null
+++ b/pkg/worker/worker.go
@@ -0,0 +1,94 @@
+package worker
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "time"
+
+ "golang.org/x/sync/errgroup"
+)
+
+type (
+ Task interface {
+ Start(context.Context) error
+ }
+
+ Work struct {
+ Name string
+ Task Task
+ wait time.Duration
+ }
+
+ TaskPool struct {
+ tasks []*Work
+ }
+)
+
+const (
+ format = "2006.01.02 15:04:05"
+)
+
+func NewTaskPool() *TaskPool {
+ return &TaskPool{}
+}
+
+func (w *Work) run(ctx context.Context) error {
+ // first time fire from the get go
+ timer := time.NewTimer(time.Nanosecond)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ return err
+ }
+ }
+ timer.Reset(w.wait)
+ }
+}
+
+func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
+ self.tasks = append(self.tasks, &Work{
+ Name: name,
+ Task: task,
+ wait: wait,
+ })
+}
+
+func (self *TaskPool) Start(ctx context.Context) error {
+ var g errgroup.Group
+
+ for _, w := range self.tasks {
+ g.Go(func(w *Work) func() error {
+ return func() error {
+ slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
+ now := time.Now()
+ if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
+ since := time.Since(now)
+ slog.Error(
+ "Process erred",
+ "time", time.Now().Format(format),
+ "name", w.Name,
+ "error", err,
+ "duration", since,
+ )
+ return err
+ } else {
+ since := time.Since(now)
+ slog.Info(
+ "Process ended",
+ "time", time.Now().Format(format),
+ "name", w.Name,
+ "duration", since,
+ )
+ }
+ return nil
+ }
+ }(w))
+ }
+
+ return g.Wait()
+}