From b242ed3c44f4dde7c4b452312b78a3b02f42ea65 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sat, 19 Aug 2023 16:24:42 +0200 Subject: feat: Add task loop Now the tasks will loop every given time. With this it will be able to pick up new photos after the application was started. I added 2h for file because my personal photo gallery is quite big and quite IO bottled necked so it tasks a lot of time to go through. --- cmd/server/main.go | 25 ++++++++-------- pkg/worker/httpserver.go | 8 ++--- pkg/worker/list_processor.go | 32 ++++++++++---------- pkg/worker/list_processor_test.go | 4 +-- pkg/worker/worker.go | 62 ++++++++++++++++++++++++++------------- templates/base.qtpl | 2 +- 6 files changed, 77 insertions(+), 56 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index daf5356..2a93946 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "os/signal" + "time" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -122,37 +123,37 @@ func main() { albumScanner = scanner.NewAlbumScanner(mediaRepository) ) - // worker + // tasks var ( - serverWorker = worker.NewServerWorker(&http.Server{Handler: r, Addr: "0.0.0.0:8080"}) - fileWorker = worker.NewWorkerFromChanProcessor[string]( + serverTask = worker.NewServerTask(&http.Server{Handler: r, Addr: "0.0.0.0:8080"}) + fileTask = worker.NewTaskFromChanProcessor[string]( fileScanner, scheduler, logrus.WithField("context", "file scanner"), ) - exifWorker = worker.NewWorkerFromBatchProcessor[*repository.Media]( + exifTask = worker.NewTaskFromBatchProcessor[*repository.Media]( exifScanner, scheduler, logrus.WithField("context", "exif scanner"), ) - thumbnailWorker = worker.NewWorkerFromBatchProcessor[*repository.Media]( + thumbnailTask = worker.NewTaskFromBatchProcessor[*repository.Media]( thumbnailScanner, scheduler, logrus.WithField("context", "thumbnail scanner"), ) - albumWorker = worker.NewWorkerFromSerialProcessor[*repository.Media]( + albumTask = worker.NewTaskFromSerialProcessor[*repository.Media]( albumScanner, scheduler, logrus.WithField("context", "thumbnail scanner"), ) ) - pool := worker.NewWorkerPool() - pool.AddWorker("http server", serverWorker) - pool.AddWorker("exif scanner", exifWorker) - pool.AddWorker("file scanner", fileWorker) - pool.AddWorker("thumbnail scanner", thumbnailWorker) - pool.AddWorker("album scanner", albumWorker) + pool := worker.NewTaskPool() + pool.AddTask("http server", time.Minute, serverTask) + pool.AddTask("exif scanner", 15*time.Minute, exifTask) + pool.AddTask("file scanner", 2*time.Hour, fileTask) + pool.AddTask("thumbnail scanner", 15*time.Minute, thumbnailTask) + pool.AddTask("album scanner", 15*time.Minute, albumTask) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go index dc8f255..f0ec3ba 100644 --- a/pkg/worker/httpserver.go +++ b/pkg/worker/httpserver.go @@ -5,11 +5,11 @@ import ( "net/http" ) -type ServerWorker struct { +type ServerTask struct { server *http.Server } -func (self *ServerWorker) Start(ctx context.Context) error { +func (self *ServerTask) Start(ctx context.Context) error { go func() { // nolint: errcheck self.server.ListenAndServe() @@ -19,8 +19,8 @@ func (self *ServerWorker) Start(ctx context.Context) error { return self.server.Shutdown(ctx) } -func NewServerWorker(server *http.Server) *ServerWorker { - return &ServerWorker{ +func NewServerTask(server *http.Server) *ServerTask { + return &ServerTask{ server: server, } } diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index 02817c9..ea6b453 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -10,7 +10,7 @@ import ( type ( - // A simple worker to deal with list. + // A simple task to deal with list. ChanProcessor[T any] interface { Query(context.Context) (<-chan T, error) Process(context.Context, T) error @@ -25,62 +25,62 @@ type ( Process(context.Context, T) error } - chanProcessorWorker[T any] struct { + chanProcessorTask[T any] struct { chanProcessor ChanProcessor[T] logrus *logrus.Entry scheduler *Scheduler } - batchProcessorWorker[T any] struct { + batchProcessorTask[T any] struct { batchProcessor ListProcessor[T] logrus *logrus.Entry scheduler *Scheduler } - serialProcessorWorker[T any] struct { + serialProcessorTask[T any] struct { batchProcessor ListProcessor[T] logrus *logrus.Entry scheduler *Scheduler } ) -func NewWorkerFromBatchProcessor[T any]( +func NewTaskFromBatchProcessor[T any]( batchProcessor ListProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, -) Worker { - return &batchProcessorWorker[T]{ +) Task { + return &batchProcessorTask[T]{ batchProcessor: batchProcessor, scheduler: scheduler, logrus: logrus, } } -func NewWorkerFromSerialProcessor[T any]( +func NewTaskFromSerialProcessor[T any]( batchProcessor ListProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, -) Worker { - return &serialProcessorWorker[T]{ +) Task { + return &serialProcessorTask[T]{ batchProcessor: batchProcessor, scheduler: scheduler, logrus: logrus, } } -func NewWorkerFromChanProcessor[T any]( +func NewTaskFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, -) Worker { - return &chanProcessorWorker[T]{ +) Task { + return &chanProcessorTask[T]{ chanProcessor: chanProcessor, scheduler: scheduler, logrus: logrus, } } -func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { +func (l *batchProcessorTask[T]) Start(ctx context.Context) error { for { values, err := l.batchProcessor.Query(ctx) if err != nil { @@ -123,7 +123,7 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { } } -func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { +func (l *serialProcessorTask[T]) Start(ctx context.Context) error { for { values, err := l.batchProcessor.Query(ctx) if err != nil { @@ -158,7 +158,7 @@ func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { } } -func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { +func (l *chanProcessorTask[T]) Start(ctx context.Context) error { c, err := l.chanProcessor.Query(ctx) if err != nil { return err diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index ce3ff0a..abdb907 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -32,7 +32,7 @@ func TestListProcessorLimit(t *testing.T) { mock = &mockCounterListProcessor{countTo: 10000} ) - worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) + worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) err := worker.Start(context.Background()) testkit.TestFatalError(t, "Start", err) @@ -47,7 +47,7 @@ func TestListProcessorContextCancelQuery(t *testing.T) { mock = &mockContextListProcessor{} ) - worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) + worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 359384a..b768320 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,48 +5,68 @@ import ( "errors" "fmt" "sync" + "time" ) type ( - // Worker should watch for context - Worker interface { + // Task should watch for context + Task interface { Start(context.Context) error } Work struct { - Name string - Worker Worker + Name string + Task Task + wait time.Duration } - WorkerPool struct { - workers []*Work + TaskPool struct { + tasks []*Work } ) -func NewWorkerPool() *WorkerPool { - return &WorkerPool{} +func NewTaskPool() *TaskPool { + return &TaskPool{} } -func (self *WorkerPool) AddWorker(name string, worker Worker) { - self.workers = append(self.workers, &Work{ - Name: name, - Worker: worker, +func (w *Work) run(ctx context.Context) error { + // first time fire from the get go + timer := time.NewTimer(time.Nanosecond) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + fmt.Println("Process starting: ", w.Name) + if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { + fmt.Println("Process errored: ", w.Name, err.Error()) + return err + } else { + fmt.Println("Process done: ", w.Name) + } + } + timer.Reset(w.wait) + } +} + +func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { + self.tasks = append(self.tasks, &Work{ + Name: name, + Task: task, + wait: wait, }) } -func (self *WorkerPool) Start(ctx context.Context) { +func (self *TaskPool) Start(ctx context.Context) { var wg sync.WaitGroup - wg.Add(len(self.workers)) + wg.Add(len(self.tasks)) - for _, w := range self.workers { + for _, w := range self.tasks { go func(w *Work) { - defer wg.Done() - if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { - fmt.Println("Processes finished, error", w.Name, err.Error()) - } else { - fmt.Println(w.Name, "done") - } + _ = w.run(ctx) + wg.Done() }(w) } diff --git a/templates/base.qtpl b/templates/base.qtpl index b1878ba..a80803a 100644 --- a/templates/base.qtpl +++ b/templates/base.qtpl @@ -47,7 +47,7 @@ Page prints a page implementing Page interface. -
+
{%= p.Content() %}
-- cgit v1.2.3