diff options
Diffstat (limited to 'pkg/worker')
-rw-r--r-- | pkg/worker/exif_scanner.go | 30 | ||||
-rw-r--r-- | pkg/worker/list_processor_test.go | 21 | ||||
-rw-r--r-- | pkg/worker/worker.go | 12 |
3 files changed, 24 insertions, 39 deletions
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go index 4aa247d..91eed12 100644 --- a/pkg/worker/exif_scanner.go +++ b/pkg/worker/exif_scanner.go @@ -4,6 +4,7 @@ import ( "context" "git.sr.ht/~gabrielgio/img/pkg/components/media" + "git.sr.ht/~gabrielgio/img/pkg/coroutine" "git.sr.ht/~gabrielgio/img/pkg/fileop" ) @@ -33,36 +34,11 @@ func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) { return medias, nil } -func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) { - c := make(chan *media.MediaEXIF) - e := make(chan error) - go func() { - defer close(c) - defer close(e) - - newExif, err := fileop.ReadExif(path) - if err != nil { - e <- err - } else { - c <- newExif - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case m := <-c: - return m, nil - case err := <-e: - return nil, err - } -} - func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error { - newExif, err := wrapReadExif(ctx, m.Path) + exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) }) if err != nil { return err } - return e.repository.CreateEXIF(ctx, m.ID, newExif) + return e.repository.CreateEXIF(ctx, m.ID, exif) } diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index 1e4ed2d..35672f3 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -10,6 +10,7 @@ import ( "testing" "git.sr.ht/~gabrielgio/img/pkg/testkit" + "github.com/sirupsen/logrus" ) type ( @@ -24,10 +25,13 @@ type ( ) func TestListProcessorLimit(t *testing.T) { - mock := &mockCounterListProcessor{ - countTo: 10000, - } - worker := NewWorkerFromListProcessor[int](mock, nil) + var ( + log = logrus.New() + scheduler = NewScheduler(1) + mock = &mockCounterListProcessor{countTo: 10000} + ) + + worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) err := worker.Start(context.Background()) testkit.TestFatalError(t, "Start", err) @@ -36,8 +40,13 @@ func TestListProcessorLimit(t *testing.T) { } func TestListProcessorContextCancelQuery(t *testing.T) { - mock := &mockContextListProcessor{} - worker := NewWorkerFromListProcessor[int](mock, nil) + var ( + log = logrus.New() + scheduler = NewScheduler(1) + mock = &mockContextListProcessor{} + ) + + worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 18cc0e2..359384a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -20,7 +20,6 @@ type ( WorkerPool struct { workers []*Work - wg sync.WaitGroup } ) @@ -36,10 +35,13 @@ func (self *WorkerPool) AddWorker(name string, worker Worker) { } func (self *WorkerPool) Start(ctx context.Context) { - self.wg.Add(len(self.workers)) + var wg sync.WaitGroup + + wg.Add(len(self.workers)) + for _, w := range self.workers { go func(w *Work) { - defer self.wg.Done() + defer wg.Done() if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { fmt.Println("Processes finished, error", w.Name, err.Error()) } else { @@ -47,8 +49,6 @@ func (self *WorkerPool) Start(ctx context.Context) { } }(w) } -} -func (self *WorkerPool) Wait() { - self.wg.Wait() + wg.Wait() } |