package worker import ( "context" ) type ( // A simple worker to deal with list. ChanProcessor[T any] interface { Query(context.Context) (<-chan T, error) Process(context.Context, T) error } ListProcessor[T any] interface { Query(context.Context) ([]T, error) Process(context.Context, T) error } chanProcessorWorker[T any] struct { chanProcessor ChanProcessor[T] scheduler *Scheduler } listProcessorWorker[T any] struct { listProcessor ListProcessor[T] scheduler *Scheduler } ) func NewWorkerFromListProcessor[T any]( listProcessor ListProcessor[T], scheduler *Scheduler, ) Worker { return &listProcessorWorker[T]{ listProcessor: listProcessor, scheduler: scheduler, } } func NewWorkerFromChanProcessor[T any]( listProcessor ChanProcessor[T], scheduler *Scheduler, ) Worker { return &chanProcessorWorker[T]{ chanProcessor: listProcessor, scheduler: scheduler, } } func (l *listProcessorWorker[T]) Start(ctx context.Context) error { for { values, err := l.listProcessor.Query(ctx) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() default: } if len(values) == 0 { return nil } for _, v := range values { select { case <-ctx.Done(): return ctx.Err() default: } if err := l.listProcessor.Process(ctx, v); err != nil { return err } } } } func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { c, err := l.chanProcessor.Query(ctx) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() case v, ok := <-c: if !ok { return nil } if err := l.chanProcessor.Process(ctx, v); err != nil { return err } } } }