aboutsummaryrefslogtreecommitdiff
path: root/worker/worker.go
blob: f56716f0b7426fcba4407da26eb8150f8dbd1986 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package worker

import (
	"context"
	"sync"
	"time"

	"git.sr.ht/~gabrielgio/midr/db"
	"git.sr.ht/~gabrielgio/midr/yt"
	work "git.sr.ht/~sircmpwn/dowork"
)

// Job statuses and reader commands.
//
// The status* values are what Worker.jobs stores per entry ID and what
// GetJobs exposes through Job.Status. The command* values are the actions
// carried by a command over Worker.c; startReader maps each one to a status.
const (
	statusNotQueued = "NOTQUEUED" // job finished (or was reset); may be enqueued again
	statusQueued    = "QUEUED"    // job handed to the work queue, not running yet
	statusStarted   = "RUNNING"   // job function is executing (note: value is "RUNNING")

	commandStart   = "START"   // sets the job to statusStarted
	commandEnqueue = "ENQUEUE" // sets the job to statusQueued
	commandDequeue = "DEQUEUE" // sets the job to statusNotQueued
)

// command is a state-transition message sent over Worker.c by SpawnWorker
// (and the task callbacks it creates) and applied to Worker.jobs by
// startReader.
type command struct {
	action string // one of the command* constants; anything else makes startReader panic
	index  uint   // key of the affected job in Worker.jobs
}

// Worker schedules yt-dlp downloads on the dowork queue and tracks the
// status of each one, keyed by entry ID.
//
// The zero value is not usable: StartWorker must be called once, before any
// other method, to allocate the map and channel and to start the background
// goroutines. A Worker must not be copied after first use because it
// contains a mutex.
type Worker struct {
	mu   sync.RWMutex    // guards jobs
	jobs map[uint]string // entry ID -> one of the status* constants
	c    chan command    // state transitions consumed by startReader
}

// Job is a snapshot of one tracked job, as returned by GetJobs.
type Job struct {
	Id     uint   // entry ID the job belongs to
	Status string // one of "NOTQUEUED", "QUEUED" or "RUNNING"
}

// CanEnqueue reports whether a job for index may be scheduled: either the
// index has never been seen, or its status is NOTQUEUED.
//
// It is safe to call from any goroutine.
func (w *Worker) CanEnqueue(index uint) bool {
	w.mu.RLock()
	defer w.mu.RUnlock()

	status, found := w.jobs[index]
	return !found || status == statusNotQueued
}

// SpawnWorker queues a download of link into output for the entry index,
// unless CanEnqueue reports that a job for that index is already queued or
// running, in which case it does nothing.
//
// The job status moves QUEUED -> RUNNING -> NOTQUEUED through commands sent
// on w.c; those sends block while the channel buffer is full.
//
// NOTE(review): the CanEnqueue check and the ENQUEUE command are not a single
// atomic step, so two concurrent calls for the same index could both pass the
// check. The only caller visible in this file (startScheduler) is sequential.
func (w *Worker) SpawnWorker(index uint, link string, output string) {
	if !w.CanEnqueue(index) {
		return
	}

	w.c <- command{action: commandEnqueue, index: index}

	task := work.NewTask(func(ctx context.Context) error {
		w.c <- command{action: commandStart, index: index}
		yt.RunYtDlpProcess(link, output)
		return nil
	}).After(func(ctx context.Context, task *work.Task) {
		// Mark the index as eligible for scheduling again.
		w.c <- command{action: commandDequeue, index: index}
	})

	work.Enqueue(task)
}

// startReader applies the commands received on w.c to w.jobs. It is the only
// writer of the map and loops forever, so it must run in its own goroutine.
//
// The write is done under w.mu because CanEnqueue and GetJobs read the map
// from other goroutines; an unsynchronised map write concurrent with a read
// is a data race that the Go runtime may turn into a fatal error.
//
// An unknown action is a programming error and panics.
func (w *Worker) startReader() {
	for {
		cmd := <-w.c

		var status string
		switch cmd.action {
		case commandEnqueue:
			status = statusQueued
		case commandStart:
			status = statusStarted
		case commandDequeue:
			status = statusNotQueued
		default:
			panic("worker: unknown command action: " + cmd.action)
		}

		w.mu.Lock()
		w.jobs[cmd.index] = status
		w.mu.Unlock()
	}
}

// startScheduler calls SpawnWorker for every entry returned by model.All,
// sleeps for 30 minutes and repeats. It never returns, so it must run in its
// own goroutine.
func (w *Worker) startScheduler(model db.EntryModel) {
	for {
		for _, e := range model.All() {
			w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
		}
		time.Sleep(30 * time.Minute)
	}
}

// StartWorker initialises w (command channel with a buffer of 10 and an empty
// job map) and launches the reader and scheduler goroutines. It returns
// immediately.
//
// The goroutines have no stop mechanism. Calling StartWorker a second time
// would replace the map and channel and start a second pair of goroutines, so
// call it exactly once and before any other method.
func (w *Worker) StartWorker(model db.EntryModel) {
	w.c = make(chan command, 10)
	w.jobs = make(map[uint]string)
	go w.startReader()
	go w.startScheduler(model)
}

// GetJobs returns a snapshot of every tracked job. The returned slice is
// never nil, and its order is unspecified because it follows map iteration
// order.
//
// It is safe to call from any goroutine.
func (w *Worker) GetJobs() []Job {
	w.mu.RLock()
	defer w.mu.RUnlock()

	jobs := make([]Job, 0, len(w.jobs))
	for id, status := range w.jobs {
		jobs = append(jobs, Job{Id: id, Status: status})
	}

	return jobs
}