diff options
| author | Gabriel A. Giovanini <mail@gabrielgio.me> | 2022-07-22 15:25:27 +0200 | 
|---|---|---|
| committer | Gabriel A. Giovanini <mail@gabrielgio.me> | 2022-07-22 15:25:27 +0200 | 
| commit | 1e984fc8ced6a5915dbd7b6e17bd942e8438cf27 (patch) | |
| tree | 1dbb64f3438030e3b1a3a45e728e73ee74787279 | |
| parent | 90d9d819b70f68e10482954cfc461737c0165f8a (diff) | |
| download | mdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.tar.gz mdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.tar.bz2 mdir-1e984fc8ced6a5915dbd7b6e17bd942e8438cf27.zip | |
ref: Move the yt manager to the worker
Simplify the worker/manager relationship. Now the worker is responsible
for the managing the yt-dlp process as well.
Also introduce chan to report back logs. That is an attempt to decouple
things.
| -rw-r--r-- | README.md | 1 | ||||
| -rw-r--r-- | controller/controller.go | 18 | ||||
| -rw-r--r-- | worker/worker.go | 47 | ||||
| -rw-r--r-- | yt/manager.go | 25 | 
4 files changed, 57 insertions, 34 deletions
| @@ -2,4 +2,3 @@  This projects aim to automat a bit youtube-dl/yt-dlp downloads and learn some  go in the process. - diff --git a/controller/controller.go b/controller/controller.go index c7f4145..701d34c 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -1,8 +1,10 @@  package controller  import ( +	"log"  	"net/http"  	"strconv" +	"strings"  	"time"  	"git.sr.ht/~gabrielgio/midr/db" @@ -15,6 +17,13 @@ type Env struct {  	Worker  worker.Worker  } +func logBytes(logc <-chan []byte) { +	for l := range logc { +		logs := strings.TrimRight(string(l), "\t \n") +		log.Println(logs) +	} +} +  func (e *Env) GetEntries(c *gin.Context) {  	entries := e.Entries.All()  	c.HTML(http.StatusOK, "index", entries) @@ -41,7 +50,9 @@ func (e *Env) CreateEntry(c *gin.Context) {  	var entry db.Entry  	c.ShouldBind(&entry)  	e.Entries.Create(&entry) -	e.Worker.SpawnWorker(&entry) +	log := e.Worker.RunYtDlpWorker(&entry) +	go logBytes(log) +  	c.Redirect(http.StatusFound, "/")  } @@ -66,7 +77,10 @@ func (e *Env) StartScheduler() {  			entries := e.Entries.All()  			for _, entry := range entries { -				e.Worker.SpawnWorker(&entry) +				log := e.Worker.RunYtDlpWorker(&entry) +				if log != nil { +					go logBytes(log) +				}  			}  			time.Sleep(30 * time.Second)  		} diff --git a/worker/worker.go b/worker/worker.go index 2444e89..525a736 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,10 +1,13 @@  package worker  import ( +	"bufio" +	"bytes"  	"context" +	"fmt" +	"os/exec"  	"git.sr.ht/~gabrielgio/midr/db" -	"git.sr.ht/~gabrielgio/midr/yt"  	work "git.sr.ht/~sircmpwn/dowork"  ) @@ -40,6 +43,30 @@ func NewWorkder() Worker {  	}  } +func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) { +	args := []string{entry.Link} +	var stdout bytes.Buffer +	var stderr bytes.Buffer + +	output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder) +	args = append(args, "-o", output_template) + +	downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder) +	args = append(args, "--download-archive", downloaded_txt) + +	if len(entry.DateAfter) > 0 { +		args = append(args, "--dateafter", entry.DateAfter) +	} + +	cmd := exec.Command("yt-dlp", args...) +	cmd.Stdout = bufio.NewWriter(&stdout) +	cmd.Stderr = bufio.NewWriter(&stderr) + +	err := cmd.Run() + +	return err, stdout.Bytes(), stderr.Bytes() +} +  func (w *Worker) CanEnqueue(index uint) bool {  	v, found := w.jobs[index]  	return !found || v == statusNotQueued @@ -49,23 +76,31 @@ func (w *Worker) RemoveJob(id uint) {  	delete(w.jobs, id)  } -func (w *Worker) SpawnWorker(entry *db.Entry) { - +func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte {  	if !w.CanEnqueue(entry.ID) { -		return +		return nil  	} +	log := make(chan []byte) +  	w.c <- command{action: commandEnqueue, index: entry.ID}  	task := work.NewTask(func(ctx context.Context) error {  		w.c <- command{action: commandStart, index: entry.ID} -		yt.RunYtDlpProcess(entry) -		return nil +		err, stdout, stderr := RunYtDlpProcess(entry) + +		log <- stdout +		log <- stderr + +		return err  	}).After(func(ctx context.Context, task *work.Task) {  		w.c <- command{action: commandDequeue, index: entry.ID} +		close(log)  	})  	work.Enqueue(task) + +	return log  }  func (w *Worker) startReader() { diff --git a/yt/manager.go b/yt/manager.go deleted file mode 100644 index b9dc333..0000000 --- a/yt/manager.go +++ /dev/null @@ -1,25 +0,0 @@ -package yt - -import ( -	"fmt" -	"os/exec" - -	"git.sr.ht/~gabrielgio/midr/db" -) - -func RunYtDlpProcess(entry *db.Entry) error { -	args := []string{entry.Link} - -	output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder) -	args = append(args, "-o", output_template) - -	downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder) -	args = append(args, "--download-archive", downloaded_txt) - -	if len(entry.DateAfter) > 0 { -		args = append(args, "--dateafter", entry.DateAfter) -	} - -	cmd := exec.Command("yt-dlp", args...) -	return cmd.Run() -} | 
