aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go13
1 files changed, 13 insertions, 0 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index 0a07085..c060583 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -16,6 +16,10 @@ type (
Process(context.Context, T) error
}
+ OnFail[T any] interface {
+ OnFail(context.Context, T, error)
+ }
+
BatchProcessor[T any] interface {
Query(context.Context) ([]T, error)
Process(context.Context, T) error
@@ -77,6 +81,12 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
var wg sync.WaitGroup
for _, v := range values {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
wg.Add(1)
l.scheduler.Take()
go func(v T) {
@@ -84,6 +94,9 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
defer wg.Done()
if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
l.logrus.WithError(err).Error("Error processing batch")
+ if failure, ok := l.batchProcessor.(OnFail[T]); ok {
+ failure.OnFail(ctx, v, err)
+ }
}
}(v)
}