author    Gabriel Arakaki Giovanini <mail@gabrielgio.me>  2023-02-26 19:54:48 +0100
committer Gabriel Arakaki Giovanini <mail@gabrielgio.me>  2023-06-18 16:30:36 +0200
commit    c8e1328164e9ffbd681c3c0e449f1e6b9856b896 (patch)
tree      faee639a4c55c5dc3bfc59a5400026822c40221d /pkg/worker
feat: Initial commit

It contains a rough template for the server and runners.
Diffstat (limited to 'pkg/worker')
-rw-r--r--  pkg/worker/exif_scanner.go          43
-rw-r--r--  pkg/worker/file_scanner.go          85
-rw-r--r--  pkg/worker/httpserver.go            31
-rw-r--r--  pkg/worker/list_processor.go       102
-rw-r--r--  pkg/worker/list_processor_test.go   90
-rw-r--r--  pkg/worker/scheduler.go             29
-rw-r--r--  pkg/worker/worker.go                54
7 files changed, 434 insertions, 0 deletions
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go
new file mode 100644
index 0000000..66091cd
--- /dev/null
+++ b/pkg/worker/exif_scanner.go
@@ -0,0 +1,43 @@
+package worker
+
+import (
+ "context"
+
+ "git.sr.ht/~gabrielgio/img/pkg/components/media"
+ "git.sr.ht/~gabrielgio/img/pkg/fileop"
+)
+
+type (
+ EXIFScanner struct {
+ repository media.Repository
+ }
+)
+
+var _ ListProcessor[*media.Media] = &EXIFScanner{}
+
+func NewEXIFScanner(root string, repository media.Repository) *EXIFScanner {
+ return &EXIFScanner{
+ repository: repository,
+ }
+}
+
+func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) {
+ medias, err := e.repository.GetEmptyEXIF(ctx, &media.Pagination{
+ Page: 0,
+ Size: 100,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return medias, nil
+}
+
+func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error {
+ newExif, err := fileop.ReadExif(m.Path)
+ if err != nil {
+ return err
+ }
+
+ return e.repository.CreateEXIF(ctx, m.ID, newExif)
+}
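
The scanner pages through media rows that still lack EXIF data (100 at a time) and fills them in; note that the root argument of NewEXIFScanner is unused in this revision. Below is a minimal wiring sketch, assuming a media.Repository value comes from the application's setup; the wiring package name, buildEXIFWorker/runEXIF, and the "/photos" root are illustrative, not part of the commit:

    package wiring

    import (
        "context"

        "git.sr.ht/~gabrielgio/img/pkg/components/media"
        "git.sr.ht/~gabrielgio/img/pkg/worker"
    )

    // buildEXIFWorker wraps the EXIFScanner in a list-driven worker.
    // The scheduler argument is nil because the workers do not use it yet.
    func buildEXIFWorker(repo media.Repository) worker.Worker {
        return worker.NewWorkerFromListProcessor[*media.Media](
            worker.NewEXIFScanner("/photos", repo), nil)
    }

    // runEXIF drives the worker until GetEmptyEXIF returns no rows.
    func runEXIF(ctx context.Context, repo media.Repository) error {
        return buildEXIFWorker(repo).Start(ctx)
    }
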
diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go
new file mode 100644
index 0000000..321fbca
--- /dev/null
+++ b/pkg/worker/file_scanner.go
@@ -0,0 +1,85 @@
+package worker
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "io/fs"
+ "path/filepath"
+
+ "github.com/gabriel-vasile/mimetype"
+
+ "git.sr.ht/~gabrielgio/img/pkg/components/media"
+)
+
+type (
+ FileScanner struct {
+ root string
+ repository media.Repository
+ }
+)
+
+var _ ChanProcessor[string] = &FileScanner{}
+
+func NewFileScanner(root string, repository media.Repository) *FileScanner {
+ return &FileScanner{
+ root: root,
+ repository: repository,
+ }
+}
+
+func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
+ c := make(chan string)
+ go func() {
+ defer close(c)
+ _ = filepath.Walk(f.root, func(path string, info fs.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+
+ if info.IsDir() && filepath.Base(info.Name())[0] == '.' {
+ return filepath.SkipDir
+ }
+
+ if info.IsDir() {
+ return nil
+ }
+
+ if filepath.Ext(info.Name()) != ".jpg" &&
+ filepath.Ext(info.Name()) != ".jpeg" &&
+ filepath.Ext(info.Name()) != ".png" {
+ return nil
+ }
+ c <- path
+ return nil
+ })
+ }()
+ return c, nil
+}
+
+func (f *FileScanner) Process(ctx context.Context, path string) error {
+ hash := md5.Sum([]byte(path))
+ str := hex.EncodeToString(hash[:])
+ name := filepath.Base(path)
+
+ exists, errResp := f.repository.Exists(ctx, str)
+ if errResp != nil {
+ return errResp
+ }
+
+ if exists {
+ return nil
+ }
+
+ mime, errResp := mimetype.DetectFile(path)
+ if errResp != nil {
+ return errResp
+ }
+
+ return f.repository.Create(ctx, &media.CreateMedia{
+ Name: name,
+ Path: path,
+ PathHash: str,
+ MIMEType: mime.String(),
+ })
+}
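
FileScanner walks the media root in a goroutine, streams .jpg/.jpeg/.png paths over a channel, and in Process deduplicates by the hex-encoded MD5 of the path before creating the media row. A companion sketch in the same illustrative wiring package as above (buildFileWorker is not part of the commit):

    // buildFileWorker wraps the FileScanner in a channel-driven worker: Query
    // streams paths from the directory walk and Process records each new file.
    func buildFileWorker(root string, repo media.Repository) worker.Worker {
        return worker.NewWorkerFromChanProcessor[string](
            worker.NewFileScanner(root, repo), nil)
    }
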
diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go
new file mode 100644
index 0000000..181cf73
--- /dev/null
+++ b/pkg/worker/httpserver.go
@@ -0,0 +1,31 @@
+package worker
+
+import (
+ "context"
+
+ "github.com/valyala/fasthttp"
+)
+
+type ServerWorker struct {
+ server *fasthttp.Server
+}
+
+func (self *ServerWorker) Start(ctx context.Context) error {
+ go func() {
+ // nolint: errcheck
+ self.server.ListenAndServe("0.0.0.0:8080")
+ }()
+
+ <-ctx.Done()
+ return self.Shutdown()
+}
+
+func (self *ServerWorker) Shutdown() error {
+ return self.server.Shutdown()
+}
+
+func NewServerWorker(server *fasthttp.Server) *ServerWorker {
+ return &ServerWorker{
+ server: server,
+ }
+}
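
ServerWorker adapts a fasthttp.Server to the Worker interface: Start serves on the hard-coded address 0.0.0.0:8080 in a goroutine and shuts the server down once the context is cancelled. A usage sketch with a placeholder handler (runServer is illustrative):

    package wiring

    import (
        "context"

        "github.com/valyala/fasthttp"

        "git.sr.ht/~gabrielgio/img/pkg/worker"
    )

    // runServer blocks until ctx is cancelled, then shuts the server down.
    func runServer(ctx context.Context) error {
        srv := &fasthttp.Server{
            Handler: func(c *fasthttp.RequestCtx) {
                c.SetBodyString("ok") // placeholder handler
            },
        }
        return worker.NewServerWorker(srv).Start(ctx)
    }
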
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
new file mode 100644
index 0000000..d53b7ea
--- /dev/null
+++ b/pkg/worker/list_processor.go
@@ -0,0 +1,102 @@
+package worker
+
+import (
+ "context"
+)
+
+type (
+
+ // ChanProcessor feeds items to be processed through a channel.
+ ChanProcessor[T any] interface {
+ Query(context.Context) (<-chan T, error)
+ Process(context.Context, T) error
+ }
+
+ ListProcessor[T any] interface {
+ Query(context.Context) ([]T, error)
+ Process(context.Context, T) error
+ }
+
+ chanProcessorWorker[T any] struct {
+ chanProcessor ChanProcessor[T]
+ scheduler *Scheduler
+ }
+
+ listProcessorWorker[T any] struct {
+ listProcessor ListProcessor[T]
+ scheduler *Scheduler
+ }
+)
+
+func NewWorkerFromListProcessor[T any](
+ listProcessor ListProcessor[T],
+ scheduler *Scheduler,
+) Worker {
+ return &listProcessorWorker[T]{
+ listProcessor: listProcessor,
+ scheduler: scheduler,
+ }
+}
+
+func NewWorkerFromChanProcessor[T any](
+ listProcessor ChanProcessor[T],
+ scheduler *Scheduler,
+) Worker {
+ return &chanProcessorWorker[T]{
+ chanProcessor: listProcessor,
+ scheduler: scheduler,
+ }
+}
+
+func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
+ for {
+ values, err := l.listProcessor.Query(ctx)
+ if err != nil {
+ return err
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ if len(values) == 0 {
+ return nil
+ }
+
+ for _, v := range values {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ if err := l.listProcessor.Process(ctx, v); err != nil {
+ return err
+ }
+ }
+ }
+}
+
+func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
+ c, err := l.chanProcessor.Query(ctx)
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case v, ok := <-c:
+ if !ok {
+ return nil
+ }
+
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ return err
+ }
+ }
+ }
+}
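
The two drivers share a simple contract: listProcessorWorker re-runs Query until it returns an empty slice, chanProcessorWorker drains the channel until it is closed, and both stop early with ctx.Err() once the context is cancelled. A minimal, self-contained ListProcessor sketch (oneBatch is illustrative, not in the commit):

    package main

    import (
        "context"
        "fmt"

        "git.sr.ht/~gabrielgio/img/pkg/worker"
    )

    // oneBatch serves a single batch; the next Query returns an empty slice,
    // which makes the worker's Start return nil.
    type oneBatch struct{ served bool }

    func (b *oneBatch) Query(_ context.Context) ([]int, error) {
        if b.served {
            return nil, nil
        }
        b.served = true
        return []int{1, 2, 3}, nil
    }

    func (b *oneBatch) Process(_ context.Context, v int) error {
        fmt.Println(v)
        return nil
    }

    func main() {
        _ = worker.NewWorkerFromListProcessor[int](&oneBatch{}, nil).Start(context.Background())
    }
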
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
new file mode 100644
index 0000000..b7373d1
--- /dev/null
+++ b/pkg/worker/list_processor_test.go
@@ -0,0 +1,90 @@
+//go:build unit
+
+package worker
+
+import (
+ "context"
+ "errors"
+ "math/rand"
+ "sync"
+ "testing"
+
+ "git.sr.ht/~gabrielgio/img/pkg/testkit"
+)
+
+type (
+ mockCounterListProcessor struct {
+ done bool
+ countTo int
+ counter int
+ }
+
+ mockContextListProcessor struct {
+ }
+)
+
+func TestListProcessorLimit(t *testing.T) {
+ mock := &mockCounterListProcessor{
+ countTo: 10000,
+ }
+ worker := NewWorkerFromListProcessor[int](mock, nil)
+
+ err := worker.Start(context.Background())
+ testkit.TestFatalError(t, "Start", err)
+
+ testkit.TestValue(t, "Start", mock.countTo, mock.counter)
+}
+
+func TestListProcessorContextCancelQuery(t *testing.T) {
+ mock := &mockContextListProcessor{}
+ worker := NewWorkerFromListProcessor[int](mock, nil)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := worker.Start(ctx)
+ if errors.Is(err, context.Canceled) {
+ return
+ }
+ testkit.TestFatalError(t, "Start", err)
+ }()
+
+ cancel()
+ // relies on the test timeout: if cancellation is not honored, wg.Wait blocks forever
+ wg.Wait()
+}
+
+func (m *mockCounterListProcessor) Query(_ context.Context) ([]int, error) {
+ if m.done {
+ return make([]int, 0), nil
+ }
+ values := make([]int, 0, m.countTo)
+ for i := 0; i < m.countTo; i++ {
+ values = append(values, rand.Int())
+ }
+
+ m.done = true
+ return values, nil
+}
+
+func (m *mockCounterListProcessor) Process(_ context.Context, _ int) error {
+ m.counter++
+ return nil
+}
+
+func (m *mockContextListProcessor) Query(_ context.Context) ([]int, error) {
+ // keeps returning results so the worker loops until the context is cancelled
+ values := make([]int, 0, 10)
+ for i := 0; i < 10; i++ {
+ values = append(values, rand.Int())
+ }
+ return values, nil
+}
+
+func (m *mockContextListProcessor) Process(_ context.Context, _ int) error {
+ // do nothing
+ return nil
+}
diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go
new file mode 100644
index 0000000..b410b33
--- /dev/null
+++ b/pkg/worker/scheduler.go
@@ -0,0 +1,29 @@
+package worker
+
+import (
+ "fmt"
+ "sync/atomic"
+)
+
+type Scheduler struct {
+ pool chan any
+ count atomic.Int64
+}
+
+func NewScheduler(count uint) *Scheduler {
+ return &Scheduler{
+ pool: make(chan any, count),
+ }
+}
+
+func (self *Scheduler) Take() {
+ self.pool <- nil
+ self.count.Add(1)
+ fmt.Printf("<- %d\n", self.count.Load())
+}
+
+func (self *Scheduler) Return() {
+ <-self.pool
+ self.count.Add(-1)
+ fmt.Printf("-> %d\n", self.count.Load())
+}
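
Scheduler is a counting semaphore built on a buffered channel: Take blocks once all slots are occupied, Return frees one, and both print a debug counter. Nothing in this commit calls it yet (the worker constructors accept it but ignore it), so the sketch below only shows the intended way to bound concurrency; everything outside the worker package is illustrative:

    package main

    import (
        "sync"

        "git.sr.ht/~gabrielgio/img/pkg/worker"
    )

    func main() {
        sched := worker.NewScheduler(4) // at most four slots in flight
        var wg sync.WaitGroup
        for i := 0; i < 16; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                sched.Take()         // blocks while all four slots are in use
                defer sched.Return() // frees the slot
                // ... process one item here ...
            }()
        }
        wg.Wait()
    }
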
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
new file mode 100644
index 0000000..c52f0be
--- /dev/null
+++ b/pkg/worker/worker.go
@@ -0,0 +1,54 @@
+package worker
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+)
+
+type (
+ // Worker runs until it finishes or its context is cancelled.
+ Worker interface {
+ Start(context.Context) error
+ }
+
+ Work struct {
+ Name string
+ Worker Worker
+ }
+
+ WorkerPool struct {
+ workers []*Work
+ wg sync.WaitGroup
+ }
+)
+
+func NewWorkerPool() *WorkerPool {
+ return &WorkerPool{}
+}
+
+func (self *WorkerPool) AddWorker(name string, worker Worker) {
+ self.workers = append(self.workers, &Work{
+ Name: name,
+ Worker: worker,
+ })
+}
+
+func (self *WorkerPool) Start(ctx context.Context) {
+ for _, w := range self.workers {
+ self.wg.Add(1)
+ go func(w *Work) {
+ defer self.wg.Done()
+ if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ fmt.Println("Error ", w.Name, err.Error())
+ } else {
+ fmt.Println(w.Name, "done")
+ }
+ }(w)
+ }
+}
+
+func (self *WorkerPool) Wait() {
+ self.wg.Wait()
+}
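
WorkerPool runs each registered worker on its own goroutine; Wait blocks until all of them return, and an error other than context.Canceled is only printed, not propagated. A composition sketch tying this commit's pieces together, reusing the illustrative wiring package, imports, and placeholders from the sketches above:

    // run starts every worker and returns once ctx is cancelled and all of
    // them have exited (context.Canceled counts as a clean shutdown).
    func run(ctx context.Context, repo media.Repository, srv *fasthttp.Server) {
        pool := worker.NewWorkerPool()
        pool.AddWorker("file scan", buildFileWorker("/photos", repo))
        pool.AddWorker("exif scan", buildEXIFWorker(repo))
        pool.AddWorker("http server", worker.NewServerWorker(srv))
        pool.Start(ctx)
        pool.Wait()
    }
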