aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--cmd/server/main.go11
-rw-r--r--pkg/coroutines/coroutines.go1
-rw-r--r--pkg/worker/list_processor.go23
-rw-r--r--pkg/worker/scheduler.go12
5 files changed, 23 insertions, 28 deletions
diff --git a/Makefile b/Makefile
index 5731808..85f22dd 100644
--- a/Makefile
+++ b/Makefile
@@ -13,9 +13,7 @@ build:
run: sass
$(GO_RUN) $(SERVER) \
- --db-type psql \
- --db-con "host=localhost user=gabrielgio password=diablo123 dbname=img port=5432 sslmode=disable" \
- --log-level trace \
+ --log-level error \
--aes-key=6368616e676520746869732070617373 \
--root=${HOME}
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 0abdc09..8b1cc00 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -28,10 +28,11 @@ import (
func main() {
var (
- key = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256")
- dbType = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite")
- dbCon = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html")
- logLevel = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic")
+ key = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256")
+ dbType = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite")
+ dbCon = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html")
+ logLevel = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic")
+ schedulerCount = flag.Uint("scheduler-count", 10, "How many workers are created to process media files")
// TODO: this will later be replaced by user specific root folder
root = flag.String("root", "", "root folder for the whole application. All the workers will use it as working directory")
@@ -79,7 +80,7 @@ func main() {
extRouter.AddMiddleware(authMiddleware.LoggedIn)
extRouter.AddMiddleware(ext.HTML)
- scheduler := worker.NewScheduler(10)
+ scheduler := worker.NewScheduler(*schedulerCount)
// repository
var (
diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go
new file mode 100644
index 0000000..c0f7247
--- /dev/null
+++ b/pkg/coroutines/coroutines.go
@@ -0,0 +1 @@
+package coroutines
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index d53b7ea..8169e4e 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -2,6 +2,8 @@ package worker
import (
"context"
+ "errors"
+ "sync"
)
type (
@@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
if len(values) == 0 {
return nil
}
+ var wg sync.WaitGroup
for _, v := range values {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- if err := l.listProcessor.Process(ctx, v); err != nil {
- return err
- }
+ wg.Add(1)
+ l.scheduler.Take()
+ go func(v T) {
+ defer l.scheduler.Return()
+ defer wg.Done()
+ if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
+ println("Err", err.Error())
+ }
+ }(v)
}
+
+ wg.Wait()
}
}
diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go
index b410b33..2ce86fe 100644
--- a/pkg/worker/scheduler.go
+++ b/pkg/worker/scheduler.go
@@ -1,13 +1,7 @@
package worker
-import (
- "fmt"
- "sync/atomic"
-)
-
type Scheduler struct {
- pool chan any
- count atomic.Int64
+ pool chan any
}
func NewScheduler(count uint) *Scheduler {
@@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler {
func (self *Scheduler) Take() {
self.pool <- nil
- self.count.Add(1)
- fmt.Printf("<- %d\n", self.count.Load())
}
func (self *Scheduler) Return() {
<-self.pool
- self.count.Add(-1)
- fmt.Printf("-> %d\n", self.count.Load())
}