aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:40:40 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:42:17 +0200
commitd4e1ca3a48e74573df6965ceee217e119ff899ae (patch)
treecbe4f1e981aa67a1a2bca62b20d9687b3550af3d /pkg/worker/list_processor.go
parente1c8bb1bd5381d8ade3c699a2d6b4fb373112880 (diff)
downloadlens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.gz
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.bz2
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.zip
feat: Add scheduler to chan processor
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go13
1 files changed, 10 insertions, 3 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index bbc9fb7..0a07085 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -23,6 +23,7 @@ type (
chanProcessorWorker[T any] struct {
chanProcessor ChanProcessor[T]
+ logrus *logrus.Entry
scheduler *Scheduler
}
@@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any](
func NewWorkerFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
+ logrus *logrus.Entry,
) Worker {
return &chanProcessorWorker[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
+ logrus: logrus,
}
}
@@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
return nil
}
- if err := l.chanProcessor.Process(ctx, v); err != nil {
- return err
- }
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ l.logrus.WithError(err).Error("Error processing chan")
+ }
+ }(v)
}
}
}