1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
package worker
import (
"context"
"git.sr.ht/~gabrielgio/midr/yt"
work "git.sr.ht/~sircmpwn/dowork"
)
const (
statusNotQueued = "NOTQUEUED"
statusQueued = "QUEUED"
statusStarted = "RUNNING"
commandStart = "START"
commandEnqueue = "ENQUEUE"
commandDequeue = "DEQUEUE"
)
type command struct {
action string
index uint
}
type Worker struct {
jobs map[uint]string
c chan command
}
type Job struct {
Id uint
Status string
}
func NewWorkder() Worker {
return Worker{
c: make(chan command, 10),
jobs: make(map[uint]string),
}
}
func (w *Worker) CanEnqueue(index uint) bool {
v, found := w.jobs[index]
return !found || v == statusNotQueued
}
func (w *Worker) SpawnWorker(index uint, link string, output string) {
if !w.CanEnqueue(index) {
return
}
w.c <- command{action: commandEnqueue, index: index}
task := work.NewTask(func(ctx context.Context) error {
w.c <- command{action: commandStart, index: index}
yt.RunYtDlpProcess(link, output)
return nil
}).After(func(ctx context.Context, task *work.Task) {
w.c <- command{action: commandDequeue, index: index}
})
work.Enqueue(task)
}
func (w *Worker) startReader() {
for true {
command := <-w.c
if command.action == commandEnqueue {
w.jobs[command.index] = statusQueued
} else if command.action == commandStart {
w.jobs[command.index] = statusStarted
} else if command.action == commandDequeue {
w.jobs[command.index] = statusNotQueued
} else {
panic(1)
}
}
}
func (w *Worker) StartReader() {
go w.startReader()
}
func (w *Worker) GetJobs() []Job {
jobs := make([]Job, len(w.jobs))
count := 0
for k, v := range w.jobs {
jobs[count] = Job{Id: k, Status: v}
count++
}
return jobs
}
|