aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go13
1 files changed, 10 insertions, 3 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index bbc9fb7..0a07085 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -23,6 +23,7 @@ type (
chanProcessorWorker[T any] struct {
chanProcessor ChanProcessor[T]
+ logrus *logrus.Entry
scheduler *Scheduler
}
@@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any](
func NewWorkerFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
+ logrus *logrus.Entry,
) Worker {
return &chanProcessorWorker[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
+ logrus: logrus,
}
}
@@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
return nil
}
- if err := l.chanProcessor.Process(ctx, v); err != nil {
- return err
- }
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ l.logrus.WithError(err).Error("Error processing chan")
+ }
+ }(v)
}
}
}