aboutsummaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
Diffstat (limited to 'worker')
-rw-r--r--worker/worker.go76
1 files changed, 76 insertions, 0 deletions
diff --git a/worker/worker.go b/worker/worker.go
new file mode 100644
index 0000000..5e0c844
--- /dev/null
+++ b/worker/worker.go
@@ -0,0 +1,76 @@
+package worker
+
+import (
+ "context"
+ "time"
+
+ "git.sr.ht/~gabrielgio/midr/db"
+ "git.sr.ht/~gabrielgio/midr/yt"
+ work "git.sr.ht/~sircmpwn/dowork"
+)
+
+const (
+ statusStoped = "STOPPED"
+ statusStarted = "STARTED"
+
+ commandStart = "START"
+ commandStop = "STOP"
+)
+
+type command struct {
+ action string
+ index uint
+}
+
+type Worker struct {
+ jobs map[uint]string
+ c chan command
+}
+
+func (w *Worker) SpawnWorker(index uint, link string, output string) {
+
+ if v, found := w.jobs[index]; found && v == statusStarted {
+ return
+ }
+
+ w.c <- command{action: commandStart, index: index}
+ task := work.NewTask(func(ctx context.Context) error {
+ yt.RunYtDlpProcess(link, output)
+ return nil
+ }).After(func(ctx context.Context, task *work.Task) {
+ w.c <- command{action: commandStop, index: index}
+ })
+
+ work.Enqueue(task)
+}
+
+func (w *Worker) startReader() {
+ for true {
+ command := <-w.c
+
+ if command.action == commandStop {
+ w.jobs[command.index] = statusStoped
+ } else if command.action == commandStart {
+ w.jobs[command.index] = statusStarted
+ } else {
+ panic(1)
+ }
+ }
+}
+
+func (w *Worker) startScheduler(model db.EntryModel) {
+ for true {
+ entries := model.All()
+ for _, e := range entries {
+ w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
+ }
+ time.Sleep(30 * time.Minute)
+ }
+}
+
+func (w *Worker) StartWorker(model db.EntryModel) {
+ w.c = make(chan command, 10)
+ w.jobs = make(map[uint]string)
+ go w.startReader()
+ go w.startScheduler(model)
+}