diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/coroutines/coroutines.go | 1 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 23 | ||||
-rw-r--r-- | pkg/worker/scheduler.go | 12 |
3 files changed, 16 insertions, 20 deletions
diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go new file mode 100644 index 0000000..c0f7247 --- /dev/null +++ b/pkg/coroutines/coroutines.go @@ -0,0 +1 @@ +package coroutines diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index d53b7ea..8169e4e 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -2,6 +2,8 @@ package worker import ( "context" + "errors" + "sync" ) type ( @@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error { if len(values) == 0 { return nil } + var wg sync.WaitGroup for _, v := range values { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if err := l.listProcessor.Process(ctx, v); err != nil { - return err - } + wg.Add(1) + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + defer wg.Done() + if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + println("Err", err.Error()) + } + }(v) } + + wg.Wait() } } diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go index b410b33..2ce86fe 100644 --- a/pkg/worker/scheduler.go +++ b/pkg/worker/scheduler.go @@ -1,13 +1,7 @@ package worker -import ( - "fmt" - "sync/atomic" -) - type Scheduler struct { - pool chan any - count atomic.Int64 + pool chan any } func NewScheduler(count uint) *Scheduler { @@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler { func (self *Scheduler) Take() { self.pool <- nil - self.count.Add(1) - fmt.Printf("<- %d\n", self.count.Load()) } func (self *Scheduler) Return() { <-self.pool - self.count.Add(-1) - fmt.Printf("-> %d\n", self.count.Load()) } |