From d4e1ca3a48e74573df6965ceee217e119ff899ae Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Mon, 26 Jun 2023 21:40:40 +0200 Subject: feat: Add scheduler to chan processor --- pkg/worker/list_processor.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'pkg/worker/list_processor.go') diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index bbc9fb7..0a07085 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -23,6 +23,7 @@ type ( chanProcessorWorker[T any] struct { chanProcessor ChanProcessor[T] + logrus *logrus.Entry scheduler *Scheduler } @@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any]( func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, + logrus *logrus.Entry, ) Worker { return &chanProcessorWorker[T]{ chanProcessor: chanProcessor, scheduler: scheduler, + logrus: logrus, } } @@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { return nil } - if err := l.chanProcessor.Process(ctx, v); err != nil { - return err - } + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + if err := l.chanProcessor.Process(ctx, v); err != nil { + l.logrus.WithError(err).Error("Error processing chan") + } + }(v) } } } -- cgit v1.2.3