aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go23
1 files changed, 14 insertions, 9 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index d53b7ea..8169e4e 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -2,6 +2,8 @@ package worker
import (
"context"
+ "errors"
+ "sync"
)
type (
@@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
if len(values) == 0 {
return nil
}
+ var wg sync.WaitGroup
for _, v := range values {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- if err := l.listProcessor.Process(ctx, v); err != nil {
- return err
- }
+ wg.Add(1)
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ defer wg.Done()
+ if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
+ println("Err", err.Error())
+ }
+ }(v)
}
+
+ wg.Wait()
}
}