aboutsummaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
Diffstat (limited to 'worker')
-rw-r--r--worker/worker.go22
1 files changed, 14 insertions, 8 deletions
diff --git a/worker/worker.go b/worker/worker.go
index a8f1518..2444e89 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -3,6 +3,7 @@ package worker
import (
"context"
+ "git.sr.ht/~gabrielgio/midr/db"
"git.sr.ht/~gabrielgio/midr/yt"
work "git.sr.ht/~sircmpwn/dowork"
)
@@ -34,7 +35,7 @@ type Job struct {
func NewWorkder() Worker {
return Worker{
- c: make(chan command, 10),
+ c: make(chan command),
jobs: make(map[uint]string),
}
}
@@ -44,26 +45,31 @@ func (w *Worker) CanEnqueue(index uint) bool {
return !found || v == statusNotQueued
}
-func (w *Worker) SpawnWorker(index uint, link string, output string) {
+func (w *Worker) RemoveJob(id uint) {
+ delete(w.jobs, id)
+}
+
+func (w *Worker) SpawnWorker(entry *db.Entry) {
- if !w.CanEnqueue(index) {
+ if !w.CanEnqueue(entry.ID) {
return
}
- w.c <- command{action: commandEnqueue, index: index}
+ w.c <- command{action: commandEnqueue, index: entry.ID}
task := work.NewTask(func(ctx context.Context) error {
- w.c <- command{action: commandStart, index: index}
- yt.RunYtDlpProcess(link, output)
+
+ w.c <- command{action: commandStart, index: entry.ID}
+ yt.RunYtDlpProcess(entry)
return nil
}).After(func(ctx context.Context, task *work.Task) {
- w.c <- command{action: commandDequeue, index: index}
+ w.c <- command{action: commandDequeue, index: entry.ID}
})
work.Enqueue(task)
}
func (w *Worker) startReader() {
- for true {
+ for {
command := <-w.c
if command.action == commandEnqueue {