From 1d6fa60b3c60d068d12b19f10f5ad73e836b5a70 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sat, 24 Jun 2023 00:00:00 +0200 Subject: feat: Add scheduler again Since I have fixed the issue with the `SIGTINT` hanging the application I can readd the scheduler once more. Also move the param for amount of scheduler work. --- Makefile | 4 +--- cmd/server/main.go | 11 ++++++----- pkg/coroutines/coroutines.go | 1 + pkg/worker/list_processor.go | 23 ++++++++++++++--------- pkg/worker/scheduler.go | 12 +----------- 5 files changed, 23 insertions(+), 28 deletions(-) create mode 100644 pkg/coroutines/coroutines.go diff --git a/Makefile b/Makefile index 5731808..85f22dd 100644 --- a/Makefile +++ b/Makefile @@ -13,9 +13,7 @@ build: run: sass $(GO_RUN) $(SERVER) \ - --db-type psql \ - --db-con "host=localhost user=gabrielgio password=diablo123 dbname=img port=5432 sslmode=disable" \ - --log-level trace \ + --log-level error \ --aes-key=6368616e676520746869732070617373 \ --root=${HOME} diff --git a/cmd/server/main.go b/cmd/server/main.go index 0abdc09..8b1cc00 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,10 +28,11 @@ import ( func main() { var ( - key = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256") - dbType = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite") - dbCon = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html") - logLevel = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic") + key = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256") + dbType = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite") + dbCon = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html") + logLevel = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic") + schedulerCount = flag.Uint("scheduler-count", 10, "How many workers are created to process media files") // TODO: this will later be replaced by user specific root folder root = flag.String("root", "", "root folder for the whole application. All the workers will use it as working directory") @@ -79,7 +80,7 @@ func main() { extRouter.AddMiddleware(authMiddleware.LoggedIn) extRouter.AddMiddleware(ext.HTML) - scheduler := worker.NewScheduler(10) + scheduler := worker.NewScheduler(*schedulerCount) // repository var ( diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go new file mode 100644 index 0000000..c0f7247 --- /dev/null +++ b/pkg/coroutines/coroutines.go @@ -0,0 +1 @@ +package coroutines diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index d53b7ea..8169e4e 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -2,6 +2,8 @@ package worker import ( "context" + "errors" + "sync" ) type ( @@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error { if len(values) == 0 { return nil } + var wg sync.WaitGroup for _, v := range values { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if err := l.listProcessor.Process(ctx, v); err != nil { - return err - } + wg.Add(1) + l.scheduler.Take() + go func(v T) { + defer l.scheduler.Return() + defer wg.Done() + if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + println("Err", err.Error()) + } + }(v) } + + wg.Wait() } } diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go index b410b33..2ce86fe 100644 --- a/pkg/worker/scheduler.go +++ b/pkg/worker/scheduler.go @@ -1,13 +1,7 @@ package worker -import ( - "fmt" - "sync/atomic" -) - type Scheduler struct { - pool chan any - count atomic.Int64 + pool chan any } func NewScheduler(count uint) *Scheduler { @@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler { func (self *Scheduler) Take() { self.pool <- nil - self.count.Add(1) - fmt.Printf("<- %d\n", self.count.Load()) } func (self *Scheduler) Return() { <-self.pool - self.count.Add(-1) - fmt.Printf("-> %d\n", self.count.Load()) } -- cgit v1.2.3