aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/server/main.go25
-rw-r--r--pkg/worker/httpserver.go8
-rw-r--r--pkg/worker/list_processor.go32
-rw-r--r--pkg/worker/list_processor_test.go4
-rw-r--r--pkg/worker/worker.go62
-rw-r--r--templates/base.qtpl2
6 files changed, 77 insertions, 56 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go
index daf5356..2a93946 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"os/signal"
+ "time"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
@@ -122,37 +123,37 @@ func main() {
albumScanner = scanner.NewAlbumScanner(mediaRepository)
)
- // worker
+ // tasks
var (
- serverWorker = worker.NewServerWorker(&http.Server{Handler: r, Addr: "0.0.0.0:8080"})
- fileWorker = worker.NewWorkerFromChanProcessor[string](
+ serverTask = worker.NewServerTask(&http.Server{Handler: r, Addr: "0.0.0.0:8080"})
+ fileTask = worker.NewTaskFromChanProcessor[string](
fileScanner,
scheduler,
logrus.WithField("context", "file scanner"),
)
- exifWorker = worker.NewWorkerFromBatchProcessor[*repository.Media](
+ exifTask = worker.NewTaskFromBatchProcessor[*repository.Media](
exifScanner,
scheduler,
logrus.WithField("context", "exif scanner"),
)
- thumbnailWorker = worker.NewWorkerFromBatchProcessor[*repository.Media](
+ thumbnailTask = worker.NewTaskFromBatchProcessor[*repository.Media](
thumbnailScanner,
scheduler,
logrus.WithField("context", "thumbnail scanner"),
)
- albumWorker = worker.NewWorkerFromSerialProcessor[*repository.Media](
+ albumTask = worker.NewTaskFromSerialProcessor[*repository.Media](
albumScanner,
scheduler,
logrus.WithField("context", "thumbnail scanner"),
)
)
- pool := worker.NewWorkerPool()
- pool.AddWorker("http server", serverWorker)
- pool.AddWorker("exif scanner", exifWorker)
- pool.AddWorker("file scanner", fileWorker)
- pool.AddWorker("thumbnail scanner", thumbnailWorker)
- pool.AddWorker("album scanner", albumWorker)
+ pool := worker.NewTaskPool()
+ pool.AddTask("http server", time.Minute, serverTask)
+ pool.AddTask("exif scanner", 15*time.Minute, exifTask)
+ pool.AddTask("file scanner", 2*time.Hour, fileTask)
+ pool.AddTask("thumbnail scanner", 15*time.Minute, thumbnailTask)
+ pool.AddTask("album scanner", 15*time.Minute, albumTask)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go
index dc8f255..f0ec3ba 100644
--- a/pkg/worker/httpserver.go
+++ b/pkg/worker/httpserver.go
@@ -5,11 +5,11 @@ import (
"net/http"
)
-type ServerWorker struct {
+type ServerTask struct {
server *http.Server
}
-func (self *ServerWorker) Start(ctx context.Context) error {
+func (self *ServerTask) Start(ctx context.Context) error {
go func() {
// nolint: errcheck
self.server.ListenAndServe()
@@ -19,8 +19,8 @@ func (self *ServerWorker) Start(ctx context.Context) error {
return self.server.Shutdown(ctx)
}
-func NewServerWorker(server *http.Server) *ServerWorker {
- return &ServerWorker{
+func NewServerTask(server *http.Server) *ServerTask {
+ return &ServerTask{
server: server,
}
}
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index 02817c9..ea6b453 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -10,7 +10,7 @@ import (
type (
- // A simple worker to deal with list.
+ // A simple task to deal with list.
ChanProcessor[T any] interface {
Query(context.Context) (<-chan T, error)
Process(context.Context, T) error
@@ -25,62 +25,62 @@ type (
Process(context.Context, T) error
}
- chanProcessorWorker[T any] struct {
+ chanProcessorTask[T any] struct {
chanProcessor ChanProcessor[T]
logrus *logrus.Entry
scheduler *Scheduler
}
- batchProcessorWorker[T any] struct {
+ batchProcessorTask[T any] struct {
batchProcessor ListProcessor[T]
logrus *logrus.Entry
scheduler *Scheduler
}
- serialProcessorWorker[T any] struct {
+ serialProcessorTask[T any] struct {
batchProcessor ListProcessor[T]
logrus *logrus.Entry
scheduler *Scheduler
}
)
-func NewWorkerFromBatchProcessor[T any](
+func NewTaskFromBatchProcessor[T any](
batchProcessor ListProcessor[T],
scheduler *Scheduler,
logrus *logrus.Entry,
-) Worker {
- return &batchProcessorWorker[T]{
+) Task {
+ return &batchProcessorTask[T]{
batchProcessor: batchProcessor,
scheduler: scheduler,
logrus: logrus,
}
}
-func NewWorkerFromSerialProcessor[T any](
+func NewTaskFromSerialProcessor[T any](
batchProcessor ListProcessor[T],
scheduler *Scheduler,
logrus *logrus.Entry,
-) Worker {
- return &serialProcessorWorker[T]{
+) Task {
+ return &serialProcessorTask[T]{
batchProcessor: batchProcessor,
scheduler: scheduler,
logrus: logrus,
}
}
-func NewWorkerFromChanProcessor[T any](
+func NewTaskFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
logrus *logrus.Entry,
-) Worker {
- return &chanProcessorWorker[T]{
+) Task {
+ return &chanProcessorTask[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
logrus: logrus,
}
}
-func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
+func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
for {
values, err := l.batchProcessor.Query(ctx)
if err != nil {
@@ -123,7 +123,7 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
}
}
-func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {
+func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
for {
values, err := l.batchProcessor.Query(ctx)
if err != nil {
@@ -158,7 +158,7 @@ func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {
}
}
-func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
+func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
c, err := l.chanProcessor.Query(ctx)
if err != nil {
return err
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
index ce3ff0a..abdb907 100644
--- a/pkg/worker/list_processor_test.go
+++ b/pkg/worker/list_processor_test.go
@@ -32,7 +32,7 @@ func TestListProcessorLimit(t *testing.T) {
mock = &mockCounterListProcessor{countTo: 10000}
)
- worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
+ worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
err := worker.Start(context.Background())
testkit.TestFatalError(t, "Start", err)
@@ -47,7 +47,7 @@ func TestListProcessorContextCancelQuery(t *testing.T) {
mock = &mockContextListProcessor{}
)
- worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
+ worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 359384a..b768320 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -5,48 +5,68 @@ import (
"errors"
"fmt"
"sync"
+ "time"
)
type (
- // Worker should watch for context
- Worker interface {
+ // Task should watch for context
+ Task interface {
Start(context.Context) error
}
Work struct {
- Name string
- Worker Worker
+ Name string
+ Task Task
+ wait time.Duration
}
- WorkerPool struct {
- workers []*Work
+ TaskPool struct {
+ tasks []*Work
}
)
-func NewWorkerPool() *WorkerPool {
- return &WorkerPool{}
+func NewTaskPool() *TaskPool {
+ return &TaskPool{}
}
-func (self *WorkerPool) AddWorker(name string, worker Worker) {
- self.workers = append(self.workers, &Work{
- Name: name,
- Worker: worker,
+func (w *Work) run(ctx context.Context) error {
+ // first time fire from the get go
+ timer := time.NewTimer(time.Nanosecond)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ fmt.Println("Process starting: ", w.Name)
+ if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ fmt.Println("Process errored: ", w.Name, err.Error())
+ return err
+ } else {
+ fmt.Println("Process done: ", w.Name)
+ }
+ }
+ timer.Reset(w.wait)
+ }
+}
+
+func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
+ self.tasks = append(self.tasks, &Work{
+ Name: name,
+ Task: task,
+ wait: wait,
})
}
-func (self *WorkerPool) Start(ctx context.Context) {
+func (self *TaskPool) Start(ctx context.Context) {
var wg sync.WaitGroup
- wg.Add(len(self.workers))
+ wg.Add(len(self.tasks))
- for _, w := range self.workers {
+ for _, w := range self.tasks {
go func(w *Work) {
- defer wg.Done()
- if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
- fmt.Println("Processes finished, error", w.Name, err.Error())
- } else {
- fmt.Println(w.Name, "done")
- }
+ _ = w.run(ctx)
+ wg.Done()
}(w)
}
diff --git a/templates/base.qtpl b/templates/base.qtpl
index b1878ba..a80803a 100644
--- a/templates/base.qtpl
+++ b/templates/base.qtpl
@@ -47,7 +47,7 @@ Page prints a page implementing Page interface.
</a>
</div>
</nav>
- <div class="container">
+ <div class="container is-fullhd">
{%= p.Content() %}
</div>
</body>