From db7e822dda56d32135eca6d3e3211a50cf93d31a Mon Sep 17 00:00:00 2001 From: "Gabriel A. Giovanini" Date: Thu, 16 Jun 2022 22:09:39 +0200 Subject: feat: Add one more state Add a new state to the worker so it can better report what is happening. Also added a status report in the index page. --- worker/worker.go | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) (limited to 'worker') diff --git a/worker/worker.go b/worker/worker.go index 5e0c844..f56716f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -10,11 +10,13 @@ import ( ) const ( - statusStoped = "STOPPED" - statusStarted = "STARTED" + statusNotQueued = "NOTQUEUED" + statusQueued = "QUEUED" + statusStarted = "RUNNING" - commandStart = "START" - commandStop = "STOP" + commandStart = "START" + commandEnqueue = "ENQUEUE" + commandDequeue = "DEQUEUE" ) type command struct { @@ -27,18 +29,29 @@ type Worker struct { c chan command } +type Job struct { + Id uint + Status string +} + +func (w *Worker) CanEnqueue(index uint) bool { + v, found := w.jobs[index] + return !found || v == statusNotQueued +} + func (w *Worker) SpawnWorker(index uint, link string, output string) { - if v, found := w.jobs[index]; found && v == statusStarted { + if !w.CanEnqueue(index) { return } - w.c <- command{action: commandStart, index: index} + w.c <- command{action: commandEnqueue, index: index} task := work.NewTask(func(ctx context.Context) error { + w.c <- command{action: commandStart, index: index} yt.RunYtDlpProcess(link, output) return nil }).After(func(ctx context.Context, task *work.Task) { - w.c <- command{action: commandStop, index: index} + w.c <- command{action: commandDequeue, index: index} }) work.Enqueue(task) @@ -48,10 +61,12 @@ func (w *Worker) startReader() { for true { command := <-w.c - if command.action == commandStop { - w.jobs[command.index] = statusStoped + if command.action == commandEnqueue { + w.jobs[command.index] = statusQueued } else if command.action == commandStart { w.jobs[command.index] = statusStarted + } else if command.action == commandDequeue { + w.jobs[command.index] = statusNotQueued } else { panic(1) } @@ -74,3 +89,13 @@ func (w *Worker) StartWorker(model db.EntryModel) { go w.startReader() go w.startScheduler(model) } + +func (w *Worker) GetJobs() []Job { + jobs := []Job{} + + for k, v := range w.jobs { + jobs = append(jobs, Job{Id: k, Status: v}) + } + + return jobs +} -- cgit v1.2.3