From b242ed3c44f4dde7c4b452312b78a3b02f42ea65 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sat, 19 Aug 2023 16:24:42 +0200 Subject: feat: Add task loop Now the tasks will loop every given time. With this it will be able to pick up new photos after the application was started. I added 2h for file because my personal photo gallery is quite big and quite IO bottled necked so it tasks a lot of time to go through. --- pkg/worker/worker.go | 62 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 21 deletions(-) (limited to 'pkg/worker/worker.go') diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 359384a..b768320 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,48 +5,68 @@ import ( "errors" "fmt" "sync" + "time" ) type ( - // Worker should watch for context - Worker interface { + // Task should watch for context + Task interface { Start(context.Context) error } Work struct { - Name string - Worker Worker + Name string + Task Task + wait time.Duration } - WorkerPool struct { - workers []*Work + TaskPool struct { + tasks []*Work } ) -func NewWorkerPool() *WorkerPool { - return &WorkerPool{} +func NewTaskPool() *TaskPool { + return &TaskPool{} } -func (self *WorkerPool) AddWorker(name string, worker Worker) { - self.workers = append(self.workers, &Work{ - Name: name, - Worker: worker, +func (w *Work) run(ctx context.Context) error { + // first time fire from the get go + timer := time.NewTimer(time.Nanosecond) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + fmt.Println("Process starting: ", w.Name) + if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { + fmt.Println("Process errored: ", w.Name, err.Error()) + return err + } else { + fmt.Println("Process done: ", w.Name) + } + } + timer.Reset(w.wait) + } +} + +func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { + self.tasks = append(self.tasks, &Work{ + Name: name, + Task: task, + wait: wait, }) } -func (self *WorkerPool) Start(ctx context.Context) { +func (self *TaskPool) Start(ctx context.Context) { var wg sync.WaitGroup - wg.Add(len(self.workers)) + wg.Add(len(self.tasks)) - for _, w := range self.workers { + for _, w := range self.tasks { go func(w *Work) { - defer wg.Done() - if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { - fmt.Println("Processes finished, error", w.Name, err.Error()) - } else { - fmt.Println(w.Name, "done") - } + _ = w.run(ctx) + wg.Done() }(w) } -- cgit v1.2.3