diff options
Diffstat (limited to 'pkg/worker')
| -rw-r--r-- | pkg/worker/httpserver.go | 8 | ||||
| -rw-r--r-- | pkg/worker/list_processor.go | 32 | ||||
| -rw-r--r-- | pkg/worker/list_processor_test.go | 4 | ||||
| -rw-r--r-- | pkg/worker/worker.go | 62 | 
4 files changed, 63 insertions, 43 deletions
| diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go index dc8f255..f0ec3ba 100644 --- a/pkg/worker/httpserver.go +++ b/pkg/worker/httpserver.go @@ -5,11 +5,11 @@ import (  	"net/http"  ) -type ServerWorker struct { +type ServerTask struct {  	server *http.Server  } -func (self *ServerWorker) Start(ctx context.Context) error { +func (self *ServerTask) Start(ctx context.Context) error {  	go func() {  		// nolint: errcheck  		self.server.ListenAndServe() @@ -19,8 +19,8 @@ func (self *ServerWorker) Start(ctx context.Context) error {  	return self.server.Shutdown(ctx)  } -func NewServerWorker(server *http.Server) *ServerWorker { -	return &ServerWorker{ +func NewServerTask(server *http.Server) *ServerTask { +	return &ServerTask{  		server: server,  	}  } diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index 02817c9..ea6b453 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -10,7 +10,7 @@ import (  type ( -	// A simple worker to deal with list. +	// A simple task to deal with list.  	ChanProcessor[T any] interface {  		Query(context.Context) (<-chan T, error)  		Process(context.Context, T) error @@ -25,62 +25,62 @@ type (  		Process(context.Context, T) error  	} -	chanProcessorWorker[T any] struct { +	chanProcessorTask[T any] struct {  		chanProcessor ChanProcessor[T]  		logrus        *logrus.Entry  		scheduler     *Scheduler  	} -	batchProcessorWorker[T any] struct { +	batchProcessorTask[T any] struct {  		batchProcessor ListProcessor[T]  		logrus         *logrus.Entry  		scheduler      *Scheduler  	} -	serialProcessorWorker[T any] struct { +	serialProcessorTask[T any] struct {  		batchProcessor ListProcessor[T]  		logrus         *logrus.Entry  		scheduler      *Scheduler  	}  ) -func NewWorkerFromBatchProcessor[T any]( +func NewTaskFromBatchProcessor[T any](  	batchProcessor ListProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &batchProcessorWorker[T]{ +) Task { +	return &batchProcessorTask[T]{  		batchProcessor: batchProcessor,  		scheduler:      scheduler,  		logrus:         logrus,  	}  } -func NewWorkerFromSerialProcessor[T any]( +func NewTaskFromSerialProcessor[T any](  	batchProcessor ListProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &serialProcessorWorker[T]{ +) Task { +	return &serialProcessorTask[T]{  		batchProcessor: batchProcessor,  		scheduler:      scheduler,  		logrus:         logrus,  	}  } -func NewWorkerFromChanProcessor[T any]( +func NewTaskFromChanProcessor[T any](  	chanProcessor ChanProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &chanProcessorWorker[T]{ +) Task { +	return &chanProcessorTask[T]{  		chanProcessor: chanProcessor,  		scheduler:     scheduler,  		logrus:        logrus,  	}  } -func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { +func (l *batchProcessorTask[T]) Start(ctx context.Context) error {  	for {  		values, err := l.batchProcessor.Query(ctx)  		if err != nil { @@ -123,7 +123,7 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {  	}  } -func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { +func (l *serialProcessorTask[T]) Start(ctx context.Context) error {  	for {  		values, err := l.batchProcessor.Query(ctx)  		if err != nil { @@ -158,7 +158,7 @@ func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {  	}  } -func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { +func (l *chanProcessorTask[T]) Start(ctx context.Context) error {  	c, err := l.chanProcessor.Query(ctx)  	if err != nil {  		return err diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index ce3ff0a..abdb907 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -32,7 +32,7 @@ func TestListProcessorLimit(t *testing.T) {  		mock      = &mockCounterListProcessor{countTo: 10000}  	) -	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) +	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	err := worker.Start(context.Background())  	testkit.TestFatalError(t, "Start", err) @@ -47,7 +47,7 @@ func TestListProcessorContextCancelQuery(t *testing.T) {  		mock      = &mockContextListProcessor{}  	) -	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) +	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	ctx, cancel := context.WithCancel(context.Background())  	var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 359384a..b768320 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,48 +5,68 @@ import (  	"errors"  	"fmt"  	"sync" +	"time"  )  type ( -	// Worker should watch for context -	Worker interface { +	// Task should watch for context +	Task interface {  		Start(context.Context) error  	}  	Work struct { -		Name   string -		Worker Worker +		Name string +		Task Task +		wait time.Duration  	} -	WorkerPool struct { -		workers []*Work +	TaskPool struct { +		tasks []*Work  	}  ) -func NewWorkerPool() *WorkerPool { -	return &WorkerPool{} +func NewTaskPool() *TaskPool { +	return &TaskPool{}  } -func (self *WorkerPool) AddWorker(name string, worker Worker) { -	self.workers = append(self.workers, &Work{ -		Name:   name, -		Worker: worker, +func (w *Work) run(ctx context.Context) error { +	// first time fire from the get go +	timer := time.NewTimer(time.Nanosecond) + +	for { +		select { +		case <-ctx.Done(): +			return ctx.Err() +		case <-timer.C: +			fmt.Println("Process starting: ", w.Name) +			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { +				fmt.Println("Process errored: ", w.Name, err.Error()) +				return err +			} else { +				fmt.Println("Process done: ", w.Name) +			} +		} +		timer.Reset(w.wait) +	} +} + +func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { +	self.tasks = append(self.tasks, &Work{ +		Name: name, +		Task: task, +		wait: wait,  	})  } -func (self *WorkerPool) Start(ctx context.Context) { +func (self *TaskPool) Start(ctx context.Context) {  	var wg sync.WaitGroup -	wg.Add(len(self.workers)) +	wg.Add(len(self.tasks)) -	for _, w := range self.workers { +	for _, w := range self.tasks {  		go func(w *Work) { -			defer wg.Done() -			if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { -				fmt.Println("Processes finished, error", w.Name, err.Error()) -			} else { -				fmt.Println(w.Name, "done") -			} +			_ = w.run(ctx) +			wg.Done()  		}(w)  	} | 
