aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/exif_scanner.go30
-rw-r--r--pkg/worker/list_processor_test.go21
-rw-r--r--pkg/worker/worker.go12
3 files changed, 24 insertions, 39 deletions
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go
index 4aa247d..91eed12 100644
--- a/pkg/worker/exif_scanner.go
+++ b/pkg/worker/exif_scanner.go
@@ -4,6 +4,7 @@ import (
"context"
"git.sr.ht/~gabrielgio/img/pkg/components/media"
+ "git.sr.ht/~gabrielgio/img/pkg/coroutine"
"git.sr.ht/~gabrielgio/img/pkg/fileop"
)
@@ -33,36 +34,11 @@ func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) {
return medias, nil
}
-func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) {
- c := make(chan *media.MediaEXIF)
- e := make(chan error)
- go func() {
- defer close(c)
- defer close(e)
-
- newExif, err := fileop.ReadExif(path)
- if err != nil {
- e <- err
- } else {
- c <- newExif
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case m := <-c:
- return m, nil
- case err := <-e:
- return nil, err
- }
-}
-
func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error {
- newExif, err := wrapReadExif(ctx, m.Path)
+ exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) })
if err != nil {
return err
}
- return e.repository.CreateEXIF(ctx, m.ID, newExif)
+ return e.repository.CreateEXIF(ctx, m.ID, exif)
}
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
index 1e4ed2d..35672f3 100644
--- a/pkg/worker/list_processor_test.go
+++ b/pkg/worker/list_processor_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"git.sr.ht/~gabrielgio/img/pkg/testkit"
+ "github.com/sirupsen/logrus"
)
type (
@@ -24,10 +25,13 @@ type (
)
func TestListProcessorLimit(t *testing.T) {
- mock := &mockCounterListProcessor{
- countTo: 10000,
- }
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockCounterListProcessor{countTo: 10000}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
err := worker.Start(context.Background())
testkit.TestFatalError(t, "Start", err)
@@ -36,8 +40,13 @@ func TestListProcessorLimit(t *testing.T) {
}
func TestListProcessorContextCancelQuery(t *testing.T) {
- mock := &mockContextListProcessor{}
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockContextListProcessor{}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 18cc0e2..359384a 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -20,7 +20,6 @@ type (
WorkerPool struct {
workers []*Work
- wg sync.WaitGroup
}
)
@@ -36,10 +35,13 @@ func (self *WorkerPool) AddWorker(name string, worker Worker) {
}
func (self *WorkerPool) Start(ctx context.Context) {
- self.wg.Add(len(self.workers))
+ var wg sync.WaitGroup
+
+ wg.Add(len(self.workers))
+
for _, w := range self.workers {
go func(w *Work) {
- defer self.wg.Done()
+ defer wg.Done()
if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println("Processes finished, error", w.Name, err.Error())
} else {
@@ -47,8 +49,6 @@ func (self *WorkerPool) Start(ctx context.Context) {
}
}(w)
}
-}
-func (self *WorkerPool) Wait() {
- self.wg.Wait()
+ wg.Wait()
}