aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/worker.go')
-rw-r--r--pkg/worker/worker.go46
1 files changed, 29 insertions, 17 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 6b5c21c..fc97c97 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -10,31 +10,42 @@ import (
)
type (
+
+ // Task defines a long running core component of the application.
Task interface {
+ // Start defines when the component will be started.
+ // The task MUST watch for the context and return when the context is
+ // canceled.
+ //
+ // Start MUST only error on unhandled catastrophic errors, since
+ // returning an error will cause the application to halt.
+ //
+ // Context canceled error is ignored and not reported as error as they
+ // will be trigger by OS signals or some other component erring. But in
+ // any case the task SHOULD handle and return context cancellation
+ // error.
Start(context.Context) error
}
- Work struct {
+ // TaskPool manages the life-cycle of a pool of tasks.
+ TaskPool struct {
+ tasks []*work
+ }
+
+ // work is wrapper around task to add metadata to it.
+ work struct {
Name string
Task Task
wait time.Duration
}
-
- TaskPool struct {
- tasks []*Work
- }
-)
-
-const (
- format = "2006.01.02 15:04:05"
)
func NewTaskPool() *TaskPool {
return &TaskPool{}
}
-func (w *Work) run(ctx context.Context) error {
- // first time fire from the get go
+func (w *work) run(ctx context.Context) error {
+ // first time fire the task from the get go
timer := time.NewTimer(time.Nanosecond)
for {
@@ -42,7 +53,7 @@ func (w *Work) run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
- if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ if err := w.Task.Start(ctx); err != nil {
return err
}
}
@@ -51,7 +62,7 @@ func (w *Work) run(ctx context.Context) error {
}
func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
- self.tasks = append(self.tasks, &Work{
+ self.tasks = append(self.tasks, &work{
Name: name,
Task: task,
wait: wait,
@@ -60,27 +71,28 @@ func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
func (self *TaskPool) Start(ctx context.Context) error {
var g errgroup.Group
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
for _, w := range self.tasks {
- g.Go(func(w *Work) func() error {
+ g.Go(func(w *work) func() error {
return func() error {
- slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
+ slog.Info("Process starting", "name", w.Name)
now := time.Now()
if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
since := time.Since(now)
slog.Error(
"Process erred",
- "time", time.Now().Format(format),
"name", w.Name,
"error", err,
"duration", since,
)
+ cancel()
return err
} else {
since := time.Since(now)
slog.Info(
"Process ended",
- "time", time.Now().Format(format),
"name", w.Name,
"duration", since,
)