diff options
author | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-24 00:00:00 +0200 |
---|---|---|
committer | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-24 00:00:00 +0200 |
commit | 1d6fa60b3c60d068d12b19f10f5ad73e836b5a70 (patch) | |
tree | a30afce205084ddccb2a3ca717b8458b48b02e4a /pkg | |
parent | cff5600c8abebd1ce988b2185c07e998c4a1d483 (diff) | |
download | lens-1d6fa60b3c60d068d12b19f10f5ad73e836b5a70.tar.gz lens-1d6fa60b3c60d068d12b19f10f5ad73e836b5a70.tar.bz2 lens-1d6fa60b3c60d068d12b19f10f5ad73e836b5a70.zip |
feat: Add scheduler again
Since I have fixed the issue with the `SIGTINT` hanging the application
I can readd the scheduler once more.
Also move the param for amount of scheduler work.
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/coroutines/coroutines.go | 1 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 23 | ||||
-rw-r--r-- | pkg/worker/scheduler.go | 12 |
3 files changed, 16 insertions, 20 deletions
diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go new file mode 100644 index 0000000..c0f7247 --- /dev/null +++ b/pkg/coroutines/coroutines.go @@ -0,0 +1 @@ +package coroutines diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index d53b7ea..8169e4e 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -2,6 +2,8 @@ package worker import ( "context" + "errors" + "sync" ) type ( @@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error { if len(values) == 0 { return nil } + var wg sync.WaitGroup for _, v := range values { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if err := l.listProcessor.Process(ctx, v); err != nil { - return err - } + wg.Add(1) + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + defer wg.Done() + if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + println("Err", err.Error()) + } + }(v) } + + wg.Wait() } } diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go index b410b33..2ce86fe 100644 --- a/pkg/worker/scheduler.go +++ b/pkg/worker/scheduler.go @@ -1,13 +1,7 @@ package worker -import ( - "fmt" - "sync/atomic" -) - type Scheduler struct { - pool chan any - count atomic.Int64 + pool chan any } func NewScheduler(count uint) *Scheduler { @@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler { func (self *Scheduler) Take() { self.pool <- nil - self.count.Add(1) - fmt.Printf("<- %d\n", self.count.Load()) } func (self *Scheduler) Return() { <-self.pool - self.count.Add(-1) - fmt.Printf("-> %d\n", self.count.Load()) } |