From c8e1328164e9ffbd681c3c0e449f1e6b9856b896 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sun, 26 Feb 2023 19:54:48 +0100 Subject: feat: Inicial commit It contains rough template for the server and runners. It contains rough template for the server and runners. --- pkg/worker/list_processor.go | 102 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 pkg/worker/list_processor.go (limited to 'pkg/worker/list_processor.go') diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go new file mode 100644 index 0000000..d53b7ea --- /dev/null +++ b/pkg/worker/list_processor.go @@ -0,0 +1,102 @@ +package worker + +import ( + "context" +) + +type ( + + // A simple worker to deal with list. + ChanProcessor[T any] interface { + Query(context.Context) (<-chan T, error) + Process(context.Context, T) error + } + + ListProcessor[T any] interface { + Query(context.Context) ([]T, error) + Process(context.Context, T) error + } + + chanProcessorWorker[T any] struct { + chanProcessor ChanProcessor[T] + scheduler *Scheduler + } + + listProcessorWorker[T any] struct { + listProcessor ListProcessor[T] + scheduler *Scheduler + } +) + +func NewWorkerFromListProcessor[T any]( + listProcessor ListProcessor[T], + scheduler *Scheduler, +) Worker { + return &listProcessorWorker[T]{ + listProcessor: listProcessor, + scheduler: scheduler, + } +} + +func NewWorkerFromChanProcessor[T any]( + listProcessor ChanProcessor[T], + scheduler *Scheduler, +) Worker { + return &chanProcessorWorker[T]{ + chanProcessor: listProcessor, + scheduler: scheduler, + } +} + +func (l *listProcessorWorker[T]) Start(ctx context.Context) error { + for { + values, err := l.listProcessor.Query(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(values) == 0 { + return nil + } + + for _, v := range values { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := l.listProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} + +func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { + c, err := l.chanProcessor.Query(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case v, ok := <-c: + if !ok { + return nil + } + + if err := l.chanProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} -- cgit v1.2.3