perf: enable prioritized tasks queue only when necessary
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
9974369e 1import { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
a074ffee 4import { expect } from 'expect'
ded253e2 5
f12182ad 6import { CircularBuffer } from '../../lib/circular-buffer.cjs'
ded253e2
JB
7import { WorkerTypes } from '../../lib/index.cjs'
8import { WorkerNode } from '../../lib/pools/worker-node.cjs'
95d1a734 9import { PriorityQueue } from '../../lib/priority-queue.cjs'
d35e5717 10import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26fb3c18
JB
11
12describe('Worker node test suite', () => {
c3719753
JB
13 const threadWorkerNode = new WorkerNode(
14 WorkerTypes.thread,
15 './tests/worker-files/thread/testWorker.mjs',
fcfc3353
JB
16 {
17 tasksQueueBackPressureSize: 12,
18 tasksQueueBucketSize: 6,
19 tasksQueuePriority: true
20 }
c3719753
JB
21 )
22 const clusterWorkerNode = new WorkerNode(
23 WorkerTypes.cluster,
d35e5717 24 './tests/worker-files/cluster/testWorker.cjs',
fcfc3353
JB
25 {
26 tasksQueueBackPressureSize: 12,
27 tasksQueueBucketSize: 6,
28 tasksQueuePriority: true
29 }
c3719753 30 )
26fb3c18
JB
31
32 it('Worker node instantiation', () => {
948faff7 33 expect(() => new WorkerNode()).toThrow(
c3719753 34 new TypeError('Cannot construct a worker node without a worker type')
26fb3c18 35 )
c3719753
JB
36 expect(
37 () =>
38 new WorkerNode(
39 'invalidWorkerType',
fcfc3353 40 './tests/worker-files/thread/testWorker.mjs'
c3719753
JB
41 )
42 ).toThrow(
43 new TypeError(
44 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
45 )
46 )
47 expect(
48 () =>
49 new WorkerNode(
50 WorkerTypes.thread,
51 './tests/worker-files/thread/testWorker.mjs'
52 )
53 ).toThrow(
26fb3c18 54 new TypeError(
c3719753 55 'Cannot construct a worker node without worker node options'
26fb3c18
JB
56 )
57 )
58 expect(
c3719753
JB
59 () =>
60 new WorkerNode(
61 WorkerTypes.thread,
62 './tests/worker-files/thread/testWorker.mjs',
63 ''
64 )
948faff7 65 ).toThrow(
26fb3c18 66 new TypeError(
fcfc3353 67 'Cannot construct a worker node with invalid worker node options: must be a plain object'
26fb3c18
JB
68 )
69 )
c3719753
JB
70 expect(
71 () =>
72 new WorkerNode(
73 WorkerTypes.thread,
74 './tests/worker-files/thread/testWorker.mjs',
75 {}
76 )
77 ).toThrow(
5b49e864 78 new TypeError(
c3719753 79 'Cannot construct a worker node without a tasks queue back pressure size option'
5b49e864
JB
80 )
81 )
c3719753
JB
82 expect(
83 () =>
84 new WorkerNode(
85 WorkerTypes.thread,
86 './tests/worker-files/thread/testWorker.mjs',
87 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
88 )
89 ).toThrow(
90 new TypeError(
91 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
92 )
93 )
94 expect(
95 () =>
96 new WorkerNode(
97 WorkerTypes.thread,
98 './tests/worker-files/thread/testWorker.mjs',
99 { tasksQueueBackPressureSize: 0.2 }
100 )
101 ).toThrow(
102 new TypeError(
103 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
104 )
105 )
106 expect(
107 () =>
108 new WorkerNode(
109 WorkerTypes.thread,
110 './tests/worker-files/thread/testWorker.mjs',
111 { tasksQueueBackPressureSize: 0 }
112 )
113 ).toThrow(
5b49e864 114 new RangeError(
c3719753 115 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
116 )
117 )
c3719753
JB
118 expect(
119 () =>
120 new WorkerNode(
121 WorkerTypes.thread,
122 './tests/worker-files/thread/testWorker.mjs',
123 { tasksQueueBackPressureSize: -1 }
124 )
125 ).toThrow(
5b49e864 126 new RangeError(
c3719753 127 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
128 )
129 )
59ca7cff
JB
130 expect(
131 () =>
132 new WorkerNode(
133 WorkerTypes.thread,
134 './tests/worker-files/thread/testWorker.mjs',
135 {
136 tasksQueueBackPressureSize: 12
137 }
138 )
139 ).toThrow(
140 new TypeError(
141 'Cannot construct a worker node without a tasks queue bucket size option'
142 )
143 )
144 expect(
145 () =>
146 new WorkerNode(
147 WorkerTypes.thread,
148 './tests/worker-files/thread/testWorker.mjs',
149 {
150 tasksQueueBackPressureSize: 12,
151 tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
152 }
153 )
154 ).toThrow(
155 new TypeError(
156 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
157 )
158 )
159 expect(
160 () =>
161 new WorkerNode(
162 WorkerTypes.thread,
163 './tests/worker-files/thread/testWorker.mjs',
164 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
165 )
166 ).toThrow(
167 new TypeError(
168 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
169 )
170 )
171 expect(
172 () =>
173 new WorkerNode(
174 WorkerTypes.thread,
175 './tests/worker-files/thread/testWorker.mjs',
176 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
177 )
178 ).toThrow(
179 new RangeError(
180 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
181 )
182 )
183 expect(
184 () =>
185 new WorkerNode(
186 WorkerTypes.thread,
187 './tests/worker-files/thread/testWorker.mjs',
188 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
189 )
190 ).toThrow(
191 new RangeError(
192 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
193 )
194 )
fcfc3353
JB
195 expect(
196 () =>
197 new WorkerNode(
198 WorkerTypes.thread,
199 './tests/worker-files/thread/testWorker.mjs',
200 {
201 tasksQueueBackPressureSize: 12,
202 tasksQueueBucketSize: 6
203 }
204 )
205 ).toThrow(
206 new RangeError(
207 'Cannot construct a worker node without a tasks queue priority option'
208 )
209 )
210 expect(
211 () =>
212 new WorkerNode(
213 WorkerTypes.thread,
214 './tests/worker-files/thread/testWorker.mjs',
215 {
216 tasksQueueBackPressureSize: 12,
217 tasksQueueBucketSize: 6,
218 tasksQueuePriority: 'invalidTasksQueuePriority'
219 }
220 )
221 ).toThrow(
222 new RangeError(
223 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
224 )
225 )
75de9f41 226 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 227 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
75de9f41 228 expect(threadWorkerNode.info).toStrictEqual({
c3719753 229 id: threadWorkerNode.worker.threadId,
26fb3c18
JB
230 type: WorkerTypes.thread,
231 dynamic: false,
5eb72b9e 232 ready: false,
85b553ba
JB
233 stealing: false,
234 backPressure: false
26fb3c18 235 })
75de9f41
JB
236 expect(threadWorkerNode.usage).toStrictEqual({
237 tasks: {
238 executed: 0,
239 executing: 0,
240 queued: 0,
241 maxQueued: 0,
463226a4 242 sequentiallyStolen: 0,
75de9f41
JB
243 stolen: 0,
244 failed: 0
245 },
246 runTime: {
f12182ad 247 history: expect.any(CircularBuffer)
75de9f41
JB
248 },
249 waitTime: {
f12182ad 250 history: expect.any(CircularBuffer)
75de9f41
JB
251 },
252 elu: {
253 idle: {
f12182ad 254 history: expect.any(CircularBuffer)
75de9f41
JB
255 },
256 active: {
f12182ad 257 history: expect.any(CircularBuffer)
75de9f41
JB
258 }
259 }
260 })
261 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
262 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
95d1a734 263 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
75de9f41 264 expect(threadWorkerNode.tasksQueue.size).toBe(0)
2107bc57 265 expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
fcfc3353 266 expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
68f1f531
JB
267 expect(threadWorkerNode.tasksQueueSize()).toBe(
268 threadWorkerNode.tasksQueue.size
269 )
2eee7220 270 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
75de9f41
JB
271 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
272
273 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 274 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
75de9f41 275 expect(clusterWorkerNode.info).toStrictEqual({
c3719753 276 id: clusterWorkerNode.worker.id,
75de9f41
JB
277 type: WorkerTypes.cluster,
278 dynamic: false,
5eb72b9e 279 ready: false,
85b553ba
JB
280 stealing: false,
281 backPressure: false
75de9f41
JB
282 })
283 expect(clusterWorkerNode.usage).toStrictEqual({
26fb3c18
JB
284 tasks: {
285 executed: 0,
286 executing: 0,
287 queued: 0,
288 maxQueued: 0,
463226a4 289 sequentiallyStolen: 0,
26fb3c18
JB
290 stolen: 0,
291 failed: 0
292 },
293 runTime: {
f12182ad 294 history: expect.any(CircularBuffer)
26fb3c18
JB
295 },
296 waitTime: {
f12182ad 297 history: expect.any(CircularBuffer)
26fb3c18
JB
298 },
299 elu: {
300 idle: {
f12182ad 301 history: expect.any(CircularBuffer)
26fb3c18
JB
302 },
303 active: {
f12182ad 304 history: expect.any(CircularBuffer)
26fb3c18
JB
305 }
306 }
307 })
75de9f41
JB
308 expect(clusterWorkerNode.messageChannel).toBeUndefined()
309 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
95d1a734 310 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
75de9f41 311 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
2107bc57 312 expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
fcfc3353 313 expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
68f1f531
JB
314 expect(clusterWorkerNode.tasksQueueSize()).toBe(
315 clusterWorkerNode.tasksQueue.size
316 )
2eee7220 317 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
75de9f41 318 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
26fb3c18
JB
319 })
320
321 it('Worker node getTaskFunctionWorkerUsage()', () => {
322 expect(() =>
75de9f41 323 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 324 ).toThrow(
26fb3c18 325 new TypeError(
31847469 326 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
26fb3c18
JB
327 )
328 )
31847469
JB
329 threadWorkerNode.info.taskFunctionsProperties = [
330 { name: DEFAULT_TASK_NAME },
331 { name: 'fn1' }
332 ]
26fb3c18 333 expect(() =>
75de9f41 334 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 335 ).toThrow(
26fb3c18 336 new TypeError(
31847469 337 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
26fb3c18
JB
338 )
339 )
31847469
JB
340 threadWorkerNode.info.taskFunctionsProperties = [
341 { name: DEFAULT_TASK_NAME },
342 { name: 'fn1' },
343 { name: 'fn2' }
344 ]
6cd5248f 345 expect(
75de9f41 346 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
6cd5248f 347 ).toStrictEqual({
26fb3c18
JB
348 tasks: {
349 executed: 0,
350 executing: 0,
351 queued: 0,
463226a4 352 sequentiallyStolen: 0,
5ad42e34 353 stolen: 0,
26fb3c18
JB
354 failed: 0
355 },
356 runTime: {
f12182ad 357 history: expect.any(CircularBuffer)
26fb3c18
JB
358 },
359 waitTime: {
f12182ad 360 history: expect.any(CircularBuffer)
26fb3c18
JB
361 },
362 elu: {
363 idle: {
f12182ad 364 history: expect.any(CircularBuffer)
26fb3c18
JB
365 },
366 active: {
f12182ad 367 history: expect.any(CircularBuffer)
26fb3c18
JB
368 }
369 }
370 })
75de9f41 371 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
26fb3c18
JB
372 tasks: {
373 executed: 0,
374 executing: 0,
375 queued: 0,
463226a4 376 sequentiallyStolen: 0,
5ad42e34 377 stolen: 0,
26fb3c18
JB
378 failed: 0
379 },
380 runTime: {
f12182ad 381 history: expect.any(CircularBuffer)
26fb3c18
JB
382 },
383 waitTime: {
f12182ad 384 history: expect.any(CircularBuffer)
26fb3c18
JB
385 },
386 elu: {
387 idle: {
f12182ad 388 history: expect.any(CircularBuffer)
26fb3c18
JB
389 },
390 active: {
f12182ad 391 history: expect.any(CircularBuffer)
26fb3c18
JB
392 }
393 }
394 })
75de9f41 395 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
26fb3c18
JB
396 tasks: {
397 executed: 0,
398 executing: 0,
399 queued: 0,
463226a4 400 sequentiallyStolen: 0,
5ad42e34 401 stolen: 0,
26fb3c18
JB
402 failed: 0
403 },
404 runTime: {
f12182ad 405 history: expect.any(CircularBuffer)
26fb3c18
JB
406 },
407 waitTime: {
f12182ad 408 history: expect.any(CircularBuffer)
26fb3c18
JB
409 },
410 elu: {
411 idle: {
f12182ad 412 history: expect.any(CircularBuffer)
26fb3c18
JB
413 },
414 active: {
f12182ad 415 history: expect.any(CircularBuffer)
26fb3c18
JB
416 }
417 }
418 })
75de9f41 419 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
26fb3c18 420 })
adee6053
JB
421
422 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
31847469
JB
423 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
424 { name: DEFAULT_TASK_NAME },
425 { name: 'fn1' },
426 { name: 'fn2' }
adee6053
JB
427 ])
428 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
429 expect(
430 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
431 ).toBe(false)
432 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
433 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
434 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
435 })
26fb3c18 436})