From 5f660b309bc695277c67223520499fcc13f3c59f Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Mon, 31 Jul 2023 18:25:13 +0200 Subject: feat: Add album scanner --- pkg/worker/list_processor.go | 59 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 3 deletions(-) (limited to 'pkg/worker/list_processor.go') diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index c060583..02817c9 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -20,7 +20,7 @@ type ( OnFail(context.Context, T, error) } - BatchProcessor[T any] interface { + ListProcessor[T any] interface { Query(context.Context) ([]T, error) Process(context.Context, T) error } @@ -32,14 +32,20 @@ type ( } batchProcessorWorker[T any] struct { - batchProcessor BatchProcessor[T] + batchProcessor ListProcessor[T] + logrus *logrus.Entry + scheduler *Scheduler + } + + serialProcessorWorker[T any] struct { + batchProcessor ListProcessor[T] logrus *logrus.Entry scheduler *Scheduler } ) func NewWorkerFromBatchProcessor[T any]( - batchProcessor BatchProcessor[T], + batchProcessor ListProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, ) Worker { @@ -50,6 +56,18 @@ func NewWorkerFromBatchProcessor[T any]( } } +func NewWorkerFromSerialProcessor[T any]( + batchProcessor ListProcessor[T], + scheduler *Scheduler, + logrus *logrus.Entry, +) Worker { + return &serialProcessorWorker[T]{ + batchProcessor: batchProcessor, + scheduler: scheduler, + logrus: logrus, + } +} + func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, @@ -105,6 +123,41 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { } } +func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { + for { + values, err := l.batchProcessor.Query(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(values) == 0 { + return nil + } + for _, v := range values { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + l.scheduler.Take() + if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + l.logrus.WithError(err).Error("Error processing batch") + if failure, ok := l.batchProcessor.(OnFail[T]); ok { + failure.OnFail(ctx, v, err) + } + } + l.scheduler.Return() + } + } +} + func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { c, err := l.chanProcessor.Query(ctx) if err != nil { -- cgit v1.2.3