(ns queue-api.db.core
  (:require [datascript.core :as d]
            [mount.core :as mount]
            [clj-time.core :as time])
  (:import (clojure.lang Keyword))
  (:refer-clojure :exclude [agent]))

;; DataScript schema.
;; NOTE: the reference attributes must be declared with `:db/valueType`
;; (slash, not dot). Otherwise DataScript ignores the key, lookup refs such as
;; `[:agent/id "x"]` are stored as plain vectors, and neither joins nor entity
;; navigation (`(-> agent :agent/job :job/id)`) work.
(def schema
  {:agent/id                 {:db/unique :db.unique/identity}
   :agent/primary-skillset   {:db/cardinality :db.cardinality/many}
   :agent/secondary-skillset {:db/cardinality :db.cardinality/many}
   :agent/job                {:db/valueType :db.type/ref}
   :job/id                   {:db/unique :db.unique/identity}
   :job/agent                {:db/valueType :db.type/ref}})

;; Database connection, created when the mount state is started.
(mount/defstate conn
  :start (d/create-conn schema))

;; Single shared monitor used by `dequeue-job`; it has to be the same object
;; for every call, otherwise `locking` provides no mutual exclusion.
(defonce ^:private dequeue-lock (Object.))

(defn add-agent
  "Add an agent into the database.
  Expects a map with the keys `:id`, `:name`, `:primary_skillset` and
  `:secondary_skillset`. Returns the transaction report."
  [{:keys [id name primary_skillset secondary_skillset]}]
  (d/transact! conn [{:agent/id                 id
                      :agent/name               name
                      :agent/primary-skillset   primary_skillset
                      :agent/secondary-skillset secondary_skillset}]))

(defn add-job
  "Add a job into the database.
  Expects a map with the keys `:id`, `:type` and `:urgent`. The job is stamped
  with the current time and starts with the status `:unassigned`.
  Returns the transaction report."
  [{:keys [id type urgent]}]
  (d/transact! conn [{:job/id     id
                      :job/type   type
                      :job/urgent urgent
                      :job/date   (time/now)
                      :job/status :unassigned}]))

(defn agent-jobs
  "Get the jobs bound to the agent with the given `id` whose status is `s`.
  Returns a sequence of job entities."
  [id s]
  (let [db (d/deref conn)]
    (->> (d/q '[:find ?jid
                :in $ ?id ?s
                :where
                [?e :agent/id ?id]
                [?x :job/id ?jid]
                [?x :job/status ?s]
                [?x :job/agent ?e]]
              db id s)
         (map (fn [[jid]] (d/entity db [:job/id jid]))))))

(defn- normalize-types
  "Flatten the variadic type arguments of `query-job` into a sequence of
  distinct, non-nil job types. Each argument may be a single type or a
  collection of types (e.g. an agent's skillset)."
  [ts]
  (->> ts
       (mapcat #(if (coll? %) % [%]))
       (remove nil?)
       distinct))

(defn query-job
  "Query jobs and sort them by date (oldest first).
  `u`: urgent flag
  `s`: status of the job
  `ts`: types of the job; each argument is either a single type or a
        collection of types
  Returns a sequence of job entities whose type is exactly one of `ts`."
  [u s & ts]
  (let [types (normalize-types ts)]
    (if (empty? types)
      ;; Nothing to match against: avoid querying with an empty binding.
      ()
      (let [db (d/deref conn)]
        (->> (d/q '[:find ?d ?id
                    ;; collection binding => exact membership test on the type
                    :in $ ?u ?s [?t ...]
                    :where
                    [?e :job/type ?t]
                    [?e :job/status ?s]
                    [?e :job/urgent ?u]
                    [?e :job/date ?d]
                    [?e :job/id ?id]]
                  db u s types)
             (sort-by first)
             (map #(d/entity db [:job/id (last %)])))))))

(defn query-status
  "Query jobs by status `s`.
  Returns a set of one-element tuples holding the job ids."
  [s]
  (d/q '[:find ?id
         :in $ ?status
         :where
         [?e :job/status ?status]
         [?e :job/id ?id]]
       @conn s))

(defn sum-agent
  "Get how many jobs the agent `id` has completed, aggregated by job type.
  Returns a map of job type -> count."
  [id]
  (->> (agent-jobs id :completed)
       (map :job/type)
       frequencies))

(defn sum-queue
  "List all job ids aggregated by status."
  []
  {:completed  (map first (query-status :completed))
   :processing (map first (query-status :processing))
   :unassigned (map first (query-status :unassigned))})

(defn query-skillset
  "Find the oldest `:unassigned` job whose type is in the skillset `s`.
  Urgent jobs are preferred; non-urgent ones are only queried when no urgent
  job was found. Returns a job entity or nil."
  [s]
  (or (first (query-job true :unassigned s))
      (first (query-job false :unassigned s))))

(defn agent
  "Get an agent by id. Returns the agent entity, or nil when it does not exist."
  [id]
  (d/entity @conn [:agent/id id]))

(defn job
  "Get a job by id. Returns the job entity, or nil when it does not exist."
  [id]
  (d/entity @conn [:job/id id]))

(defn fittest-job
  "Get the fittest job for the agent `id`.
  Jobs matching the agent's primary skillset take precedence over the ones
  matching its secondary skillset. Returns a job entity, or nil when the agent
  does not exist or no job fits."
  [^String id]
  (when-let [a (agent id)]
    (or (query-skillset (:agent/primary-skillset a))
        (query-skillset (:agent/secondary-skillset a)))))

(defn transact-job
  "Transact status `s` and, optionally, agent `a` of the job `id`.
  Returns the transaction report."
  ([^String id ^Keyword s]
   (d/transact! conn [{:job/id     id
                       :job/status s}]))
  ([^String id ^Keyword s ^String a]
   (d/transact! conn [{:job/id     id
                       :job/status s
                       :job/agent  [:agent/id a]}])))

(defn bind-agent
  "Bind the agent with id `a` to the job with id `j`."
  [^String a ^String j]
  (d/transact! conn [{:agent/id  a
                      :agent/job [:job/id j]}]))

(defn end-job
  "Change to `:completed` the status of the job currently bound to the agent
  `a`. `a` is an agent entity (as returned by `agent`), not an agent id.
  Does nothing and returns nil when the agent has no job bound."
  [a]
  (when-let [jid (-> a :agent/job :job/id)]
    (transact-job jid :completed)))

(defn start-job
  "Change the status of the job `j` to `:processing` and bind it to the agent
  `a` (both given by id). Does nothing when `j` is nil."
  [^String a ^String j]
  (when-not (nil? j)
    (transact-job j :processing a)
    (bind-agent a j)))

(defn dequeue-job
  "Dequeue a job to the agent `id`.
  Completes the job the agent is currently bound to (if any), assigns the
  fittest unassigned job to the agent and returns the id of that job.
  Returns nil when the agent does not exist or no job fits."
  [^String id]
  ;; The whole read-decide-write sequence runs under one shared lock so two
  ;; concurrent requests cannot be handed the same job. The agent entity is
  ;; read inside the lock so it reflects the latest database value.
  (locking dequeue-lock
    (when-let [a (agent id)]
      (let [jid (:job/id (fittest-job id))]
        (end-job a)
        (start-job id jid)
        jid))))