perf: enable prioritized tasks queue only when necessary
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
4 import { expect } from 'expect'
5
6 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
7 import { WorkerTypes } from '../../lib/index.cjs'
8 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
9 import { PriorityQueue } from '../../lib/priority-queue.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
11
12 describe('Worker node test suite', () => {
13 const threadWorkerNode = new WorkerNode(
14 WorkerTypes.thread,
15 './tests/worker-files/thread/testWorker.mjs',
16 {
17 tasksQueueBackPressureSize: 12,
18 tasksQueueBucketSize: 6,
19 tasksQueuePriority: true
20 }
21 )
22 const clusterWorkerNode = new WorkerNode(
23 WorkerTypes.cluster,
24 './tests/worker-files/cluster/testWorker.cjs',
25 {
26 tasksQueueBackPressureSize: 12,
27 tasksQueueBucketSize: 6,
28 tasksQueuePriority: true
29 }
30 )
31
32 it('Worker node instantiation', () => {
33 expect(() => new WorkerNode()).toThrow(
34 new TypeError('Cannot construct a worker node without a worker type')
35 )
36 expect(
37 () =>
38 new WorkerNode(
39 'invalidWorkerType',
40 './tests/worker-files/thread/testWorker.mjs'
41 )
42 ).toThrow(
43 new TypeError(
44 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
45 )
46 )
47 expect(
48 () =>
49 new WorkerNode(
50 WorkerTypes.thread,
51 './tests/worker-files/thread/testWorker.mjs'
52 )
53 ).toThrow(
54 new TypeError(
55 'Cannot construct a worker node without worker node options'
56 )
57 )
58 expect(
59 () =>
60 new WorkerNode(
61 WorkerTypes.thread,
62 './tests/worker-files/thread/testWorker.mjs',
63 ''
64 )
65 ).toThrow(
66 new TypeError(
67 'Cannot construct a worker node with invalid worker node options: must be a plain object'
68 )
69 )
70 expect(
71 () =>
72 new WorkerNode(
73 WorkerTypes.thread,
74 './tests/worker-files/thread/testWorker.mjs',
75 {}
76 )
77 ).toThrow(
78 new TypeError(
79 'Cannot construct a worker node without a tasks queue back pressure size option'
80 )
81 )
82 expect(
83 () =>
84 new WorkerNode(
85 WorkerTypes.thread,
86 './tests/worker-files/thread/testWorker.mjs',
87 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
88 )
89 ).toThrow(
90 new TypeError(
91 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
92 )
93 )
94 expect(
95 () =>
96 new WorkerNode(
97 WorkerTypes.thread,
98 './tests/worker-files/thread/testWorker.mjs',
99 { tasksQueueBackPressureSize: 0.2 }
100 )
101 ).toThrow(
102 new TypeError(
103 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
104 )
105 )
106 expect(
107 () =>
108 new WorkerNode(
109 WorkerTypes.thread,
110 './tests/worker-files/thread/testWorker.mjs',
111 { tasksQueueBackPressureSize: 0 }
112 )
113 ).toThrow(
114 new RangeError(
115 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
116 )
117 )
118 expect(
119 () =>
120 new WorkerNode(
121 WorkerTypes.thread,
122 './tests/worker-files/thread/testWorker.mjs',
123 { tasksQueueBackPressureSize: -1 }
124 )
125 ).toThrow(
126 new RangeError(
127 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
128 )
129 )
130 expect(
131 () =>
132 new WorkerNode(
133 WorkerTypes.thread,
134 './tests/worker-files/thread/testWorker.mjs',
135 {
136 tasksQueueBackPressureSize: 12
137 }
138 )
139 ).toThrow(
140 new TypeError(
141 'Cannot construct a worker node without a tasks queue bucket size option'
142 )
143 )
144 expect(
145 () =>
146 new WorkerNode(
147 WorkerTypes.thread,
148 './tests/worker-files/thread/testWorker.mjs',
149 {
150 tasksQueueBackPressureSize: 12,
151 tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
152 }
153 )
154 ).toThrow(
155 new TypeError(
156 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
157 )
158 )
159 expect(
160 () =>
161 new WorkerNode(
162 WorkerTypes.thread,
163 './tests/worker-files/thread/testWorker.mjs',
164 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
165 )
166 ).toThrow(
167 new TypeError(
168 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
169 )
170 )
171 expect(
172 () =>
173 new WorkerNode(
174 WorkerTypes.thread,
175 './tests/worker-files/thread/testWorker.mjs',
176 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
177 )
178 ).toThrow(
179 new RangeError(
180 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
181 )
182 )
183 expect(
184 () =>
185 new WorkerNode(
186 WorkerTypes.thread,
187 './tests/worker-files/thread/testWorker.mjs',
188 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
189 )
190 ).toThrow(
191 new RangeError(
192 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
193 )
194 )
195 expect(
196 () =>
197 new WorkerNode(
198 WorkerTypes.thread,
199 './tests/worker-files/thread/testWorker.mjs',
200 {
201 tasksQueueBackPressureSize: 12,
202 tasksQueueBucketSize: 6
203 }
204 )
205 ).toThrow(
206 new RangeError(
207 'Cannot construct a worker node without a tasks queue priority option'
208 )
209 )
210 expect(
211 () =>
212 new WorkerNode(
213 WorkerTypes.thread,
214 './tests/worker-files/thread/testWorker.mjs',
215 {
216 tasksQueueBackPressureSize: 12,
217 tasksQueueBucketSize: 6,
218 tasksQueuePriority: 'invalidTasksQueuePriority'
219 }
220 )
221 ).toThrow(
222 new RangeError(
223 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
224 )
225 )
226 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
227 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
228 expect(threadWorkerNode.info).toStrictEqual({
229 id: threadWorkerNode.worker.threadId,
230 type: WorkerTypes.thread,
231 dynamic: false,
232 ready: false,
233 stealing: false,
234 backPressure: false
235 })
236 expect(threadWorkerNode.usage).toStrictEqual({
237 tasks: {
238 executed: 0,
239 executing: 0,
240 queued: 0,
241 maxQueued: 0,
242 sequentiallyStolen: 0,
243 stolen: 0,
244 failed: 0
245 },
246 runTime: {
247 history: expect.any(CircularBuffer)
248 },
249 waitTime: {
250 history: expect.any(CircularBuffer)
251 },
252 elu: {
253 idle: {
254 history: expect.any(CircularBuffer)
255 },
256 active: {
257 history: expect.any(CircularBuffer)
258 }
259 }
260 })
261 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
262 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
263 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
264 expect(threadWorkerNode.tasksQueue.size).toBe(0)
265 expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
266 expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
267 expect(threadWorkerNode.tasksQueueSize()).toBe(
268 threadWorkerNode.tasksQueue.size
269 )
270 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
271 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
272
273 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
274 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
275 expect(clusterWorkerNode.info).toStrictEqual({
276 id: clusterWorkerNode.worker.id,
277 type: WorkerTypes.cluster,
278 dynamic: false,
279 ready: false,
280 stealing: false,
281 backPressure: false
282 })
283 expect(clusterWorkerNode.usage).toStrictEqual({
284 tasks: {
285 executed: 0,
286 executing: 0,
287 queued: 0,
288 maxQueued: 0,
289 sequentiallyStolen: 0,
290 stolen: 0,
291 failed: 0
292 },
293 runTime: {
294 history: expect.any(CircularBuffer)
295 },
296 waitTime: {
297 history: expect.any(CircularBuffer)
298 },
299 elu: {
300 idle: {
301 history: expect.any(CircularBuffer)
302 },
303 active: {
304 history: expect.any(CircularBuffer)
305 }
306 }
307 })
308 expect(clusterWorkerNode.messageChannel).toBeUndefined()
309 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
310 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
311 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
312 expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
313 expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
314 expect(clusterWorkerNode.tasksQueueSize()).toBe(
315 clusterWorkerNode.tasksQueue.size
316 )
317 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
318 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
319 })
320
321 it('Worker node getTaskFunctionWorkerUsage()', () => {
322 expect(() =>
323 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
324 ).toThrow(
325 new TypeError(
326 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
327 )
328 )
329 threadWorkerNode.info.taskFunctionsProperties = [
330 { name: DEFAULT_TASK_NAME },
331 { name: 'fn1' }
332 ]
333 expect(() =>
334 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
335 ).toThrow(
336 new TypeError(
337 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
338 )
339 )
340 threadWorkerNode.info.taskFunctionsProperties = [
341 { name: DEFAULT_TASK_NAME },
342 { name: 'fn1' },
343 { name: 'fn2' }
344 ]
345 expect(
346 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
347 ).toStrictEqual({
348 tasks: {
349 executed: 0,
350 executing: 0,
351 queued: 0,
352 sequentiallyStolen: 0,
353 stolen: 0,
354 failed: 0
355 },
356 runTime: {
357 history: expect.any(CircularBuffer)
358 },
359 waitTime: {
360 history: expect.any(CircularBuffer)
361 },
362 elu: {
363 idle: {
364 history: expect.any(CircularBuffer)
365 },
366 active: {
367 history: expect.any(CircularBuffer)
368 }
369 }
370 })
371 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
372 tasks: {
373 executed: 0,
374 executing: 0,
375 queued: 0,
376 sequentiallyStolen: 0,
377 stolen: 0,
378 failed: 0
379 },
380 runTime: {
381 history: expect.any(CircularBuffer)
382 },
383 waitTime: {
384 history: expect.any(CircularBuffer)
385 },
386 elu: {
387 idle: {
388 history: expect.any(CircularBuffer)
389 },
390 active: {
391 history: expect.any(CircularBuffer)
392 }
393 }
394 })
395 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
396 tasks: {
397 executed: 0,
398 executing: 0,
399 queued: 0,
400 sequentiallyStolen: 0,
401 stolen: 0,
402 failed: 0
403 },
404 runTime: {
405 history: expect.any(CircularBuffer)
406 },
407 waitTime: {
408 history: expect.any(CircularBuffer)
409 },
410 elu: {
411 idle: {
412 history: expect.any(CircularBuffer)
413 },
414 active: {
415 history: expect.any(CircularBuffer)
416 }
417 }
418 })
419 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
420 })
421
422 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
423 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
424 { name: DEFAULT_TASK_NAME },
425 { name: 'fn1' },
426 { name: 'fn2' }
427 ])
428 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
429 expect(
430 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
431 ).toBe(false)
432 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
433 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
434 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
435 })
436 })