(ns queue-api.db.core
  (:require [datascript.core :as d]
            [mount.core :as mount]
            [clj-time.core :as time]))

;; DataScript schema.
;; The reference attributes must be declared with `:db/valueType`; any other
;; key is ignored by DataScript, which would leave lookup refs such as
;; `[:job/id 1]` stored as plain vectors instead of entity references.
(def schema
  {:agent/id                 {:db/unique :db.unique/identity}
   :agent/primary-skillset   {:db/cardinality :db.cardinality/many}
   :agent/secondary-skillset {:db/cardinality :db.cardinality/many}
   :agent/job                {:db/valueType :db.type/ref}
   :job/id                   {:db/unique :db.unique/identity}
   :job/agent                {:db/valueType :db.type/ref}})

;; Connection created from `schema` when the mount state is started.
(mount/defstate conn
  :start (d/create-conn schema))

(defn add-agent
  "Transact an agent into the database.
  Takes a map with the keys `:id`, `:name`, `:primary-skillset` and
  `:secondary-skillset` and returns the DataScript transaction report."
  [{:keys [id name primary-skillset secondary-skillset]}]
  (d/transact! conn
               [{:agent/id                 id
                 :agent/name               name
                 :agent/primary-skillset   primary-skillset
                 :agent/secondary-skillset secondary-skillset}]))

(defn add-job
  "Transact a job into the database.
  Takes a map with the keys `:id`, `:type` and `:urgent`. The job is stamped
  with the current time as `:job/date` and starts with status `:unassigned`.
  Returns the DataScript transaction report."
  [{:keys [id type urgent]}]
  (d/transact! conn
               [{:job/id     id
                 :job/type   type
                 :job/urgent urgent
                 :job/date   (time/now)
                 :job/status :unassigned}]))

(defn agent-jobs
  "Return the job entities that reference the agent with id `id` through
  `:job/agent` and whose `:job/status` equals `s`."
  [id s]
  (let [rows (d/q '[:find ?jid
                    :in $ ?id ?s
                    :where
                    [?e :agent/id ?id]
                    [?x :job/id ?jid]
                    [?x :job/status ?s]
                    [?x :job/agent ?e]]
                  @conn id s)]
    (map (fn [[jid]] (d/entity @conn [:job/id jid])) rows)))

(defn q-job
  "Fetch job entities ordered by ascending `:job/date`.
  `u`: urgent flag
  `s`: status of the job
  `ts`: accepted job types; every argument may be a single type or a
        collection of types (nil entries are ignored).
  A job matches when its `:job/type` is equal to one of the given types.
  Returns an empty sequence when no types are given."
  [u s & ts]
  (let [types (into []
                    (comp (mapcat #(if (coll? %) % [%]))
                          (remove nil?)
                          (distinct))
                    ts)]
    (if (empty? types)
      ()
      (->> (d/q '[:find ?d ?id
                  ;; `[?t ...]` binds ?t to each element of `types`,
                  ;; giving exact-match membership on the job type.
                  :in $ ?u ?s [?t ...]
                  :where
                  [?e :job/date ?d]
                  [?e :job/id ?id]
                  [?e :job/urgent ?u]
                  [?e :job/status ?s]
                  [?e :job/type ?t]]
                @conn u s types)
           (sort-by first)
           (map (fn [[_ jid]] (d/entity @conn [:job/id jid])))))))

(defn q-status
  "Return the set of `[job-id]` tuples for every job whose status is `s`."
  [s]
  (d/q '[:find ?id
         :in $ ?status
         :where
         [?e :job/status ?status]
         [?e :job/id ?id]]
       @conn s))

(defn sum-agent
  "Count the `:completed` jobs of the agent `id`, aggregated by job type.
  Returns a sequence of maps shaped like `{:type t :jobs n}`."
  [id]
  (->> (agent-jobs id :completed)
       (map :job/type)
       frequencies
       (map (fn [[t n]] {:type t :jobs n}))))

(defn sum-queue
  "Return the job ids grouped by status, as a map with the keys
  `:completed`, `:processing` and `:unassigned`."
  []
  (into {}
        (map (fn [status] [status (map first (q-status status))]))
        [:completed :processing :unassigned]))

(defn q-skillset
  "Return the oldest `:unassigned` job whose type is in the skillset `s`.
  Urgent jobs are preferred; non-urgent jobs are only queried when no urgent
  job matches. Returns nil when nothing matches."
  [s]
  (or (first (q-job true :unassigned s))
      (first (q-job false :unassigned s))))

(defn request-job
  "Return the fittest job for the agent `id`: a job matching the primary
  skillset if there is one, otherwise a job matching the secondary skillset.
  Returns nil when the agent is unknown or no job matches."
  [id]
  (when-let [agent (d/entity @conn [:agent/id id])]
    (or (q-skillset (:agent/primary-skillset agent))
        (q-skillset (:agent/secondary-skillset agent)))))

(defn t-job
  "Set the status of job `id` to `s`; with the third argument `a` also link
  the job to the agent with id `a`. Returns the transaction report."
  ([id s]
   (d/transact! conn [{:job/id id :job/status s}]))
  ([id s a]
   (d/transact! conn [{:job/id     id
                       :job/status s
                       :job/agent  [:agent/id a]}])))

(defn t-agent
  "Link the agent `id` to the job with id `j` through `:agent/job`.
  Returns the transaction report."
  [id j]
  (d/transact! conn [{:agent/id id :agent/job [:job/id j]}]))

(defn end-job
  "Mark the job currently referenced by the agent entity `a` as `:completed`.
  Returns nil, without transacting, when the agent has no job."
  [a]
  (when-let [jid (-> a :agent/job :job/id)]
    (t-job jid :completed)))

(defn t-agent-job-status
  "Assign job id `j` to agent id `a` and return the job request map.
  When `j` is nil nothing is transacted and the returned `:job_id` is nil."
  [a j]
  (if (nil? j)
    {:job_request {:job_id nil :agent_id a}}
    (do
      (t-job j :processing a)
      (t-agent a j)
      {:job_request {:job_id j :agent_id a}})))

(defn dequeue-job
  "Hand the next job to the agent `id`.
  Selects the fittest job, completes the job the agent currently holds (if
  any), then assigns the selected job. Returns the job request map, or nil
  when the agent does not exist."
  [id]
  ;; The next job is selected before the current one is completed,
  ;; matching the original evaluation order.
  (let [job (request-job id)]
    (when-let [agent (d/entity @conn [:agent/id id])]
      (end-job agent)
      (t-agent-job-status (:agent/id agent) (:job/id job)))))