diff options
author | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-02-26 19:54:48 +0100 |
---|---|---|
committer | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-18 16:30:36 +0200 |
commit | c8e1328164e9ffbd681c3c0e449f1e6b9856b896 (patch) | |
tree | faee639a4c55c5dc3bfc59a5400026822c40221d /pkg/worker | |
download | lens-c8e1328164e9ffbd681c3c0e449f1e6b9856b896.tar.gz lens-c8e1328164e9ffbd681c3c0e449f1e6b9856b896.tar.bz2 lens-c8e1328164e9ffbd681c3c0e449f1e6b9856b896.zip |
feat: Inicial commit
It contains rough template for the server and runners.
It contains rough template for the server and runners.
Diffstat (limited to 'pkg/worker')
-rw-r--r-- | pkg/worker/exif_scanner.go | 43 | ||||
-rw-r--r-- | pkg/worker/file_scanner.go | 81 | ||||
-rw-r--r-- | pkg/worker/httpserver.go | 31 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 102 | ||||
-rw-r--r-- | pkg/worker/list_processor_test.go | 90 | ||||
-rw-r--r-- | pkg/worker/scheduler.go | 29 | ||||
-rw-r--r-- | pkg/worker/worker.go | 54 |
7 files changed, 430 insertions, 0 deletions
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go new file mode 100644 index 0000000..66091cd --- /dev/null +++ b/pkg/worker/exif_scanner.go @@ -0,0 +1,43 @@ +package worker + +import ( + "context" + + "git.sr.ht/~gabrielgio/img/pkg/components/media" + "git.sr.ht/~gabrielgio/img/pkg/fileop" +) + +type ( + EXIFScanner struct { + repository media.Repository + } +) + +var _ ListProcessor[*media.Media] = &EXIFScanner{} + +func NewEXIFScanner(root string, repository media.Repository) *EXIFScanner { + return &EXIFScanner{ + repository: repository, + } +} + +func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) { + medias, err := e.repository.GetEmptyEXIF(ctx, &media.Pagination{ + Page: 0, + Size: 100, + }) + if err != nil { + return nil, err + } + + return medias, nil +} + +func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error { + newExif, err := fileop.ReadExif(m.Path) + if err != nil { + return err + } + + return e.repository.CreateEXIF(ctx, m.ID, newExif) +} diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go new file mode 100644 index 0000000..321fbca --- /dev/null +++ b/pkg/worker/file_scanner.go @@ -0,0 +1,81 @@ +package worker + +import ( + "context" + "crypto/md5" + "encoding/hex" + "io/fs" + "path/filepath" + + "github.com/gabriel-vasile/mimetype" + + "git.sr.ht/~gabrielgio/img/pkg/components/media" +) + +type ( + FileScanner struct { + root string + repository media.Repository + } +) + +var _ ChanProcessor[string] = &FileScanner{} + +func NewFileScanner(root string, repository media.Repository) *FileScanner { + return &FileScanner{ + root: root, + repository: repository, + } +} + +func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { + c := make(chan string) + go func() { + defer close(c) + _ = filepath.Walk(f.root, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() && filepath.Base(info.Name())[0] == '.' { + return filepath.SkipDir + } + + if info.IsDir() { + return nil + } + + if filepath.Ext(info.Name()) != ".jpg" && + filepath.Ext(info.Name()) != ".jpeg" && + filepath.Ext(info.Name()) != ".png" { + return nil + } + c <- path + return nil + }) + }() + return c, nil +} + +func (f *FileScanner) Process(ctx context.Context, path string) error { + hash := md5.Sum([]byte(path)) + str := hex.EncodeToString(hash[:]) + name := filepath.Base(path) + + exists, errResp := f.repository.Exists(ctx, str) + if errResp != nil { + return errResp + } + + if exists { + return nil + } + + mime, errResp := mimetype.DetectFile(path) + if errResp != nil { + return errResp + } + + return f.repository.Create(ctx, &media.CreateMedia{ + Name: name, + Path: path, + PathHash: str, + MIMEType: mime.String(), + }) +} diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go new file mode 100644 index 0000000..181cf73 --- /dev/null +++ b/pkg/worker/httpserver.go @@ -0,0 +1,31 @@ +package worker + +import ( + "context" + + "github.com/valyala/fasthttp" +) + +type ServerWorker struct { + server *fasthttp.Server +} + +func (self *ServerWorker) Start(ctx context.Context) error { + go func() { + // nolint: errcheck + self.server.ListenAndServe("0.0.0.0:8080") + }() + + <-ctx.Done() + return self.Shutdown() +} + +func (self *ServerWorker) Shutdown() error { + return self.server.Shutdown() +} + +func NewServerWorker(server *fasthttp.Server) *ServerWorker { + return &ServerWorker{ + server: server, + } +} diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go new file mode 100644 index 0000000..d53b7ea --- /dev/null +++ b/pkg/worker/list_processor.go @@ -0,0 +1,102 @@ +package worker + +import ( + "context" +) + +type ( + + // A simple worker to deal with list. + ChanProcessor[T any] interface { + Query(context.Context) (<-chan T, error) + Process(context.Context, T) error + } + + ListProcessor[T any] interface { + Query(context.Context) ([]T, error) + Process(context.Context, T) error + } + + chanProcessorWorker[T any] struct { + chanProcessor ChanProcessor[T] + scheduler *Scheduler + } + + listProcessorWorker[T any] struct { + listProcessor ListProcessor[T] + scheduler *Scheduler + } +) + +func NewWorkerFromListProcessor[T any]( + listProcessor ListProcessor[T], + scheduler *Scheduler, +) Worker { + return &listProcessorWorker[T]{ + listProcessor: listProcessor, + scheduler: scheduler, + } +} + +func NewWorkerFromChanProcessor[T any]( + listProcessor ChanProcessor[T], + scheduler *Scheduler, +) Worker { + return &chanProcessorWorker[T]{ + chanProcessor: listProcessor, + scheduler: scheduler, + } +} + +func (l *listProcessorWorker[T]) Start(ctx context.Context) error { + for { + values, err := l.listProcessor.Query(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(values) == 0 { + return nil + } + + for _, v := range values { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := l.listProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} + +func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { + c, err := l.chanProcessor.Query(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case v, ok := <-c: + if !ok { + return nil + } + + if err := l.chanProcessor.Process(ctx, v); err != nil { + return err + } + } + } +} diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go new file mode 100644 index 0000000..b7373d1 --- /dev/null +++ b/pkg/worker/list_processor_test.go @@ -0,0 +1,90 @@ +// go:build unit + +package worker + +import ( + "context" + "errors" + "math/rand" + "sync" + "testing" + + "git.sr.ht/~gabrielgio/img/pkg/testkit" +) + +type ( + mockCounterListProcessor struct { + done bool + countTo int + counter int + } + + mockContextListProcessor struct { + } +) + +func TestListProcessorLimit(t *testing.T) { + mock := &mockCounterListProcessor{ + countTo: 10000, + } + worker := NewWorkerFromListProcessor[int](mock, nil) + + err := worker.Start(context.Background()) + testkit.TestFatalError(t, "Start", err) + + testkit.TestValue(t, "Start", mock.countTo, mock.counter) +} + +func TestListProcessorContextCancelQuery(t *testing.T) { + mock := &mockContextListProcessor{} + worker := NewWorkerFromListProcessor[int](mock, nil) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + err := worker.Start(ctx) + if errors.Is(err, context.Canceled) { + return + } + testkit.TestFatalError(t, "Start", err) + }() + + cancel() + // this rely on timeout to test + wg.Wait() +} + +func (m *mockCounterListProcessor) Query(_ context.Context) ([]int, error) { + if m.done { + return make([]int, 0), nil + } + values := make([]int, 0, m.countTo) + for i := 0; i < m.countTo; i++ { + values = append(values, rand.Int()) + } + + m.done = true + return values, nil +} + +func (m *mockCounterListProcessor) Process(_ context.Context, _ int) error { + m.counter++ + return nil +} + +func (m *mockContextListProcessor) Query(_ context.Context) ([]int, error) { + // keeps returning the query so it can run in infinity loop + values := make([]int, 0, 10) + for i := 0; i < 10; i++ { + values = append(values, rand.Int()) + } + return values, nil +} + +func (m *mockContextListProcessor) Process(_ context.Context, _ int) error { + // do nothing + return nil +} diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go new file mode 100644 index 0000000..b410b33 --- /dev/null +++ b/pkg/worker/scheduler.go @@ -0,0 +1,29 @@ +package worker + +import ( + "fmt" + "sync/atomic" +) + +type Scheduler struct { + pool chan any + count atomic.Int64 +} + +func NewScheduler(count uint) *Scheduler { + return &Scheduler{ + pool: make(chan any, count), + } +} + +func (self *Scheduler) Take() { + self.pool <- nil + self.count.Add(1) + fmt.Printf("<- %d\n", self.count.Load()) +} + +func (self *Scheduler) Return() { + <-self.pool + self.count.Add(-1) + fmt.Printf("-> %d\n", self.count.Load()) +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100644 index 0000000..c52f0be --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,54 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sync" +) + +type ( + // Worker should watch for context + Worker interface { + Start(context.Context) error + } + + Work struct { + Name string + Worker Worker + } + + WorkerPool struct { + workers []*Work + wg sync.WaitGroup + } +) + +func NewWorkerPool() *WorkerPool { + return &WorkerPool{} +} + +func (self *WorkerPool) AddWorker(name string, worker Worker) { + self.workers = append(self.workers, &Work{ + Name: name, + Worker: worker, + }) +} + +func (self *WorkerPool) Start(ctx context.Context) { + for _, w := range self.workers { + self.wg.Add(1) + go func(w *Work) { + defer self.wg.Done() + if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { + fmt.Println("Error ", w.Name, err.Error()) + } else { + fmt.Println(w.Name, "done") + } + }(w) + } +} + +func (self *WorkerPool) Wait() { + self.wg.Wait() +} |