diff options
| -rw-r--r-- | cmd/server/main.go | 25 | ||||
| -rw-r--r-- | pkg/worker/httpserver.go | 8 | ||||
| -rw-r--r-- | pkg/worker/list_processor.go | 32 | ||||
| -rw-r--r-- | pkg/worker/list_processor_test.go | 4 | ||||
| -rw-r--r-- | pkg/worker/worker.go | 62 | ||||
| -rw-r--r-- | templates/base.qtpl | 2 | 
6 files changed, 77 insertions, 56 deletions
| diff --git a/cmd/server/main.go b/cmd/server/main.go index daf5356..2a93946 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,6 +7,7 @@ import (  	"net/http"  	"os"  	"os/signal" +	"time"  	"github.com/gorilla/mux"  	"github.com/sirupsen/logrus" @@ -122,37 +123,37 @@ func main() {  		albumScanner     = scanner.NewAlbumScanner(mediaRepository)  	) -	// worker +	// tasks  	var ( -		serverWorker = worker.NewServerWorker(&http.Server{Handler: r, Addr: "0.0.0.0:8080"}) -		fileWorker   = worker.NewWorkerFromChanProcessor[string]( +		serverTask = worker.NewServerTask(&http.Server{Handler: r, Addr: "0.0.0.0:8080"}) +		fileTask   = worker.NewTaskFromChanProcessor[string](  			fileScanner,  			scheduler,  			logrus.WithField("context", "file scanner"),  		) -		exifWorker = worker.NewWorkerFromBatchProcessor[*repository.Media]( +		exifTask = worker.NewTaskFromBatchProcessor[*repository.Media](  			exifScanner,  			scheduler,  			logrus.WithField("context", "exif scanner"),  		) -		thumbnailWorker = worker.NewWorkerFromBatchProcessor[*repository.Media]( +		thumbnailTask = worker.NewTaskFromBatchProcessor[*repository.Media](  			thumbnailScanner,  			scheduler,  			logrus.WithField("context", "thumbnail scanner"),  		) -		albumWorker = worker.NewWorkerFromSerialProcessor[*repository.Media]( +		albumTask = worker.NewTaskFromSerialProcessor[*repository.Media](  			albumScanner,  			scheduler,  			logrus.WithField("context", "thumbnail scanner"),  		)  	) -	pool := worker.NewWorkerPool() -	pool.AddWorker("http server", serverWorker) -	pool.AddWorker("exif scanner", exifWorker) -	pool.AddWorker("file scanner", fileWorker) -	pool.AddWorker("thumbnail scanner", thumbnailWorker) -	pool.AddWorker("album scanner", albumWorker) +	pool := worker.NewTaskPool() +	pool.AddTask("http server", time.Minute, serverTask) +	pool.AddTask("exif scanner", 15*time.Minute, exifTask) +	pool.AddTask("file scanner", 2*time.Hour, fileTask) +	pool.AddTask("thumbnail scanner", 15*time.Minute, thumbnailTask) +	pool.AddTask("album scanner", 15*time.Minute, albumTask)  	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)  	defer stop() diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go index dc8f255..f0ec3ba 100644 --- a/pkg/worker/httpserver.go +++ b/pkg/worker/httpserver.go @@ -5,11 +5,11 @@ import (  	"net/http"  ) -type ServerWorker struct { +type ServerTask struct {  	server *http.Server  } -func (self *ServerWorker) Start(ctx context.Context) error { +func (self *ServerTask) Start(ctx context.Context) error {  	go func() {  		// nolint: errcheck  		self.server.ListenAndServe() @@ -19,8 +19,8 @@ func (self *ServerWorker) Start(ctx context.Context) error {  	return self.server.Shutdown(ctx)  } -func NewServerWorker(server *http.Server) *ServerWorker { -	return &ServerWorker{ +func NewServerTask(server *http.Server) *ServerTask { +	return &ServerTask{  		server: server,  	}  } diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index 02817c9..ea6b453 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -10,7 +10,7 @@ import (  type ( -	// A simple worker to deal with list. +	// A simple task to deal with list.  	ChanProcessor[T any] interface {  		Query(context.Context) (<-chan T, error)  		Process(context.Context, T) error @@ -25,62 +25,62 @@ type (  		Process(context.Context, T) error  	} -	chanProcessorWorker[T any] struct { +	chanProcessorTask[T any] struct {  		chanProcessor ChanProcessor[T]  		logrus        *logrus.Entry  		scheduler     *Scheduler  	} -	batchProcessorWorker[T any] struct { +	batchProcessorTask[T any] struct {  		batchProcessor ListProcessor[T]  		logrus         *logrus.Entry  		scheduler      *Scheduler  	} -	serialProcessorWorker[T any] struct { +	serialProcessorTask[T any] struct {  		batchProcessor ListProcessor[T]  		logrus         *logrus.Entry  		scheduler      *Scheduler  	}  ) -func NewWorkerFromBatchProcessor[T any]( +func NewTaskFromBatchProcessor[T any](  	batchProcessor ListProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &batchProcessorWorker[T]{ +) Task { +	return &batchProcessorTask[T]{  		batchProcessor: batchProcessor,  		scheduler:      scheduler,  		logrus:         logrus,  	}  } -func NewWorkerFromSerialProcessor[T any]( +func NewTaskFromSerialProcessor[T any](  	batchProcessor ListProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &serialProcessorWorker[T]{ +) Task { +	return &serialProcessorTask[T]{  		batchProcessor: batchProcessor,  		scheduler:      scheduler,  		logrus:         logrus,  	}  } -func NewWorkerFromChanProcessor[T any]( +func NewTaskFromChanProcessor[T any](  	chanProcessor ChanProcessor[T],  	scheduler *Scheduler,  	logrus *logrus.Entry, -) Worker { -	return &chanProcessorWorker[T]{ +) Task { +	return &chanProcessorTask[T]{  		chanProcessor: chanProcessor,  		scheduler:     scheduler,  		logrus:        logrus,  	}  } -func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { +func (l *batchProcessorTask[T]) Start(ctx context.Context) error {  	for {  		values, err := l.batchProcessor.Query(ctx)  		if err != nil { @@ -123,7 +123,7 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {  	}  } -func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { +func (l *serialProcessorTask[T]) Start(ctx context.Context) error {  	for {  		values, err := l.batchProcessor.Query(ctx)  		if err != nil { @@ -158,7 +158,7 @@ func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {  	}  } -func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { +func (l *chanProcessorTask[T]) Start(ctx context.Context) error {  	c, err := l.chanProcessor.Query(ctx)  	if err != nil {  		return err diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index ce3ff0a..abdb907 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -32,7 +32,7 @@ func TestListProcessorLimit(t *testing.T) {  		mock      = &mockCounterListProcessor{countTo: 10000}  	) -	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) +	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	err := worker.Start(context.Background())  	testkit.TestFatalError(t, "Start", err) @@ -47,7 +47,7 @@ func TestListProcessorContextCancelQuery(t *testing.T) {  		mock      = &mockContextListProcessor{}  	) -	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) +	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	ctx, cancel := context.WithCancel(context.Background())  	var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 359384a..b768320 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,48 +5,68 @@ import (  	"errors"  	"fmt"  	"sync" +	"time"  )  type ( -	// Worker should watch for context -	Worker interface { +	// Task should watch for context +	Task interface {  		Start(context.Context) error  	}  	Work struct { -		Name   string -		Worker Worker +		Name string +		Task Task +		wait time.Duration  	} -	WorkerPool struct { -		workers []*Work +	TaskPool struct { +		tasks []*Work  	}  ) -func NewWorkerPool() *WorkerPool { -	return &WorkerPool{} +func NewTaskPool() *TaskPool { +	return &TaskPool{}  } -func (self *WorkerPool) AddWorker(name string, worker Worker) { -	self.workers = append(self.workers, &Work{ -		Name:   name, -		Worker: worker, +func (w *Work) run(ctx context.Context) error { +	// first time fire from the get go +	timer := time.NewTimer(time.Nanosecond) + +	for { +		select { +		case <-ctx.Done(): +			return ctx.Err() +		case <-timer.C: +			fmt.Println("Process starting: ", w.Name) +			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { +				fmt.Println("Process errored: ", w.Name, err.Error()) +				return err +			} else { +				fmt.Println("Process done: ", w.Name) +			} +		} +		timer.Reset(w.wait) +	} +} + +func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { +	self.tasks = append(self.tasks, &Work{ +		Name: name, +		Task: task, +		wait: wait,  	})  } -func (self *WorkerPool) Start(ctx context.Context) { +func (self *TaskPool) Start(ctx context.Context) {  	var wg sync.WaitGroup -	wg.Add(len(self.workers)) +	wg.Add(len(self.tasks)) -	for _, w := range self.workers { +	for _, w := range self.tasks {  		go func(w *Work) { -			defer wg.Done() -			if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { -				fmt.Println("Processes finished, error", w.Name, err.Error()) -			} else { -				fmt.Println(w.Name, "done") -			} +			_ = w.run(ctx) +			wg.Done()  		}(w)  	} diff --git a/templates/base.qtpl b/templates/base.qtpl index b1878ba..a80803a 100644 --- a/templates/base.qtpl +++ b/templates/base.qtpl @@ -47,7 +47,7 @@ Page prints a page implementing Page interface.                  </a>              </div>          </nav> -        <div class="container"> +        <div class="container is-fullhd">              {%= p.Content() %}          </div>      </body> | 
