aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/server/main.go1
-rw-r--r--pkg/coroutine/coroutine.go33
-rw-r--r--pkg/coroutine/coroutine_test.go63
-rw-r--r--pkg/coroutines/coroutines.go1
-rw-r--r--pkg/worker/exif_scanner.go30
-rw-r--r--pkg/worker/list_processor_test.go21
-rw-r--r--pkg/worker/worker.go12
7 files changed, 120 insertions, 41 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 473bed9..0fa5fea 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -135,7 +135,6 @@ func main() {
defer stop()
pool.Start(ctx)
- pool.Wait()
}
func OpenDatabase(dbType string, dbConn string) (gorm.Dialector, error) {
diff --git a/pkg/coroutine/coroutine.go b/pkg/coroutine/coroutine.go
new file mode 100644
index 0000000..96d149e
--- /dev/null
+++ b/pkg/coroutine/coroutine.go
@@ -0,0 +1,33 @@
+package coroutine
+
+import (
+ "context"
+)
+
+// WrapProcess wraps process into a go routine and make it cancelable through context
+func WrapProcess[V any](ctx context.Context, fun func() (V, error)) (V, error) {
+ c := make(chan V)
+ e := make(chan error)
+ go func() {
+ defer close(c)
+ defer close(e)
+
+ v, err := fun()
+ if err != nil {
+ e <- err
+ } else {
+ c <- v
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ var zero V
+ return zero, ctx.Err()
+ case m := <-c:
+ return m, nil
+ case err := <-e:
+ var zero V
+ return zero, err
+ }
+}
diff --git a/pkg/coroutine/coroutine_test.go b/pkg/coroutine/coroutine_test.go
new file mode 100644
index 0000000..e876ec3
--- /dev/null
+++ b/pkg/coroutine/coroutine_test.go
@@ -0,0 +1,63 @@
+//go:build unit
+
+package coroutine
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "git.sr.ht/~gabrielgio/img/pkg/testkit"
+)
+
+var rError = errors.New("This is a error")
+
+func imediatReturn() (string, error) {
+ return "A string", nil
+}
+
+func imediatErrorReturn() (string, error) {
+ return "", rError
+}
+
+func haltedReturn() (string, error) {
+ time.Sleep(time.Hour)
+ return "", nil
+}
+
+func TestImediatReturn(t *testing.T) {
+ ctx := context.Background()
+ v, err := WrapProcess(ctx, imediatReturn)
+ testkit.TestError(t, "WrapProcess", nil, err)
+ testkit.TestValue(t, "WrapProcess", "A string", v)
+}
+
+func TestImediatErrorReturn(t *testing.T) {
+ ctx := context.Background()
+ v, err := WrapProcess(ctx, imediatErrorReturn)
+ testkit.TestError(t, "WrapProcess", rError, err)
+ testkit.TestValue(t, "WrapProcess", "", v)
+}
+
+func TestHaltedReturn(t *testing.T) {
+ ctx := context.Background()
+ ctx, cancel := context.WithCancel(ctx)
+
+ var (
+ err error
+ wg sync.WaitGroup
+ )
+
+ wg.Add(1)
+ go func(err *error) {
+ defer wg.Done()
+ _, *err = WrapProcess(ctx, haltedReturn)
+ }(&err)
+
+ cancel()
+ wg.Wait()
+
+ testkit.TestError(t, "WrapProcess", context.Canceled, err)
+}
diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go
deleted file mode 100644
index c0f7247..0000000
--- a/pkg/coroutines/coroutines.go
+++ /dev/null
@@ -1 +0,0 @@
-package coroutines
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go
index 4aa247d..91eed12 100644
--- a/pkg/worker/exif_scanner.go
+++ b/pkg/worker/exif_scanner.go
@@ -4,6 +4,7 @@ import (
"context"
"git.sr.ht/~gabrielgio/img/pkg/components/media"
+ "git.sr.ht/~gabrielgio/img/pkg/coroutine"
"git.sr.ht/~gabrielgio/img/pkg/fileop"
)
@@ -33,36 +34,11 @@ func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) {
return medias, nil
}
-func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) {
- c := make(chan *media.MediaEXIF)
- e := make(chan error)
- go func() {
- defer close(c)
- defer close(e)
-
- newExif, err := fileop.ReadExif(path)
- if err != nil {
- e <- err
- } else {
- c <- newExif
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case m := <-c:
- return m, nil
- case err := <-e:
- return nil, err
- }
-}
-
func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error {
- newExif, err := wrapReadExif(ctx, m.Path)
+ exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) })
if err != nil {
return err
}
- return e.repository.CreateEXIF(ctx, m.ID, newExif)
+ return e.repository.CreateEXIF(ctx, m.ID, exif)
}
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
index 1e4ed2d..35672f3 100644
--- a/pkg/worker/list_processor_test.go
+++ b/pkg/worker/list_processor_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"git.sr.ht/~gabrielgio/img/pkg/testkit"
+ "github.com/sirupsen/logrus"
)
type (
@@ -24,10 +25,13 @@ type (
)
func TestListProcessorLimit(t *testing.T) {
- mock := &mockCounterListProcessor{
- countTo: 10000,
- }
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockCounterListProcessor{countTo: 10000}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
err := worker.Start(context.Background())
testkit.TestFatalError(t, "Start", err)
@@ -36,8 +40,13 @@ func TestListProcessorLimit(t *testing.T) {
}
func TestListProcessorContextCancelQuery(t *testing.T) {
- mock := &mockContextListProcessor{}
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockContextListProcessor{}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 18cc0e2..359384a 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -20,7 +20,6 @@ type (
WorkerPool struct {
workers []*Work
- wg sync.WaitGroup
}
)
@@ -36,10 +35,13 @@ func (self *WorkerPool) AddWorker(name string, worker Worker) {
}
func (self *WorkerPool) Start(ctx context.Context) {
- self.wg.Add(len(self.workers))
+ var wg sync.WaitGroup
+
+ wg.Add(len(self.workers))
+
for _, w := range self.workers {
go func(w *Work) {
- defer self.wg.Done()
+ defer wg.Done()
if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println("Processes finished, error", w.Name, err.Error())
} else {
@@ -47,8 +49,6 @@ func (self *WorkerPool) Start(ctx context.Context) {
}
}(w)
}
-}
-func (self *WorkerPool) Wait() {
- self.wg.Wait()
+ wg.Wait()
}