aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker/list_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/list_processor.go')
-rw-r--r--pkg/worker/list_processor.go102
1 files changed, 102 insertions, 0 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
new file mode 100644
index 0000000..d53b7ea
--- /dev/null
+++ b/pkg/worker/list_processor.go
@@ -0,0 +1,102 @@
+package worker
+
+import (
+ "context"
+)
+
+type (
+
+ // A simple worker to deal with list.
+ ChanProcessor[T any] interface {
+ Query(context.Context) (<-chan T, error)
+ Process(context.Context, T) error
+ }
+
+ ListProcessor[T any] interface {
+ Query(context.Context) ([]T, error)
+ Process(context.Context, T) error
+ }
+
+ chanProcessorWorker[T any] struct {
+ chanProcessor ChanProcessor[T]
+ scheduler *Scheduler
+ }
+
+ listProcessorWorker[T any] struct {
+ listProcessor ListProcessor[T]
+ scheduler *Scheduler
+ }
+)
+
+func NewWorkerFromListProcessor[T any](
+ listProcessor ListProcessor[T],
+ scheduler *Scheduler,
+) Worker {
+ return &listProcessorWorker[T]{
+ listProcessor: listProcessor,
+ scheduler: scheduler,
+ }
+}
+
+func NewWorkerFromChanProcessor[T any](
+ listProcessor ChanProcessor[T],
+ scheduler *Scheduler,
+) Worker {
+ return &chanProcessorWorker[T]{
+ chanProcessor: listProcessor,
+ scheduler: scheduler,
+ }
+}
+
+func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
+ for {
+ values, err := l.listProcessor.Query(ctx)
+ if err != nil {
+ return err
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ if len(values) == 0 {
+ return nil
+ }
+
+ for _, v := range values {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ if err := l.listProcessor.Process(ctx, v); err != nil {
+ return err
+ }
+ }
+ }
+}
+
+func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
+ c, err := l.chanProcessor.Query(ctx)
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case v, ok := <-c:
+ if !ok {
+ return nil
+ }
+
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ return err
+ }
+ }
+ }
+}