aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriel A. Giovanini <mail@gabrielgio.me>2022-07-22 15:25:27 +0200
committerGabriel A. Giovanini <mail@gabrielgio.me>2022-07-22 15:25:27 +0200
commit1e984fc8ced6a5915dbd7b6e17bd942e8438cf27 (patch)
tree1dbb64f3438030e3b1a3a45e728e73ee74787279
parent90d9d819b70f68e10482954cfc461737c0165f8a (diff)
downloadmdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.tar.gz
mdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.tar.bz2
mdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.zip
ref: Move the yt manager to the worker
Simplify the worker/manager relationship. Now the worker is responsible for the managing the yt-dlp process as well. Also introduce chan to report back logs. That is an attempt to decouple things.
-rw-r--r--README.md1
-rw-r--r--controller/controller.go18
-rw-r--r--worker/worker.go47
-rw-r--r--yt/manager.go25
4 files changed, 57 insertions, 34 deletions
diff --git a/README.md b/README.md
index e19fa46..7c89770 100644
--- a/README.md
+++ b/README.md
@@ -2,4 +2,3 @@
This projects aim to automat a bit youtube-dl/yt-dlp downloads and learn some
go in the process.
-
diff --git a/controller/controller.go b/controller/controller.go
index c7f4145..701d34c 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -1,8 +1,10 @@
package controller
import (
+ "log"
"net/http"
"strconv"
+ "strings"
"time"
"git.sr.ht/~gabrielgio/midr/db"
@@ -15,6 +17,13 @@ type Env struct {
Worker worker.Worker
}
+func logBytes(logc <-chan []byte) {
+ for l := range logc {
+ logs := strings.TrimRight(string(l), "\t \n")
+ log.Println(logs)
+ }
+}
+
func (e *Env) GetEntries(c *gin.Context) {
entries := e.Entries.All()
c.HTML(http.StatusOK, "index", entries)
@@ -41,7 +50,9 @@ func (e *Env) CreateEntry(c *gin.Context) {
var entry db.Entry
c.ShouldBind(&entry)
e.Entries.Create(&entry)
- e.Worker.SpawnWorker(&entry)
+ log := e.Worker.RunYtDlpWorker(&entry)
+ go logBytes(log)
+
c.Redirect(http.StatusFound, "/")
}
@@ -66,7 +77,10 @@ func (e *Env) StartScheduler() {
entries := e.Entries.All()
for _, entry := range entries {
- e.Worker.SpawnWorker(&entry)
+ log := e.Worker.RunYtDlpWorker(&entry)
+ if log != nil {
+ go logBytes(log)
+ }
}
time.Sleep(30 * time.Second)
}
diff --git a/worker/worker.go b/worker/worker.go
index 2444e89..525a736 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -1,10 +1,13 @@
package worker
import (
+ "bufio"
+ "bytes"
"context"
+ "fmt"
+ "os/exec"
"git.sr.ht/~gabrielgio/midr/db"
- "git.sr.ht/~gabrielgio/midr/yt"
work "git.sr.ht/~sircmpwn/dowork"
)
@@ -40,6 +43,30 @@ func NewWorkder() Worker {
}
}
+func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) {
+ args := []string{entry.Link}
+ var stdout bytes.Buffer
+ var stderr bytes.Buffer
+
+ output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
+ args = append(args, "-o", output_template)
+
+ downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
+ args = append(args, "--download-archive", downloaded_txt)
+
+ if len(entry.DateAfter) > 0 {
+ args = append(args, "--dateafter", entry.DateAfter)
+ }
+
+ cmd := exec.Command("yt-dlp", args...)
+ cmd.Stdout = bufio.NewWriter(&stdout)
+ cmd.Stderr = bufio.NewWriter(&stderr)
+
+ err := cmd.Run()
+
+ return err, stdout.Bytes(), stderr.Bytes()
+}
+
func (w *Worker) CanEnqueue(index uint) bool {
v, found := w.jobs[index]
return !found || v == statusNotQueued
@@ -49,23 +76,31 @@ func (w *Worker) RemoveJob(id uint) {
delete(w.jobs, id)
}
-func (w *Worker) SpawnWorker(entry *db.Entry) {
-
+func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte {
if !w.CanEnqueue(entry.ID) {
- return
+ return nil
}
+ log := make(chan []byte)
+
w.c <- command{action: commandEnqueue, index: entry.ID}
task := work.NewTask(func(ctx context.Context) error {
w.c <- command{action: commandStart, index: entry.ID}
- yt.RunYtDlpProcess(entry)
- return nil
+ err, stdout, stderr := RunYtDlpProcess(entry)
+
+ log <- stdout
+ log <- stderr
+
+ return err
}).After(func(ctx context.Context, task *work.Task) {
w.c <- command{action: commandDequeue, index: entry.ID}
+ close(log)
})
work.Enqueue(task)
+
+ return log
}
func (w *Worker) startReader() {
diff --git a/yt/manager.go b/yt/manager.go
deleted file mode 100644
index b9dc333..0000000
--- a/yt/manager.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package yt
-
-import (
- "fmt"
- "os/exec"
-
- "git.sr.ht/~gabrielgio/midr/db"
-)
-
-func RunYtDlpProcess(entry *db.Entry) error {
- args := []string{entry.Link}
-
- output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
- args = append(args, "-o", output_template)
-
- downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
- args = append(args, "--download-archive", downloaded_txt)
-
- if len(entry.DateAfter) > 0 {
- args = append(args, "--dateafter", entry.DateAfter)
- }
-
- cmd := exec.Command("yt-dlp", args...)
- return cmd.Run()
-}