diff options
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r-- | pkg/worker/list_processor.go | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go new file mode 100644 index 0000000..d53b7ea --- /dev/null +++ b/pkg/worker/list_processor.go @@ -0,0 +1,102 @@ +package worker + +import ( + "context" +) + +type ( + + // A simple worker to deal with list. + ChanProcessor[T any] interface { + Query(context.Context) (<-chan T, error) + Process(context.Context, T) error + } + + ListProcessor[T any] interface { + Query(context.Context) ([]T, error) + Process(context.Context, T) error + } + + chanProcessorWorker[T any] struct { + chanProcessor ChanProcessor[T] + scheduler *Scheduler + } + + listProcessorWorker[T any] struct { + listProcessor ListProcessor[T] + scheduler *Scheduler + } +) + +func NewWorkerFromListProcessor[T any]( + listProcessor ListProcessor[T], + scheduler *Scheduler, +) Worker { + return &listProcessorWorker[T]{ + listProcessor: listProcessor, + scheduler: scheduler, + } +} + +func NewWorkerFromChanProcessor[T any]( + listProcessor ChanProcessor[T], + scheduler *Scheduler, +) Worker { + return &chanProcessorWorker[T]{ + chanProcessor: listProcessor, + scheduler: scheduler, + } +} + +func (l *listProcessorWorker[T]) Start(ctx context.Context) error { + for { + values, err := l.listProcessor.Query(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(values) == 0 { + return nil + } + + for _, v := range values { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := l.listProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} + +func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { + c, err := l.chanProcessor.Query(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case v, ok := <-c: + if !ok { + return nil + } + + if err := l.chanProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} |