package worker import ( "context" "errors" "sync" "github.com/sirupsen/logrus" ) type ( // A simple worker to deal with list. ChanProcessor[T any] interface { Query(context.Context) (<-chan T, error) Process(context.Context, T) error } BatchProcessor[T any] interface { Query(context.Context) ([]T, error) Process(context.Context, T) error } chanProcessorWorker[T any] struct { chanProcessor ChanProcessor[T] scheduler *Scheduler } batchProcessorWorker[T any] struct { batchProcessor BatchProcessor[T] logrus *logrus.Entry scheduler *Scheduler } ) func NewWorkerFromBatchProcessor[T any]( batchProcessor BatchProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, ) Worker { return &batchProcessorWorker[T]{ batchProcessor: batchProcessor, scheduler: scheduler, logrus: logrus, } } func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, ) Worker { return &chanProcessorWorker[T]{ chanProcessor: chanProcessor, scheduler: scheduler, } } func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { for { values, err := l.batchProcessor.Query(ctx) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() default: } if len(values) == 0 { return nil } var wg sync.WaitGroup for _, v := range values { wg.Add(1) l.scheduler.Take() go func(v T) { defer l.scheduler.Return() defer wg.Done() if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { l.logrus.WithError(err).Error("Error processing batch") } }(v) } wg.Wait() } } func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { c, err := l.chanProcessor.Query(ctx) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() case v, ok := <-c: if !ok { return nil } if err := l.chanProcessor.Process(ctx, v); err != nil { return err } } } }