diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/coroutine/coroutine.go | 33 | ||||
-rw-r--r-- | pkg/coroutine/coroutine_test.go | 63 | ||||
-rw-r--r-- | pkg/coroutines/coroutines.go | 1 | ||||
-rw-r--r-- | pkg/worker/exif_scanner.go | 30 | ||||
-rw-r--r-- | pkg/worker/list_processor_test.go | 21 | ||||
-rw-r--r-- | pkg/worker/worker.go | 12 |
6 files changed, 120 insertions, 40 deletions
diff --git a/pkg/coroutine/coroutine.go b/pkg/coroutine/coroutine.go new file mode 100644 index 0000000..96d149e --- /dev/null +++ b/pkg/coroutine/coroutine.go @@ -0,0 +1,33 @@ +package coroutine + +import ( + "context" +) + +// WrapProcess wraps process into a go routine and make it cancelable through context +func WrapProcess[V any](ctx context.Context, fun func() (V, error)) (V, error) { + c := make(chan V) + e := make(chan error) + go func() { + defer close(c) + defer close(e) + + v, err := fun() + if err != nil { + e <- err + } else { + c <- v + } + }() + + select { + case <-ctx.Done(): + var zero V + return zero, ctx.Err() + case m := <-c: + return m, nil + case err := <-e: + var zero V + return zero, err + } +} diff --git a/pkg/coroutine/coroutine_test.go b/pkg/coroutine/coroutine_test.go new file mode 100644 index 0000000..e876ec3 --- /dev/null +++ b/pkg/coroutine/coroutine_test.go @@ -0,0 +1,63 @@ +//go:build unit + +package coroutine + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "git.sr.ht/~gabrielgio/img/pkg/testkit" +) + +var rError = errors.New("This is a error") + +func imediatReturn() (string, error) { + return "A string", nil +} + +func imediatErrorReturn() (string, error) { + return "", rError +} + +func haltedReturn() (string, error) { + time.Sleep(time.Hour) + return "", nil +} + +func TestImediatReturn(t *testing.T) { + ctx := context.Background() + v, err := WrapProcess(ctx, imediatReturn) + testkit.TestError(t, "WrapProcess", nil, err) + testkit.TestValue(t, "WrapProcess", "A string", v) +} + +func TestImediatErrorReturn(t *testing.T) { + ctx := context.Background() + v, err := WrapProcess(ctx, imediatErrorReturn) + testkit.TestError(t, "WrapProcess", rError, err) + testkit.TestValue(t, "WrapProcess", "", v) +} + +func TestHaltedReturn(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + var ( + err error + wg sync.WaitGroup + ) + + wg.Add(1) + go func(err *error) { + defer wg.Done() + _, *err = WrapProcess(ctx, haltedReturn) + }(&err) + + cancel() + wg.Wait() + + testkit.TestError(t, "WrapProcess", context.Canceled, err) +} diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go deleted file mode 100644 index c0f7247..0000000 --- a/pkg/coroutines/coroutines.go +++ /dev/null @@ -1 +0,0 @@ -package coroutines diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go index 4aa247d..91eed12 100644 --- a/pkg/worker/exif_scanner.go +++ b/pkg/worker/exif_scanner.go @@ -4,6 +4,7 @@ import ( "context" "git.sr.ht/~gabrielgio/img/pkg/components/media" + "git.sr.ht/~gabrielgio/img/pkg/coroutine" "git.sr.ht/~gabrielgio/img/pkg/fileop" ) @@ -33,36 +34,11 @@ func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) { return medias, nil } -func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) { - c := make(chan *media.MediaEXIF) - e := make(chan error) - go func() { - defer close(c) - defer close(e) - - newExif, err := fileop.ReadExif(path) - if err != nil { - e <- err - } else { - c <- newExif - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case m := <-c: - return m, nil - case err := <-e: - return nil, err - } -} - func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error { - newExif, err := wrapReadExif(ctx, m.Path) + exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) }) if err != nil { return err } - return e.repository.CreateEXIF(ctx, m.ID, newExif) + return e.repository.CreateEXIF(ctx, m.ID, exif) } diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index 1e4ed2d..35672f3 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -10,6 +10,7 @@ import ( "testing" "git.sr.ht/~gabrielgio/img/pkg/testkit" + "github.com/sirupsen/logrus" ) type ( @@ -24,10 +25,13 @@ type ( ) func TestListProcessorLimit(t *testing.T) { - mock := &mockCounterListProcessor{ - countTo: 10000, - } - worker := NewWorkerFromListProcessor[int](mock, nil) + var ( + log = logrus.New() + scheduler = NewScheduler(1) + mock = &mockCounterListProcessor{countTo: 10000} + ) + + worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) err := worker.Start(context.Background()) testkit.TestFatalError(t, "Start", err) @@ -36,8 +40,13 @@ func TestListProcessorLimit(t *testing.T) { } func TestListProcessorContextCancelQuery(t *testing.T) { - mock := &mockContextListProcessor{} - worker := NewWorkerFromListProcessor[int](mock, nil) + var ( + log = logrus.New() + scheduler = NewScheduler(1) + mock = &mockContextListProcessor{} + ) + + worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing")) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 18cc0e2..359384a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -20,7 +20,6 @@ type ( WorkerPool struct { workers []*Work - wg sync.WaitGroup } ) @@ -36,10 +35,13 @@ func (self *WorkerPool) AddWorker(name string, worker Worker) { } func (self *WorkerPool) Start(ctx context.Context) { - self.wg.Add(len(self.workers)) + var wg sync.WaitGroup + + wg.Add(len(self.workers)) + for _, w := range self.workers { go func(w *Work) { - defer self.wg.Done() + defer wg.Done() if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { fmt.Println("Processes finished, error", w.Name, err.Error()) } else { @@ -47,8 +49,6 @@ func (self *WorkerPool) Start(ctx context.Context) { } }(w) } -} -func (self *WorkerPool) Wait() { - self.wg.Wait() + wg.Wait() } |