From 1e984fc8ced6a5915dbd7b6e17bd942e8438cf27 Mon Sep 17 00:00:00 2001 From: "Gabriel A. Giovanini" Date: Fri, 22 Jul 2022 15:25:27 +0200 Subject: ref: Move the yt manager to the worker Simplify the worker/manager relationship. Now the worker is responsible for the managing the yt-dlp process as well. Also introduce chan to report back logs. That is an attempt to decouple things. --- README.md | 1 - controller/controller.go | 18 ++++++++++++++++-- worker/worker.go | 47 +++++++++++++++++++++++++++++++++++++++++------ yt/manager.go | 25 ------------------------- 4 files changed, 57 insertions(+), 34 deletions(-) delete mode 100644 yt/manager.go diff --git a/README.md b/README.md index e19fa46..7c89770 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,3 @@ This projects aim to automat a bit youtube-dl/yt-dlp downloads and learn some go in the process. - diff --git a/controller/controller.go b/controller/controller.go index c7f4145..701d34c 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -1,8 +1,10 @@ package controller import ( + "log" "net/http" "strconv" + "strings" "time" "git.sr.ht/~gabrielgio/midr/db" @@ -15,6 +17,13 @@ type Env struct { Worker worker.Worker } +func logBytes(logc <-chan []byte) { + for l := range logc { + logs := strings.TrimRight(string(l), "\t \n") + log.Println(logs) + } +} + func (e *Env) GetEntries(c *gin.Context) { entries := e.Entries.All() c.HTML(http.StatusOK, "index", entries) @@ -41,7 +50,9 @@ func (e *Env) CreateEntry(c *gin.Context) { var entry db.Entry c.ShouldBind(&entry) e.Entries.Create(&entry) - e.Worker.SpawnWorker(&entry) + log := e.Worker.RunYtDlpWorker(&entry) + go logBytes(log) + c.Redirect(http.StatusFound, "/") } @@ -66,7 +77,10 @@ func (e *Env) StartScheduler() { entries := e.Entries.All() for _, entry := range entries { - e.Worker.SpawnWorker(&entry) + log := e.Worker.RunYtDlpWorker(&entry) + if log != nil { + go logBytes(log) + } } time.Sleep(30 * time.Second) } diff --git a/worker/worker.go b/worker/worker.go index 2444e89..525a736 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,10 +1,13 @@ package worker import ( + "bufio" + "bytes" "context" + "fmt" + "os/exec" "git.sr.ht/~gabrielgio/midr/db" - "git.sr.ht/~gabrielgio/midr/yt" work "git.sr.ht/~sircmpwn/dowork" ) @@ -40,6 +43,30 @@ func NewWorkder() Worker { } } +func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) { + args := []string{entry.Link} + var stdout bytes.Buffer + var stderr bytes.Buffer + + output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder) + args = append(args, "-o", output_template) + + downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder) + args = append(args, "--download-archive", downloaded_txt) + + if len(entry.DateAfter) > 0 { + args = append(args, "--dateafter", entry.DateAfter) + } + + cmd := exec.Command("yt-dlp", args...) + cmd.Stdout = bufio.NewWriter(&stdout) + cmd.Stderr = bufio.NewWriter(&stderr) + + err := cmd.Run() + + return err, stdout.Bytes(), stderr.Bytes() +} + func (w *Worker) CanEnqueue(index uint) bool { v, found := w.jobs[index] return !found || v == statusNotQueued @@ -49,23 +76,31 @@ func (w *Worker) RemoveJob(id uint) { delete(w.jobs, id) } -func (w *Worker) SpawnWorker(entry *db.Entry) { - +func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte { if !w.CanEnqueue(entry.ID) { - return + return nil } + log := make(chan []byte) + w.c <- command{action: commandEnqueue, index: entry.ID} task := work.NewTask(func(ctx context.Context) error { w.c <- command{action: commandStart, index: entry.ID} - yt.RunYtDlpProcess(entry) - return nil + err, stdout, stderr := RunYtDlpProcess(entry) + + log <- stdout + log <- stderr + + return err }).After(func(ctx context.Context, task *work.Task) { w.c <- command{action: commandDequeue, index: entry.ID} + close(log) }) work.Enqueue(task) + + return log } func (w *Worker) startReader() { diff --git a/yt/manager.go b/yt/manager.go deleted file mode 100644 index b9dc333..0000000 --- a/yt/manager.go +++ /dev/null @@ -1,25 +0,0 @@ -package yt - -import ( - "fmt" - "os/exec" - - "git.sr.ht/~gabrielgio/midr/db" -) - -func RunYtDlpProcess(entry *db.Entry) error { - args := []string{entry.Link} - - output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder) - args = append(args, "-o", output_template) - - downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder) - args = append(args, "--download-archive", downloaded_txt) - - if len(entry.DateAfter) > 0 { - args = append(args, "--dateafter", entry.DateAfter) - } - - cmd := exec.Command("yt-dlp", args...) - return cmd.Run() -} -- cgit v1.2.3