aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:40:40 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2023-06-26 21:42:17 +0200
commitd4e1ca3a48e74573df6965ceee217e119ff899ae (patch)
treecbe4f1e981aa67a1a2bca62b20d9687b3550af3d
parente1c8bb1bd5381d8ade3c699a2d6b4fb373112880 (diff)
downloadlens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.gz
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.tar.bz2
lens-d4e1ca3a48e74573df6965ceee217e119ff899ae.zip
feat: Add scheduler to chan processor
-rw-r--r--cmd/server/main.go8
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--pkg/worker/file_scanner.go21
-rw-r--r--pkg/worker/list_processor.go13
5 files changed, 28 insertions, 20 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go
index d7c2fd6..f58366f 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -118,8 +118,12 @@ func main() {
// worker
var (
serverWorker = worker.NewServerWorker(&fasthttp.Server{Handler: r.Handler})
- fileWorker = worker.NewWorkerFromChanProcessor[string](fileScanner, scheduler)
- exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media](
+ fileWorker = worker.NewWorkerFromChanProcessor[string](
+ fileScanner,
+ scheduler,
+ logrus.WithField("context", "file scanner"),
+ )
+ exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media](
exifScanner,
scheduler,
logrus.WithField("context", "exif scanner"),
diff --git a/go.mod b/go.mod
index 4778b57..abd7388 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.19
require (
github.com/barasher/go-exiftool v1.10.0
github.com/fasthttp/router v1.4.19
- github.com/gabriel-vasile/mimetype v1.4.2
github.com/google/go-cmp v0.5.9
github.com/samber/lo v1.38.1
github.com/sirupsen/logrus v1.9.2
@@ -33,7 +32,6 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.10.0 // indirect
- golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.9.3 // indirect
diff --git a/go.sum b/go.sum
index d0e670b..ac79e0e 100644
--- a/go.sum
+++ b/go.sum
@@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fasthttp/router v1.4.19 h1:RLE539IU/S4kfb4MP56zgP0TIBU9kEg0ID9GpWO0vqk=
github.com/fasthttp/router v1.4.19/go.mod h1:+Fh3YOd8x1+he6ZS+d2iUDBH9MGGZ1xQFUor0DE9rKE=
-github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
-github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
@@ -58,8 +56,6 @@ golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl4
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
-golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go
index 0dc2eb2..fda869c 100644
--- a/pkg/worker/file_scanner.go
+++ b/pkg/worker/file_scanner.go
@@ -5,9 +5,9 @@ import (
"crypto/md5"
"encoding/hex"
"io/fs"
+ "mime"
"path/filepath"
-
- "github.com/gabriel-vasile/mimetype"
+ "strings"
"git.sr.ht/~gabrielgio/img/pkg/components/media"
)
@@ -39,6 +39,10 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
default:
}
+ if info == nil {
+ return nil
+ }
+
if info.IsDir() && filepath.Base(info.Name())[0] == '.' {
return filepath.SkipDir
}
@@ -47,11 +51,6 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
return nil
}
- if filepath.Ext(info.Name()) != ".jpg" &&
- filepath.Ext(info.Name()) != ".jpeg" &&
- filepath.Ext(info.Name()) != ".png" {
- return nil
- }
c <- path
return nil
})
@@ -60,6 +59,11 @@ func (f *FileScanner) Query(ctx context.Context) (<-chan string, error) {
}
func (f *FileScanner) Process(ctx context.Context, path string) error {
+ m := mime.TypeByExtension(filepath.Ext(path))
+ if !strings.HasPrefix(m, "video") && !strings.HasPrefix(m, "image") {
+ return nil
+ }
+
hash := md5.Sum([]byte(path))
str := hex.EncodeToString(hash[:])
name := filepath.Base(path)
@@ -73,7 +77,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error {
return nil
}
- mime, errResp := mimetype.DetectFile(path)
if errResp != nil {
return errResp
}
@@ -82,6 +85,6 @@ func (f *FileScanner) Process(ctx context.Context, path string) error {
Name: name,
Path: path,
PathHash: str,
- MIMEType: mime.String(),
+ MIMEType: m,
})
}
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index bbc9fb7..0a07085 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -23,6 +23,7 @@ type (
chanProcessorWorker[T any] struct {
chanProcessor ChanProcessor[T]
+ logrus *logrus.Entry
scheduler *Scheduler
}
@@ -48,10 +49,12 @@ func NewWorkerFromBatchProcessor[T any](
func NewWorkerFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
+ logrus *logrus.Entry,
) Worker {
return &chanProcessorWorker[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
+ logrus: logrus,
}
}
@@ -104,9 +107,13 @@ func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
return nil
}
- if err := l.chanProcessor.Process(ctx, v); err != nil {
- return err
- }
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ if err := l.chanProcessor.Process(ctx, v); err != nil {
+ l.logrus.WithError(err).Error("Error processing chan")
+ }
+ }(v)
}
}
}