blob: 40e61883362ee5374bb6b2086eb2a2da0432c649 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
(ns queue-api.db.core
(:require [datascript.core :as d]
[mount.core :as mount]
[clj-time.core :as time]))
(def schema
  "DataScript schema for the queue database.

  - `:agent/id` and `:job/id` are unique identities, so entities can be
    addressed with lookup refs such as `[:agent/id 1]`.
  - Both skillset attributes are multi-valued.
  - `:agent/job` and `:job/agent` are references to other entities. The
    schema key must be `:db/valueType`; any other spelling is ignored and
    the attribute is not treated as a ref."
  {:agent/id                 {:db/unique :db.unique/identity}
   :agent/primary-skillset   {:db/cardinality :db.cardinality/many}
   :agent/secondary-skillset {:db/cardinality :db.cardinality/many}
   :agent/job                {:db/valueType :db.type/ref}
   :job/id                   {:db/unique :db.unique/identity}
   :job/agent                {:db/valueType :db.type/ref}})
;; Mount-managed state holding the DataScript connection used by every
;; function in this namespace. On start it is created from `schema` with
;; `d/create-conn`; no `:stop` handler is declared here.
(mount/defstate conn
:start (d/create-conn schema))
(defn add-agent
  "Transact a new agent entity into the database.

  Takes a map with the keys `:id`, `:name`, `:primary-skillset` and
  `:secondary-skillset` and stores each value under the matching
  `:agent/*` attribute. Returns whatever `d/transact!` returns."
  [{:keys [id name primary-skillset secondary-skillset]}]
  (let [agent-entity {:agent/id                 id
                      :agent/name               name
                      :agent/primary-skillset   primary-skillset
                      :agent/secondary-skillset secondary-skillset}]
    (d/transact! conn [agent-entity])))
(defn add-job
  "Transact a new job entity into the database.

  Takes a map with the keys `:id`, `:type` and `:urgent`. The job is
  stamped with the current time as `:job/date` and always starts with the
  status `:unassigned`. Returns whatever `d/transact!` returns."
  [{:keys [id type urgent]}]
  (let [job-entity {:job/id     id
                    :job/type   type
                    :job/urgent urgent
                    :job/date   (time/now)
                    :job/status :unassigned}]
    (d/transact! conn [job-entity])))
(defn agent-jobs
  "Get the jobs that are bound to the agent with the given `id` and have
  the status `s`.

  Returns a lazy seq of job entities."
  [id s]
  ;; Take one database snapshot and use it for both the query and the entity
  ;; lookups. The returned seq is lazy, so dereferencing `conn` per element
  ;; could otherwise build entities from a later database value than the one
  ;; the query ran against.
  (let [db      @conn
        job-ids (d/q '[:find ?jid
                       :in $ ?id ?s
                       :where
                       [?e :agent/id ?id]
                       [?x :job/id ?jid]
                       [?x :job/status ?s]
                       [?x :job/agent ?e]]
                     db id s)]
    (map (fn [[jid]] (d/entity db [:job/id jid])) job-ids)))
(defn q-job
  "Fetch jobs sorted by `:job/date`, earliest first.

  `u`: urgent flag the job must have
  `s`: status the job must have
  `ts`: accepted job types; each argument may be a single type or a
        collection of types (for example a whole skillset)

  Returns a lazy seq of job entities. No accepted types means no jobs."
  [u s & ts]
  (let [db    @conn
        ;; `ts` is the rest-args seq and a caller may hand over a whole
        ;; collection as one argument, so flatten one level and drop nils
        ;; before binding the types into the query.
        types (->> ts
                   (mapcat #(if (coll? %) % [%]))
                   (remove nil?))
        ;; `[?t ...]` is a collection binding: a job matches when its
        ;; `:job/type` equals any element of `types`.
        rows  (->> (d/q '[:find ?d ?id
                          :in $ ?u ?s [?t ...]
                          :where
                          [?e :job/date ?d]
                          [?e :job/id ?id]
                          [?e :job/urgent ?u]
                          [?e :job/status ?s]
                          [?e :job/type ?t]]
                        db u s types)
                   (sort-by first))]
    (map #(d/entity db [:job/id (last %)]) rows)))
(defn q-status
  "Query the ids of all jobs whose `:job/status` equals `s`.

  Returns the raw query result; each row holds a single job id."
  [s]
  (let [db @conn]
    (d/q '[:find ?job-id
           :in $ ?wanted
           :where
           [?job :job/status ?wanted]
           [?job :job/id ?job-id]]
         db s)))
(defn sum-agent
  "Summarise the `:completed` jobs of the agent `id`, grouped by job type.

  Returns a seq of maps shaped `{:type <job type> :jobs <count>}`, one per
  distinct `:job/type`."
  [id]
  (let [completed (agent-jobs id :completed)
        per-type  (frequencies (map :job/type completed))]
    (for [[job-type total] per-type]
      {:type job-type
       :jobs total})))
(defn sum-queue
  "Group the ids of all jobs by status.

  Returns a map with the keys `:completed`, `:processing` and
  `:unassigned`, each holding the seq of job ids that `q-status` finds for
  that status."
  []
  (letfn [(ids-with [status]
            (map first (q-status status)))]
    {:completed  (ids-with :completed)
     :processing (ids-with :processing)
     :unassigned (ids-with :unassigned)}))
(defn q-skillset
  "Pick one `:unassigned` job for the skillset `s`.

  Urgent jobs are queried first; the non-urgent ones are only queried when
  no urgent job was found. Returns the first job of whichever query
  produced results, or nil when both come back empty."
  [s]
  (if-let [urgent (seq (q-job true :unassigned s))]
    (first urgent)
    (when-let [regular (seq (q-job false :unassigned s))]
      (first regular))))
(defn request-job
  "Get the fittest job for the agent `id`.

  Looks the agent up by `:agent/id`, then asks `q-skillset` for a job
  matching the agent's primary skillset and falls back to the secondary
  skillset when nothing is found. Returns a job, or nil when the agent does
  not exist or no job matches."
  [id]
  (when-let [a (d/entity @conn [:agent/id id])]
    ;; Entity attributes are namespaced: the skillsets are stored under
    ;; `:agent/primary-skillset` and `:agent/secondary-skillset`, so the
    ;; un-namespaced keys would always read nil.
    (or (q-skillset (:agent/primary-skillset a))
        (q-skillset (:agent/secondary-skillset a)))))
(defn dequeue-job
  "Placeholder without an implementation yet: ignores `id` and returns nil."
  [id]
  nil)
|