diff options
Diffstat (limited to 'worker')
| -rw-r--r-- | worker/worker.go | 76 | 
1 files changed, 76 insertions, 0 deletions
| diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..5e0c844 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,76 @@ +package worker + +import ( +	"context" +	"time" + +	"git.sr.ht/~gabrielgio/midr/db" +	"git.sr.ht/~gabrielgio/midr/yt" +	work "git.sr.ht/~sircmpwn/dowork" +) + +const ( +	statusStoped  = "STOPPED" +	statusStarted = "STARTED" + +	commandStart = "START" +	commandStop  = "STOP" +) + +type command struct { +	action string +	index  uint +} + +type Worker struct { +	jobs map[uint]string +	c    chan command +} + +func (w *Worker) SpawnWorker(index uint, link string, output string) { + +	if v, found := w.jobs[index]; found && v == statusStarted { +		return +	} + +	w.c <- command{action: commandStart, index: index} +	task := work.NewTask(func(ctx context.Context) error { +		yt.RunYtDlpProcess(link, output) +		return nil +	}).After(func(ctx context.Context, task *work.Task) { +		w.c <- command{action: commandStop, index: index} +	}) + +	work.Enqueue(task) +} + +func (w *Worker) startReader() { +	for true { +		command := <-w.c + +		if command.action == commandStop { +			w.jobs[command.index] = statusStoped +		} else if command.action == commandStart { +			w.jobs[command.index] = statusStarted +		} else { +			panic(1) +		} +	} +} + +func (w *Worker) startScheduler(model db.EntryModel) { +	for true { +		entries := model.All() +		for _, e := range entries { +			w.SpawnWorker(e.ID, e.Link, e.OutputFolder) +		} +		time.Sleep(30 * time.Minute) +	} +} + +func (w *Worker) StartWorker(model db.EntryModel) { +	w.c = make(chan command, 10) +	w.jobs = make(map[uint]string) +	go w.startReader() +	go w.startScheduler(model) +} | 
