From d4e1ca3a48e74573df6965ceee217e119ff899ae Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Mon, 26 Jun 2023 21:40:40 +0200 Subject: feat: Add scheduler to chan processor --- cmd/server/main.go | 8 ++++++-- go.mod | 2 -- go.sum | 4 ---- pkg/worker/file_scanner.go | 21 ++++++++++++--------- pkg/worker/list_processor.go | 13 ++++++++++--- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index d7c2fd6..f58366f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -118,8 +118,12 @@ func main() { // worker var ( serverWorker = worker.NewServerWorker(&fasthttp.Server{Handler: r.Handler}) - fileWorker = worker.NewWorkerFromChanProcessor[string](fileScanner, scheduler) - exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media]( + fileWorker = worker.NewWorkerFromChanProcessor[string]( + fileScanner, + scheduler, + logrus.WithField("context", "file scanner"), + ) + exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media]( exifScanner, scheduler, logrus.WithField("context", "exif scanner"), diff --git a/go.mod b/go.mod index 4778b57..abd7388 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.19 require ( github.com/barasher/go-exiftool v1.10.0 github.com/fasthttp/router v1.4.19 - github.com/gabriel-vasile/mimetype v1.4.2 github.com/google/go-cmp v0.5.9 github.com/samber/lo v1.38.1 github.com/sirupsen/logrus v1.9.2 @@ -33,7 +32,6 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/mod v0.10.0 // indirect - golang.org/x/net v0.10.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/tools v0.9.3 // indirect diff --git a/go.sum b/go.sum index d0e670b..ac79e0e 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fasthttp/router v1.4.19 h1:RLE539IU/S4kfb4MP56zgP0TIBU9kEg0ID9GpWO0vqk= github.com/fasthttp/router v1.4.19/go.mod h1:+Fh3YOd8x1+he6ZS+d2iUDBH9MGGZ1xQFUor0DE9rKE= -github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= -github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= @@ -58,8 +56,6 @@ golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl4 golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go index 0dc2eb2..fda869c 100644 --- a/pkg/worker/file_scanner.go +++ b/pkg/worker/file_scanner.go @@ -5,9 +5,9 @@ import ( "crypto/md5" "encoding/hex" "io/fs" + "mime" "path/filepath" - - "github.com/gabriel-vasile/mimetype" + "strings" "git.sr.ht/~gabrielgio/img/pkg/components/media" ) @@ -39,6 +39,10 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { default: } + if info == nil { + return nil + } + if info.IsDir() && filepath.Base(info.Name())[0] == '.' { return filepath.SkipDir } @@ -47,11 +51,6 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { return nil } - if filepath.Ext(info.Name()) != ".jpg" && - filepath.Ext(info.Name()) != ".jpeg" && - filepath.Ext(info.Name()) != ".png" { - return nil - } c <- path return nil }) @@ -60,6 +59,11 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { } func (f *FileScanner) Process(ctx context.Context, path string) error { + m := mime.TypeByExtension(filepath.Ext(path)) + if !strings.HasPrefix(m, "video") && !strings.HasPrefix(m, "image") { + return nil + } + hash := md5.Sum([]byte(path)) str := hex.EncodeToString(hash[:]) name := filepath.Base(path) @@ -73,7 +77,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error { return nil } - mime, errResp := mimetype.DetectFile(path) if errResp != nil { return errResp } @@ -82,6 +85,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error { Name: name, Path: path, PathHash: str, - MIMEType: mime.String(), + MIMEType: m, }) } diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index bbc9fb7..0a07085 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -23,6 +23,7 @@ type ( chanProcessorWorker[T any] struct { chanProcessor ChanProcessor[T] + logrus *logrus.Entry scheduler *Scheduler } @@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any]( func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, + logrus *logrus.Entry, ) Worker { return &chanProcessorWorker[T]{ chanProcessor: chanProcessor, scheduler: scheduler, + logrus: logrus, } } @@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { return nil } - if err := l.chanProcessor.Process(ctx, v); err != nil { - return err - } + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + if err := l.chanProcessor.Process(ctx, v); err != nil { + l.logrus.WithError(err).Error("Error processing chan") + } + }(v) } } } -- cgit v1.2.3