feat: use priority queue for task queueing
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
9974369e 1import { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
a074ffee 4import { expect } from 'expect'
ded253e2 5
d35e5717 6import { CircularArray } from '../../lib/circular-array.cjs'
ded253e2
JB
7import { WorkerTypes } from '../../lib/index.cjs'
8import { WorkerNode } from '../../lib/pools/worker-node.cjs'
95d1a734 9import { PriorityQueue } from '../../lib/priority-queue.cjs'
d35e5717 10import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26fb3c18
JB
11
12describe('Worker node test suite', () => {
c3719753
JB
// Worker node fixtures shared by the tests of this suite.
// A new options object is built for each node so the two nodes never share one.
const buildWorkerNodeOptions = () => ({ tasksQueueBackPressureSize: 12 })
const threadWorkerNode = new WorkerNode(
  WorkerTypes.thread,
  './tests/worker-files/thread/testWorker.mjs',
  buildWorkerNodeOptions()
)
const clusterWorkerNode = new WorkerNode(
  WorkerTypes.cluster,
  './tests/worker-files/cluster/testWorker.cjs',
  buildWorkerNodeOptions()
)
26fb3c18
JB
23
it('Worker node instantiation', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const notAnIntegerMessage =
    'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
  const notPositiveIntegerMessage =
    'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'

  // Each case pairs the exact constructor argument list with the error it must raise.
  // Argument lists are spread as-is so that omitted arguments stay omitted.
  const invalidConstructions = [
    [
      [],
      new TypeError('Cannot construct a worker node without a worker type')
    ],
    [
      ['invalidWorkerType', threadWorkerFile, { tasksQueueBackPressureSize: 12 }],
      new TypeError(
        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
      )
    ],
    [
      [WorkerTypes.thread, threadWorkerFile],
      new TypeError(
        'Cannot construct a worker node without worker node options'
      )
    ],
    [
      [WorkerTypes.thread, threadWorkerFile, ''],
      new TypeError(
        'Cannot construct a worker node with invalid options: must be a plain object'
      )
    ],
    [
      [WorkerTypes.thread, threadWorkerFile, {}],
      new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size option'
      )
    ],
    [
      [
        WorkerTypes.thread,
        threadWorkerFile,
        { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
      ],
      new TypeError(notAnIntegerMessage)
    ],
    [
      [WorkerTypes.thread, threadWorkerFile, { tasksQueueBackPressureSize: 0.2 }],
      new TypeError(notAnIntegerMessage)
    ],
    [
      [WorkerTypes.thread, threadWorkerFile, { tasksQueueBackPressureSize: 0 }],
      new RangeError(notPositiveIntegerMessage)
    ],
    [
      [WorkerTypes.thread, threadWorkerFile, { tasksQueueBackPressureSize: -1 }],
      new RangeError(notPositiveIntegerMessage)
    ]
  ]
  for (const [constructorArgs, expectedError] of invalidConstructions) {
    expect(() => new WorkerNode(...constructorArgs)).toThrow(expectedError)
  }

  // Usage statistics expected on a worker node that has not run any task yet.
  const expectedInitialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })

  // The thread and cluster fixtures go through the same checks; only the worker
  // class, the identifier property and the message channel expectation differ.
  const constructedNodes = [
    {
      node: threadWorkerNode,
      type: WorkerTypes.thread,
      workerClass: ThreadWorker,
      readWorkerId: worker => worker.threadId
    },
    {
      node: clusterWorkerNode,
      type: WorkerTypes.cluster,
      workerClass: ClusterWorker,
      readWorkerId: worker => worker.id
    }
  ]
  for (const { node, type, workerClass, readWorkerId } of constructedNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.worker).toBeInstanceOf(workerClass)
    expect(node.info).toStrictEqual({
      id: readWorkerId(node.worker),
      type,
      dynamic: false,
      ready: false,
      stealing: false
    })
    expect(node.usage).toStrictEqual(expectedInitialUsage())
    if (type === WorkerTypes.thread) {
      expect(node.messageChannel).toBeInstanceOf(MessageChannel)
    } else {
      expect(node.messageChannel).toBeUndefined()
    }
    expect(node.tasksQueueBackPressureSize).toBe(12)
    expect(node.tasksQueue).toBeInstanceOf(PriorityQueue)
    expect(node.tasksQueue.size).toBe(0)
    expect(node.tasksQueueSize()).toBe(node.tasksQueue.size)
    expect(node.onBackPressureStarted).toBe(false)
    expect(node.taskFunctionsUsage).toBeInstanceOf(Map)
  }
})
211
it('Worker node getTaskFunctionWorkerUsage()', () => {
  const usageFor = taskFunctionName =>
    threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)

  // Usage statistics expected for a task function that has not run yet.
  const expectedInitialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })

  // No task function properties list is set on the worker node info yet.
  expect(() => usageFor('invalidTaskFunction')).toThrow(
    new TypeError(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
    )
  )

  // A list with only two entries is rejected as well.
  threadWorkerNode.info.taskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' }
  ]
  expect(() => usageFor('invalidTaskFunction')).toThrow(
    new TypeError(
      "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
    )
  )

  threadWorkerNode.info.taskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' }
  ]
  for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
    expect(usageFor(taskFunctionName)).toStrictEqual(expectedInitialUsage())
  }
  // Three names were queried but only two usage entries end up being tracked.
  expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
adee6053
JB
312
it('Worker node deleteTaskFunctionWorkerUsage()', () => {
  const trackedUsageCount = () => threadWorkerNode.taskFunctionsUsage.size
  const deleteUsage = taskFunctionName =>
    threadWorkerNode.deleteTaskFunctionWorkerUsage(taskFunctionName)

  // Starting state checked before deleting anything.
  expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' }
  ])
  expect(trackedUsageCount()).toBe(2)

  // Deleting an unknown task function reports false and leaves the map untouched.
  expect(deleteUsage('invalidTaskFunction')).toBe(false)
  expect(trackedUsageCount()).toBe(2)

  // Deleting a tracked task function reports true and removes one entry.
  expect(deleteUsage('fn1')).toBe(true)
  expect(trackedUsageCount()).toBe(1)
})
26fb3c18 327})