diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | cmd/server/main.go | 11 | ||||
| -rw-r--r-- | pkg/coroutines/coroutines.go | 1 | ||||
| -rw-r--r-- | pkg/worker/list_processor.go | 23 | ||||
| -rw-r--r-- | pkg/worker/scheduler.go | 12 | 
5 files changed, 23 insertions, 28 deletions
@@ -13,9 +13,7 @@ build:  run: sass  	$(GO_RUN) $(SERVER) \ -		--db-type psql \ -		--db-con "host=localhost user=gabrielgio password=diablo123 dbname=img port=5432 sslmode=disable" \ -		--log-level trace \ +		--log-level error \  		--aes-key=6368616e676520746869732070617373 \  		--root=${HOME} diff --git a/cmd/server/main.go b/cmd/server/main.go index 0abdc09..8b1cc00 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,10 +28,11 @@ import (  func main() {  	var ( -		key      = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256") -		dbType   = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite") -		dbCon    = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html") -		logLevel = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic") +		key            = flag.String("aes-key", "", "AES key, either 16, 24, or 32 bytes string to select AES-128, AES-192, or AES-256") +		dbType         = flag.String("db-type", "sqlite", "Database to be used. Choose either mysql, psql or sqlite") +		dbCon          = flag.String("db-con", "main.db", "Database string connection for given database type. Ref: https://gorm.io/docs/connecting_to_the_database.html") +		logLevel       = flag.String("log-level", "error", "Log level: Choose either trace, debug, info, warning, error, fatal or panic") +		schedulerCount = flag.Uint("scheduler-count", 10, "How many workers are created to process media files")  		// TODO: this will later be replaced by user specific root folder  		root = flag.String("root", "", "root folder for the whole application. All the workers will use it as working directory") @@ -79,7 +80,7 @@ func main() {  	extRouter.AddMiddleware(authMiddleware.LoggedIn)  	extRouter.AddMiddleware(ext.HTML) -	scheduler := worker.NewScheduler(10) +	scheduler := worker.NewScheduler(*schedulerCount)  	// repository  	var ( diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go new file mode 100644 index 0000000..c0f7247 --- /dev/null +++ b/pkg/coroutines/coroutines.go @@ -0,0 +1 @@ +package coroutines diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index d53b7ea..8169e4e 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -2,6 +2,8 @@ package worker  import (  	"context" +	"errors" +	"sync"  )  type ( @@ -64,18 +66,21 @@ func (l *listProcessorWorker[T]) Start(ctx context.Context) error {  		if len(values) == 0 {  			return nil  		} +		var wg sync.WaitGroup  		for _, v := range values { -			select { -			case <-ctx.Done(): -				return ctx.Err() -			default: -			} - -			if err := l.listProcessor.Process(ctx, v); err != nil { -				return err -			} +			wg.Add(1) +			l.scheduler.Take() +			go func(v T) { +				defer l.scheduler.Return() +				defer wg.Done() +				if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { +					println("Err", err.Error()) +				} +			}(v)  		} + +		wg.Wait()  	}  } diff --git a/pkg/worker/scheduler.go b/pkg/worker/scheduler.go index b410b33..2ce86fe 100644 --- a/pkg/worker/scheduler.go +++ b/pkg/worker/scheduler.go @@ -1,13 +1,7 @@  package worker -import ( -	"fmt" -	"sync/atomic" -) -  type Scheduler struct { -	pool  chan any -	count atomic.Int64 +	pool chan any  }  func NewScheduler(count uint) *Scheduler { @@ -18,12 +12,8 @@ func NewScheduler(count uint) *Scheduler {  func (self *Scheduler) Take() {  	self.pool <- nil -	self.count.Add(1) -	fmt.Printf("<- %d\n", self.count.Load())  }  func (self *Scheduler) Return() {  	<-self.pool -	self.count.Add(-1) -	fmt.Printf("-> %d\n", self.count.Load())  }  | 
