diff options
-rw-r--r-- | cmd/server/main.go | 2 | ||||
-rw-r--r-- | pkg/worker/exif_scanner.go | 2 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 32 |
3 files changed, 18 insertions, 18 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go index 54a7ba0..473bed9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -119,7 +119,7 @@ func main() { var ( serverWorker = worker.NewServerWorker(&fasthttp.Server{Handler: r.Handler}) fileWorker = worker.NewWorkerFromChanProcessor[string](fileScanner, scheduler) - exifWorker = worker.NewWorkerFromListProcessor[*media.Media]( + exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media]( exifScanner, scheduler, logrus.WithField("context", "exif scanner"), diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go index d7865e3..4aa247d 100644 --- a/pkg/worker/exif_scanner.go +++ b/pkg/worker/exif_scanner.go @@ -13,7 +13,7 @@ type ( } ) -var _ ListProcessor[*media.Media] = &EXIFScanner{} +var _ BatchProcessor[*media.Media] = &EXIFScanner{} func NewEXIFScanner(repository media.Repository) *EXIFScanner { return &EXIFScanner{ diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index c9c20a9..bbc9fb7 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -16,7 +16,7 @@ type ( Process(context.Context, T) error } - ListProcessor[T any] interface { + BatchProcessor[T any] interface { Query(context.Context) ([]T, error) Process(context.Context, T) error } @@ -26,38 +26,38 @@ type ( scheduler *Scheduler } - listProcessorWorker[T any] struct { - listProcessor ListProcessor[T] - logrus *logrus.Entry - scheduler *Scheduler + batchProcessorWorker[T any] struct { + batchProcessor BatchProcessor[T] + logrus *logrus.Entry + scheduler *Scheduler } ) -func NewWorkerFromListProcessor[T any]( - listProcessor ListProcessor[T], +func NewWorkerFromBatchProcessor[T any]( + batchProcessor BatchProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, ) Worker { - return &listProcessorWorker[T]{ - listProcessor: listProcessor, - scheduler: scheduler, - logrus: logrus, + return &batchProcessorWorker[T]{ + batchProcessor: batchProcessor, + scheduler: scheduler, + logrus: logrus, } } func NewWorkerFromChanProcessor[T any]( - listProcessor ChanProcessor[T], + chanProcessor ChanProcessor[T], scheduler *Scheduler, ) Worker { return &chanProcessorWorker[T]{ - chanProcessor: listProcessor, + chanProcessor: chanProcessor, scheduler: scheduler, } } -func (l *listProcessorWorker[T]) Start(ctx context.Context) error { +func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { for { - values, err := l.listProcessor.Query(ctx) + values, err := l.batchProcessor.Query(ctx) if err != nil { return err } @@ -79,7 +79,7 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error { go func(v T) { defer l.scheduler.Return() defer wg.Done() - if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { l.logrus.WithError(err).Error("Error processing batch") } }(v) |