aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/server/main.go2
-rw-r--r--pkg/worker/exif_scanner.go2
-rw-r--r--pkg/worker/list_processor.go32
3 files changed, 18 insertions, 18 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 54a7ba0..473bed9 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -119,7 +119,7 @@ func main() {
var (
serverWorker = worker.NewServerWorker(&fasthttp.Server{Handler: r.Handler})
fileWorker = worker.NewWorkerFromChanProcessor[string](fileScanner, scheduler)
- exifWorker = worker.NewWorkerFromListProcessor[*media.Media](
+ exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media](
exifScanner,
scheduler,
logrus.WithField("context", "exif scanner"),
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go
index d7865e3..4aa247d 100644
--- a/pkg/worker/exif_scanner.go
+++ b/pkg/worker/exif_scanner.go
@@ -13,7 +13,7 @@ type (
}
)
-var _ ListProcessor[*media.Media] = &EXIFScanner{}
+var _ BatchProcessor[*media.Media] = &EXIFScanner{}
func NewEXIFScanner(repository media.Repository) *EXIFScanner {
return &EXIFScanner{
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index c9c20a9..bbc9fb7 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -16,7 +16,7 @@ type (
Process(context.Context, T) error
}
- ListProcessor[T any] interface {
+ BatchProcessor[T any] interface {
Query(context.Context) ([]T, error)
Process(context.Context, T) error
}
@@ -26,38 +26,38 @@ type (
scheduler *Scheduler
}
- listProcessorWorker[T any] struct {
- listProcessor ListProcessor[T]
- logrus *logrus.Entry
- scheduler *Scheduler
+ batchProcessorWorker[T any] struct {
+ batchProcessor BatchProcessor[T]
+ logrus *logrus.Entry
+ scheduler *Scheduler
}
)
-func NewWorkerFromListProcessor[T any](
- listProcessor ListProcessor[T],
+func NewWorkerFromBatchProcessor[T any](
+ batchProcessor BatchProcessor[T],
scheduler *Scheduler,
logrus *logrus.Entry,
) Worker {
- return &listProcessorWorker[T]{
- listProcessor: listProcessor,
- scheduler: scheduler,
- logrus: logrus,
+ return &batchProcessorWorker[T]{
+ batchProcessor: batchProcessor,
+ scheduler: scheduler,
+ logrus: logrus,
}
}
func NewWorkerFromChanProcessor[T any](
- listProcessor ChanProcessor[T],
+ chanProcessor ChanProcessor[T],
scheduler *Scheduler,
) Worker {
return &chanProcessorWorker[T]{
- chanProcessor: listProcessor,
+ chanProcessor: chanProcessor,
scheduler: scheduler,
}
}
-func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
+func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
for {
- values, err := l.listProcessor.Query(ctx)
+ values, err := l.batchProcessor.Query(ctx)
if err != nil {
return err
}
@@ -79,7 +79,7 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
go func(v T) {
defer l.scheduler.Return()
defer wg.Done()
- if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
+ if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
l.logrus.WithError(err).Error("Error processing batch")
}
}(v)