aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-07-31 18:25:13 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-08-06 18:41:34 +0200
commit5f660b309bc695277c67223520499fcc13f3c59f (patch)
treece30f46d8feebac6eb3f5145e9c772be1c32f4ad /pkg/worker/list_processor.go
parent5168a9476f0e83264ecafc85bc9145e8bdcbb8dc (diff)
downloadlens-5f660b309bc695277c67223520499fcc13f3c59f.tar.gz
lens-5f660b309bc695277c67223520499fcc13f3c59f.tar.bz2
lens-5f660b309bc695277c67223520499fcc13f3c59f.zip
feat: Add album scanner
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go59
1 files changed, 56 insertions, 3 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index c060583..02817c9 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -20,7 +20,7 @@ type (
OnFail(context.Context, T, error)
}
- BatchProcessor[T any] interface {
+ ListProcessor[T any] interface {
Query(context.Context) ([]T, error)
Process(context.Context, T) error
}
@@ -32,14 +32,20 @@ type (
}
batchProcessorWorker[T any] struct {
- batchProcessor BatchProcessor[T]
+ batchProcessor ListProcessor[T]
+ logrus *logrus.Entry
+ scheduler *Scheduler
+ }
+
+ serialProcessorWorker[T any] struct {
+ batchProcessor ListProcessor[T]
logrus *logrus.Entry
scheduler *Scheduler
}
)
func NewWorkerFromBatchProcessor[T any](
- batchProcessor BatchProcessor[T],
+ batchProcessor ListProcessor[T],
scheduler *Scheduler,
logrus *logrus.Entry,
) Worker {
@@ -50,6 +56,18 @@ func NewWorkerFromBatchProcessor[T any](
}
}
+func NewWorkerFromSerialProcessor[T any](
+ batchProcessor ListProcessor[T],
+ scheduler *Scheduler,
+ logrus *logrus.Entry,
+) Worker {
+ return &serialProcessorWorker[T]{
+ batchProcessor: batchProcessor,
+ scheduler: scheduler,
+ logrus: logrus,
+ }
+}
+
func NewWorkerFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
@@ -105,6 +123,41 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
}
}
+func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {
+ for {
+ values, err := l.batchProcessor.Query(ctx)
+ if err != nil {
+ return err
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ if len(values) == 0 {
+ return nil
+ }
+ for _, v := range values {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ l.scheduler.Take()
+ if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
+ l.logrus.WithError(err).Error("Error processing batch")
+ if failure, ok := l.batchProcessor.(OnFail[T]); ok {
+ failure.OnFail(ctx, v, err)
+ }
+ }
+ l.scheduler.Return()
+ }
+ }
+}
+
func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
c, err := l.chanProcessor.Query(ctx)
if err != nil {