diff options
author | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-26 21:40:40 +0200 |
---|---|---|
committer | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-26 21:42:17 +0200 |
commit | d4e1ca3a48e74573df6965ceee217e119ff899ae (patch) | |
tree | cbe4f1e981aa67a1a2bca62b20d9687b3550af3d /pkg | |
parent | e1c8bb1bd5381d8ade3c699a2d6b4fb373112880 (diff) | |
download | lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.gz lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.bz2 lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.zip |
feat: Add scheduler to chan processor
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/worker/file_scanner.go | 21 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 13 |
2 files changed, 22 insertions, 12 deletions
diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go index 0dc2eb2..fda869c 100644 --- a/pkg/worker/file_scanner.go +++ b/pkg/worker/file_scanner.go @@ -5,9 +5,9 @@ import ( "crypto/md5" "encoding/hex" "io/fs" + "mime" "path/filepath" - - "github.com/gabriel-vasile/mimetype" + "strings" "git.sr.ht/~gabrielgio/img/pkg/components/media" ) @@ -39,6 +39,10 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { default: } + if info == nil { + return nil + } + if info.IsDir() && filepath.Base(info.Name())[0] == '.' { return filepath.SkipDir } @@ -47,11 +51,6 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { return nil } - if filepath.Ext(info.Name()) != ".jpg" && - filepath.Ext(info.Name()) != ".jpeg" && - filepath.Ext(info.Name()) != ".png" { - return nil - } c <- path return nil }) @@ -60,6 +59,11 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) { } func (f *FileScanner) Process(ctx context.Context, path string) error { + m := mime.TypeByExtension(filepath.Ext(path)) + if !strings.HasPrefix(m, "video") && !strings.HasPrefix(m, "image") { + return nil + } + hash := md5.Sum([]byte(path)) str := hex.EncodeToString(hash[:]) name := filepath.Base(path) @@ -73,7 +77,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error { return nil } - mime, errResp := mimetype.DetectFile(path) if errResp != nil { return errResp } @@ -82,6 +85,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error { Name: name, Path: path, PathHash: str, - MIMEType: mime.String(), + MIMEType: m, }) } diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index bbc9fb7..0a07085 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -23,6 +23,7 @@ type ( chanProcessorWorker[T any] struct { chanProcessor ChanProcessor[T] + logrus *logrus.Entry scheduler *Scheduler } @@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any]( func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, + logrus *logrus.Entry, ) Worker { return &chanProcessorWorker[T]{ chanProcessor: chanProcessor, scheduler: scheduler, + logrus: logrus, } } @@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { return nil } - if err := l.chanProcessor.Process(ctx, v); err != nil { - return err - } + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + if err := l.chanProcessor.Process(ctx, v); err != nil { + l.logrus.WithError(err).Error("Error processing chan") + } + }(v) } } } |