aboutsummaryrefslogtreecommitdiff
path: root/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker/worker.go')
-rw-r--r--worker/worker.go43
1 files changed, 34 insertions, 9 deletions
diff --git a/worker/worker.go b/worker/worker.go
index 5e0c844..f56716f 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -10,11 +10,13 @@ import (
)
const (
- statusStoped = "STOPPED"
- statusStarted = "STARTED"
+ statusNotQueued = "NOTQUEUED"
+ statusQueued = "QUEUED"
+ statusStarted = "RUNNING"
- commandStart = "START"
- commandStop = "STOP"
+ commandStart = "START"
+ commandEnqueue = "ENQUEUE"
+ commandDequeue = "DEQUEUE"
)
type command struct {
@@ -27,18 +29,29 @@ type Worker struct {
c chan command
}
+type Job struct {
+ Id uint
+ Status string
+}
+
+func (w *Worker) CanEnqueue(index uint) bool {
+ v, found := w.jobs[index]
+ return !found || v == statusNotQueued
+}
+
func (w *Worker) SpawnWorker(index uint, link string, output string) {
- if v, found := w.jobs[index]; found && v == statusStarted {
+ if !w.CanEnqueue(index) {
return
}
- w.c <- command{action: commandStart, index: index}
+ w.c <- command{action: commandEnqueue, index: index}
task := work.NewTask(func(ctx context.Context) error {
+ w.c <- command{action: commandStart, index: index}
yt.RunYtDlpProcess(link, output)
return nil
}).After(func(ctx context.Context, task *work.Task) {
- w.c <- command{action: commandStop, index: index}
+ w.c <- command{action: commandDequeue, index: index}
})
work.Enqueue(task)
@@ -48,10 +61,12 @@ func (w *Worker) startReader() {
for true {
command := <-w.c
- if command.action == commandStop {
- w.jobs[command.index] = statusStoped
+ if command.action == commandEnqueue {
+ w.jobs[command.index] = statusQueued
} else if command.action == commandStart {
w.jobs[command.index] = statusStarted
+ } else if command.action == commandDequeue {
+ w.jobs[command.index] = statusNotQueued
} else {
panic(1)
}
@@ -74,3 +89,13 @@ func (w *Worker) StartWorker(model db.EntryModel) {
go w.startReader()
go w.startScheduler(model)
}
+
+func (w *Worker) GetJobs() []Job {
+ jobs := []Job{}
+
+ for k, v := range w.jobs {
+ jobs = append(jobs, Job{Id: k, Status: v})
+ }
+
+ return jobs
+}