From 1d6fa60b3c60d068d12b19f10f5ad73e836b5a70 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sat, 24 Jun 2023 00:00:00 +0200 Subject: feat: Add scheduler again Since I have fixed the issue with the `SIGTINT` hanging the application I can readd the scheduler once more. Also move the param for amount of scheduler work. --- pkg/worker/list_processor.go | 23 ++++++++++++++--------- pkg/worker/scheduler.go | 12 +----------- 2 files changed, 15 insertions(+), 20 deletions(-) (limited to 'pkg/worker') diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index d53b7ea..8169e4e 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -2,6 +2,8 @@ package worker import ( "context" + "errors" + "sync" ) type ( @@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error { if len(values) == 0 { return nil } + var wg sync.WaitGroup for _, v := range values { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if err := l.listProcessor.Process(ctx, v); err != nil { - return err - } + wg.Add(1) + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + defer wg.Done() + if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + println("Err", err.Error()) + } + }(v) } + + wg.Wait() } } diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go index b410b33..2ce86fe 100644 --- a/pkg/worker/scheduler.go +++ b/pkg/worker/scheduler.go @@ -1,13 +1,7 @@ package worker -import ( - "fmt" - "sync/atomic" -) - type Scheduler struct { - pool chan any - count atomic.Int64 + pool chan any } func NewScheduler(count uint) *Scheduler { @@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler { func (self *Scheduler) Take() { self.pool <- nil - self.count.Add(1) - fmt.Printf("<- %d\n", self.count.Load()) } func (self *Scheduler) Return() { <-self.pool - self.count.Add(-1) - fmt.Printf("-> %d\n", self.count.Load()) } -- cgit v1.2.3