fix: disable `tasksStealingOnBackPressure` by default
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
9974369e 1import { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
a074ffee 4import { expect } from 'expect'
ded253e2 5
d35e5717 6import { CircularArray } from '../../lib/circular-array.cjs'
ded253e2
JB
7import { WorkerTypes } from '../../lib/index.cjs'
8import { WorkerNode } from '../../lib/pools/worker-node.cjs'
95d1a734 9import { PriorityQueue } from '../../lib/priority-queue.cjs'
d35e5717 10import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26fb3c18
JB
11
12describe('Worker node test suite', () => {
c3719753
JB
13 const threadWorkerNode = new WorkerNode(
14 WorkerTypes.thread,
15 './tests/worker-files/thread/testWorker.mjs',
59ca7cff 16 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
c3719753
JB
17 )
18 const clusterWorkerNode = new WorkerNode(
19 WorkerTypes.cluster,
d35e5717 20 './tests/worker-files/cluster/testWorker.cjs',
59ca7cff 21 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
c3719753 22 )
26fb3c18
JB
23
24 it('Worker node instantiation', () => {
948faff7 25 expect(() => new WorkerNode()).toThrow(
c3719753 26 new TypeError('Cannot construct a worker node without a worker type')
26fb3c18 27 )
c3719753
JB
28 expect(
29 () =>
30 new WorkerNode(
31 'invalidWorkerType',
32 './tests/worker-files/thread/testWorker.mjs',
33 { tasksQueueBackPressureSize: 12 }
34 )
35 ).toThrow(
36 new TypeError(
37 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
38 )
39 )
40 expect(
41 () =>
42 new WorkerNode(
43 WorkerTypes.thread,
44 './tests/worker-files/thread/testWorker.mjs'
45 )
46 ).toThrow(
26fb3c18 47 new TypeError(
c3719753 48 'Cannot construct a worker node without worker node options'
26fb3c18
JB
49 )
50 )
51 expect(
c3719753
JB
52 () =>
53 new WorkerNode(
54 WorkerTypes.thread,
55 './tests/worker-files/thread/testWorker.mjs',
56 ''
57 )
948faff7 58 ).toThrow(
26fb3c18 59 new TypeError(
c3719753 60 'Cannot construct a worker node with invalid options: must be a plain object'
26fb3c18
JB
61 )
62 )
c3719753
JB
63 expect(
64 () =>
65 new WorkerNode(
66 WorkerTypes.thread,
67 './tests/worker-files/thread/testWorker.mjs',
68 {}
69 )
70 ).toThrow(
5b49e864 71 new TypeError(
c3719753 72 'Cannot construct a worker node without a tasks queue back pressure size option'
5b49e864
JB
73 )
74 )
c3719753
JB
75 expect(
76 () =>
77 new WorkerNode(
78 WorkerTypes.thread,
79 './tests/worker-files/thread/testWorker.mjs',
80 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
81 )
82 ).toThrow(
83 new TypeError(
84 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
85 )
86 )
87 expect(
88 () =>
89 new WorkerNode(
90 WorkerTypes.thread,
91 './tests/worker-files/thread/testWorker.mjs',
92 { tasksQueueBackPressureSize: 0.2 }
93 )
94 ).toThrow(
95 new TypeError(
96 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
97 )
98 )
99 expect(
100 () =>
101 new WorkerNode(
102 WorkerTypes.thread,
103 './tests/worker-files/thread/testWorker.mjs',
104 { tasksQueueBackPressureSize: 0 }
105 )
106 ).toThrow(
5b49e864 107 new RangeError(
c3719753 108 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
109 )
110 )
c3719753
JB
111 expect(
112 () =>
113 new WorkerNode(
114 WorkerTypes.thread,
115 './tests/worker-files/thread/testWorker.mjs',
116 { tasksQueueBackPressureSize: -1 }
117 )
118 ).toThrow(
5b49e864 119 new RangeError(
c3719753 120 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
121 )
122 )
59ca7cff
JB
123 expect(
124 () =>
125 new WorkerNode(
126 WorkerTypes.thread,
127 './tests/worker-files/thread/testWorker.mjs',
128 {
129 tasksQueueBackPressureSize: 12
130 }
131 )
132 ).toThrow(
133 new TypeError(
134 'Cannot construct a worker node without a tasks queue bucket size option'
135 )
136 )
137 expect(
138 () =>
139 new WorkerNode(
140 WorkerTypes.thread,
141 './tests/worker-files/thread/testWorker.mjs',
142 {
143 tasksQueueBackPressureSize: 12,
144 tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
145 }
146 )
147 ).toThrow(
148 new TypeError(
149 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
150 )
151 )
152 expect(
153 () =>
154 new WorkerNode(
155 WorkerTypes.thread,
156 './tests/worker-files/thread/testWorker.mjs',
157 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
158 )
159 ).toThrow(
160 new TypeError(
161 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
162 )
163 )
164 expect(
165 () =>
166 new WorkerNode(
167 WorkerTypes.thread,
168 './tests/worker-files/thread/testWorker.mjs',
169 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
170 )
171 ).toThrow(
172 new RangeError(
173 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
174 )
175 )
176 expect(
177 () =>
178 new WorkerNode(
179 WorkerTypes.thread,
180 './tests/worker-files/thread/testWorker.mjs',
181 { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
182 )
183 ).toThrow(
184 new RangeError(
185 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
186 )
187 )
75de9f41 188 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 189 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
75de9f41 190 expect(threadWorkerNode.info).toStrictEqual({
c3719753 191 id: threadWorkerNode.worker.threadId,
26fb3c18
JB
192 type: WorkerTypes.thread,
193 dynamic: false,
5eb72b9e 194 ready: false,
2eee7220 195 backPressure: false,
5eb72b9e 196 stealing: false
26fb3c18 197 })
75de9f41
JB
198 expect(threadWorkerNode.usage).toStrictEqual({
199 tasks: {
200 executed: 0,
201 executing: 0,
202 queued: 0,
203 maxQueued: 0,
463226a4 204 sequentiallyStolen: 0,
75de9f41
JB
205 stolen: 0,
206 failed: 0
207 },
208 runTime: {
4ba4c7f9 209 history: new CircularArray()
75de9f41
JB
210 },
211 waitTime: {
4ba4c7f9 212 history: new CircularArray()
75de9f41
JB
213 },
214 elu: {
215 idle: {
4ba4c7f9 216 history: new CircularArray()
75de9f41
JB
217 },
218 active: {
4ba4c7f9 219 history: new CircularArray()
75de9f41
JB
220 }
221 }
222 })
223 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
224 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
95d1a734 225 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
75de9f41 226 expect(threadWorkerNode.tasksQueue.size).toBe(0)
59ca7cff 227 expect(threadWorkerNode.tasksQueue.k).toBe(6)
68f1f531
JB
228 expect(threadWorkerNode.tasksQueueSize()).toBe(
229 threadWorkerNode.tasksQueue.size
230 )
2eee7220 231 expect(threadWorkerNode.setBackPressureFlag).toBe(false)
75de9f41
JB
232 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
233
234 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 235 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
75de9f41 236 expect(clusterWorkerNode.info).toStrictEqual({
c3719753 237 id: clusterWorkerNode.worker.id,
75de9f41
JB
238 type: WorkerTypes.cluster,
239 dynamic: false,
5eb72b9e 240 ready: false,
2eee7220 241 backPressure: false,
5eb72b9e 242 stealing: false
75de9f41
JB
243 })
244 expect(clusterWorkerNode.usage).toStrictEqual({
26fb3c18
JB
245 tasks: {
246 executed: 0,
247 executing: 0,
248 queued: 0,
249 maxQueued: 0,
463226a4 250 sequentiallyStolen: 0,
26fb3c18
JB
251 stolen: 0,
252 failed: 0
253 },
254 runTime: {
4ba4c7f9 255 history: new CircularArray()
26fb3c18
JB
256 },
257 waitTime: {
4ba4c7f9 258 history: new CircularArray()
26fb3c18
JB
259 },
260 elu: {
261 idle: {
4ba4c7f9 262 history: new CircularArray()
26fb3c18
JB
263 },
264 active: {
4ba4c7f9 265 history: new CircularArray()
26fb3c18
JB
266 }
267 }
268 })
75de9f41
JB
269 expect(clusterWorkerNode.messageChannel).toBeUndefined()
270 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
95d1a734 271 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
75de9f41 272 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
59ca7cff 273 expect(clusterWorkerNode.tasksQueue.k).toBe(6)
68f1f531
JB
274 expect(clusterWorkerNode.tasksQueueSize()).toBe(
275 clusterWorkerNode.tasksQueue.size
276 )
2eee7220 277 expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
75de9f41 278 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
26fb3c18
JB
279 })
280
281 it('Worker node getTaskFunctionWorkerUsage()', () => {
282 expect(() =>
75de9f41 283 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 284 ).toThrow(
26fb3c18 285 new TypeError(
31847469 286 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
26fb3c18
JB
287 )
288 )
31847469
JB
289 threadWorkerNode.info.taskFunctionsProperties = [
290 { name: DEFAULT_TASK_NAME },
291 { name: 'fn1' }
292 ]
26fb3c18 293 expect(() =>
75de9f41 294 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 295 ).toThrow(
26fb3c18 296 new TypeError(
31847469 297 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
26fb3c18
JB
298 )
299 )
31847469
JB
300 threadWorkerNode.info.taskFunctionsProperties = [
301 { name: DEFAULT_TASK_NAME },
302 { name: 'fn1' },
303 { name: 'fn2' }
304 ]
6cd5248f 305 expect(
75de9f41 306 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
6cd5248f 307 ).toStrictEqual({
26fb3c18
JB
308 tasks: {
309 executed: 0,
310 executing: 0,
311 queued: 0,
463226a4 312 sequentiallyStolen: 0,
5ad42e34 313 stolen: 0,
26fb3c18
JB
314 failed: 0
315 },
316 runTime: {
4ba4c7f9 317 history: new CircularArray()
26fb3c18
JB
318 },
319 waitTime: {
4ba4c7f9 320 history: new CircularArray()
26fb3c18
JB
321 },
322 elu: {
323 idle: {
4ba4c7f9 324 history: new CircularArray()
26fb3c18
JB
325 },
326 active: {
4ba4c7f9 327 history: new CircularArray()
26fb3c18
JB
328 }
329 }
330 })
75de9f41 331 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
26fb3c18
JB
332 tasks: {
333 executed: 0,
334 executing: 0,
335 queued: 0,
463226a4 336 sequentiallyStolen: 0,
5ad42e34 337 stolen: 0,
26fb3c18
JB
338 failed: 0
339 },
340 runTime: {
4ba4c7f9 341 history: new CircularArray()
26fb3c18
JB
342 },
343 waitTime: {
4ba4c7f9 344 history: new CircularArray()
26fb3c18
JB
345 },
346 elu: {
347 idle: {
4ba4c7f9 348 history: new CircularArray()
26fb3c18
JB
349 },
350 active: {
4ba4c7f9 351 history: new CircularArray()
26fb3c18
JB
352 }
353 }
354 })
75de9f41 355 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
26fb3c18
JB
356 tasks: {
357 executed: 0,
358 executing: 0,
359 queued: 0,
463226a4 360 sequentiallyStolen: 0,
5ad42e34 361 stolen: 0,
26fb3c18
JB
362 failed: 0
363 },
364 runTime: {
4ba4c7f9 365 history: new CircularArray()
26fb3c18
JB
366 },
367 waitTime: {
4ba4c7f9 368 history: new CircularArray()
26fb3c18
JB
369 },
370 elu: {
371 idle: {
4ba4c7f9 372 history: new CircularArray()
26fb3c18
JB
373 },
374 active: {
4ba4c7f9 375 history: new CircularArray()
26fb3c18
JB
376 }
377 }
378 })
75de9f41 379 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
26fb3c18 380 })
adee6053
JB
381
382 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
31847469
JB
383 expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
384 { name: DEFAULT_TASK_NAME },
385 { name: 'fn1' },
386 { name: 'fn2' }
adee6053
JB
387 ])
388 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
389 expect(
390 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
391 ).toBe(false)
392 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
393 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
394 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
395 })
26fb3c18 396})