(ns queue-api.db.core
  "DataScript-backed store for the job queue: agents, jobs and the
  references that bind one to the other."
  (:require [datascript.core :as d]
            [mount.core :as mount]
            [clj-time.core :as time])
  (:import (clojure.lang Keyword)))

;; DataScript schema.
;; `:agent/job` and `:job/agent` must be declared with `:db/valueType`
;; (namespace `db`, name `valueType`). Without it the attributes are not
;; refs, so lookup refs such as `[:agent/id a]` are stored as plain vectors
;; and can neither be joined on in queries nor navigated through entities.
(def schema
  {:agent/id                 {:db/unique :db.unique/identity}
   :agent/primary-skillset   {:db/cardinality :db.cardinality/many}
   :agent/secondary-skillset {:db/cardinality :db.cardinality/many}
   :agent/job                {:db/valueType :db.type/ref}
   :job/id                   {:db/unique :db.unique/identity}
   :job/agent                {:db/valueType :db.type/ref}})

;; Connection created from `schema` when the mount state is started.
(mount/defstate conn
  :start (d/create-conn schema))

(defn add-agent
  "Add an agent into the database.

  Takes a map with the keys `:id`, `:name`, `:primary-skillset` and
  `:secondary-skillset` and transacts them as `:agent/*` attributes.
  Because `:agent/id` is a unique identity, transacting an existing id
  updates that agent. Returns the result of `d/transact!`."
  [{:keys [id name primary-skillset secondary-skillset]}]
  (d/transact! conn
               [{:agent/id                 id
                 :agent/name               name
                 :agent/primary-skillset   primary-skillset
                 :agent/secondary-skillset secondary-skillset}]))

(defn add-job
  "Add a job into the database.

  Takes a map with the keys `:id`, `:type` and `:urgent`. The job is
  stamped with the current time as `:job/date` and starts with the status
  `:unassigned`. A missing (nil) `:urgent` is stored as `false` so that the
  job is still found by the non-urgent query in `q-skillset`.
  Returns the result of `d/transact!`."
  [{:keys [id type urgent]}]
  (d/transact! conn
               [{:job/id     id
                 :job/type   type
                 :job/urgent (if (nil? urgent) false urgent)
                 :job/date   (time/now)
                 :job/status :unassigned}]))

(defn agent-jobs
  "Get the jobs bound to the agent with the given `id` whose status is `s`.

  Returns a sequence of job entities, all read from one database snapshot."
  [id s]
  (let [db      @conn
        job-ids (d/q '[:find [?jid ...]
                       :in $ ?id ?s
                       :where
                       [?e :agent/id ?id]
                       [?x :job/agent ?e]
                       [?x :job/status ?s]
                       [?x :job/id ?jid]]
                     db id s)]
    (map #(d/entity db [:job/id %]) job-ids)))

(defn q-job
  "Fetch jobs sorted by date (oldest first).

  `u`: urgent flag
  `s`: status of the job
  `ts`: types of the job; each argument may be a single type or a
        collection of types (e.g. an agent's skillset)

  A job matches when its `:job/type` is equal to one of the given types.
  Returns a sequence of job entities; it is empty when no type is given."
  [u s & ts]
  (let [db    @conn
        ;; Flatten `ts` into one set of types so that both
        ;; (q-job u s "a" "b") and (q-job u s #{"a" "b"}) work.
        types (into #{}
                    (comp (mapcat #(if (coll? %) % [%]))
                          (remove nil?))
                    ts)]
    (if (empty? types)
      ()
      (->> (d/q '[:find ?d ?id
                  :in $ ?u ?s [?t ...]
                  :where
                  [?e :job/type ?t]
                  [?e :job/urgent ?u]
                  [?e :job/status ?s]
                  [?e :job/date ?d]
                  [?e :job/id ?id]]
                db u s types)
           (sort-by first)
           (map #(d/entity db [:job/id (second %)]))))))

(defn q-status
  "Query jobs filtering only by status `s`.

  Returns the raw query result: a set of one-element tuples `[job-id]`."
  [s]
  (d/q '[:find ?id
         :in $ ?status
         :where
         [?e :job/status ?status]
         [?e :job/id ?id]]
       @conn s))

(defn sum-agent
  "Get how many jobs the agent `id` has completed, aggregated by job type.

  Returns a sequence of maps `{:type <job type> :jobs <count>}`."
  [id]
  (->> (agent-jobs id :completed)
       (map :job/type)
       frequencies
       (map (fn [[job-type n]] {:type job-type :jobs n}))))

(defn sum-queue
  "List the ids of all jobs grouped by status.

  Returns a map with the keys `:completed`, `:processing` and
  `:unassigned`, each holding a sequence of job ids."
  []
  {:completed  (map first (q-status :completed))
   :processing (map first (q-status :processing))
   :unassigned (map first (q-status :unassigned))})

(defn q-skillset
  "Find the oldest `:unassigned` job whose type is in the skillset `s`.

  Urgent jobs are looked up first; only when none is found are the
  non-urgent ones queried. Returns a job entity or nil."
  [s]
  (or (first (q-job true :unassigned s))
      (first (q-job false :unassigned s))))

(defn request-job
  "Get the fittest job for the agent `id`.

  The agent's primary skillset is tried first and the secondary skillset
  only when the primary one yields nothing. Returns a job entity, or nil
  when the agent does not exist or no job matches."
  [^String id]
  (when-let [agent-entity (d/entity @conn [:agent/id id])]
    (or (q-skillset (:agent/primary-skillset agent-entity))
        (q-skillset (:agent/secondary-skillset agent-entity)))))

(defn t-job
  "Transact a new status `s` for the job `id`.

  With a third argument `a` the job is also bound to the agent with that
  id through `:job/agent`; the agent is referenced by lookup ref."
  ([^String id ^Keyword s]
   (d/transact! conn [{:job/id id :job/status s}]))
  ([^String id ^Keyword s ^String a]
   (d/transact! conn [{:job/id     id
                       :job/status s
                       :job/agent  [:agent/id a]}])))

(defn bind-agent
  "Bind an agent `a` to a job `j` (both given by id) through `:agent/job`."
  [^String a ^String j]
  (d/transact! conn [{:agent/id a :agent/job [:job/id j]}]))

(defn end-job
  "Change to `:completed` the status of the job bound to the agent `a`.

  `a` is the agent entity (this is what `dequeue-job` passes); an agent id
  string is accepted as well and resolved to its entity first.
  Does nothing and returns nil when the agent has no job bound."
  [a]
  (let [agent-entity (if (string? a)
                       (d/entity @conn [:agent/id a])
                       a)]
    (when-let [jid (-> agent-entity :agent/job :job/id)]
      (t-job jid :completed))))

(defn start-job
  "Change the status of the job `j` to `:processing` and bind it to the
  agent `a` (both given by id). Does nothing when `j` is nil."
  [^String a ^String j]
  (when (some? j)
    (t-job j :processing a)
    (bind-agent a j)))

(defn dequeue-job
  "Dequeue a job for the agent `id`.

  The next job is picked with `request-job`, the job currently bound to
  the agent (if any) is marked as completed and the picked job is started.
  Returns `{:job_id <job id or nil> :agent_id id}`, or nil when the agent
  does not exist."
  [^String id]
  (when-let [agent-entity (d/entity @conn [:agent/id id])]
    (let [jid (:job/id (request-job id))]
      (end-job agent-entity)
      (start-job id jid)
      {:job_id jid :agent_id id})))