aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:40:40 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:42:17 +0200
commitd4e1ca3a48e74573df6965ceee217e119ff899ae (patch)
treecbe4f1e981aa67a1a2bca62b20d9687b3550af3d /pkg/worker
parente1c8bb1bd5381d8ade3c699a2d6b4fb373112880 (diff)
downloadlens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.gz
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.bz2
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.zip
feat: Add scheduler to chan processor
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/file_scanner.go21
-rw-r--r--pkg/worker/list_processor.go13
2 files changed, 22 insertions, 12 deletions
diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go
index 0dc2eb2..fda869c 100644
--- a/pkg/worker/file_scanner.go
+++ b/pkg/worker/file_scanner.go
@@ -5,9 +5,9 @@ import (
"crypto/md5"
"encoding/hex"
"io/fs"
+ "mime"
"path/filepath"
-
- "github.com/gabriel-vasile/mimetype"
+ "strings"
"git.sr.ht/~gabrielgio/img/pkg/components/media"
)
@@ -39,6 +39,10 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
default:
}
+ if info == nil {
+ return nil
+ }
+
if info.IsDir() && filepath.Base(info.Name())[0] == '.' {
return filepath.SkipDir
}
@@ -47,11 +51,6 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
return nil
}
- if filepath.Ext(info.Name()) != ".jpg" &&
- filepath.Ext(info.Name()) != ".jpeg" &&
- filepath.Ext(info.Name()) != ".png" {
- return nil
- }
c <- path
return nil
})
@@ -60,6 +59,11 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
}
func (f *FileScanner) Process(ctx context.Context, path string) error {
+ m := mime.TypeByExtension(filepath.Ext(path))
+ if !strings.HasPrefix(m, "video") && !strings.HasPrefix(m, "image") {
+ return nil
+ }
+
hash := md5.Sum([]byte(path))
str := hex.EncodeToString(hash[:])
name := filepath.Base(path)
@@ -73,7 +77,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error {
return nil
}
- mime, errResp := mimetype.DetectFile(path)
if errResp != nil {
return errResp
}
@@ -82,6 +85,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error {
Name: name,
Path: path,
PathHash: str,
- MIMEType: mime.String(),
+ MIMEType: m,
})
}
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index bbc9fb7..0a07085 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -23,6 +23,7 @@ type (
chanProcessorWorker[T any] struct {
chanProcessor ChanProcessor[T]
+ logrus *logrus.Entry
scheduler *Scheduler
}
@@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any](
func NewWorkerFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
+ logrus *logrus.Entry,
) Worker {
return &chanProcessorWorker[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
+ logrus: logrus,
}
}
@@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
return nil
}
- if err := l.chanProcessor.Process(ctx, v); err != nil {
- return err
- }
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ l.logrus.WithError(err).Error("Error processing chan")
+ }
+ }(v)
}
}
}