test: align task timeout expectation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
d35e5717
JB
17} from '../../lib/index.cjs'
18import { CircularArray } from '../../lib/circular-array.cjs'
19import { Deque } from '../../lib/deque.cjs'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21import { waitPoolEvents } from '../test-utils.cjs'
22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
// Package version, read from the package.json located two directories above
// this test file; compared against pool.info.version further down.
const version = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
).version
// Number of workers used by the pools created in this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose isMain() always answers false, used to
 * exercise the "pool created from a non main thread/process" error path.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always false.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
// Call sinon's restore() after every test case.
afterEach(() => {
  restore()
})
41
// Smoke test: build a fixed thread pool, check its type, then destroy it.
it('Verify that pool can be created and destroyed', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
50
// StubPoolWithIsMain#isMain() returns false, so the constructor must throw.
it('Verify that pool cannot be created from a non main thread/process', () => {
  expect(
    () =>
      new StubPoolWithIsMain(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          errorHandler: e => console.error(e)
        }
      )
  ).toThrow(
    new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  )
})
c510fea7 67
// Checks the started/starting/destroying flags right after construction.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.started).toBe(true)
  expect(pool.starting).toBe(false)
  expect(pool.destroying).toBe(false)
  await pool.destroy()
})
78
// Missing, non-string and non-existent worker file paths must each throw.
it('Verify that filePath is checked', () => {
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
})
90
// An undefined number of workers must be rejected at construction.
it('Verify that numberOfWorkers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(
        undefined,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
104
// A negative number of workers must raise a RangeError.
it('Verify that a negative number of workers is checked', () => {
  expect(
    () =>
      new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  ).toThrow(
    new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  )
})
115
// A fractional number of workers must raise a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
})
7c0ba920 126
// Passing a fourth (maximum size) argument to a fixed pool must throw.
it('Verify that pool arguments number and pool type are checked', () => {
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        undefined,
        numberOfWorkers * 2
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  )
})
142
// Walks through the invalid (min, max) size combinations of dynamic pools and
// checks the error type and message raised for each of them.
it('Verify that dynamic pool sizing is checked', () => {
  // Maximum size left undefined.
  expect(
    () =>
      new DynamicClusterPool(
        1,
        undefined,
        './tests/worker-files/cluster/testWorker.cjs'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  )
  // Fractional minimum size.
  expect(
    () =>
      new DynamicThreadPool(
        0.5,
        1,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
  // Fractional maximum size.
  expect(
    () =>
      new DynamicClusterPool(
        0,
        0.5,
        './tests/worker-files/cluster/testWorker.cjs'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  )
  // Maximum size below the minimum size.
  expect(
    () =>
      new DynamicThreadPool(
        2,
        1,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  )
  // Maximum size equal to zero.
  expect(
    () =>
      new DynamicThreadPool(
        0,
        0,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  )
  // Minimum size equal to the maximum size.
  expect(
    () =>
      new DynamicClusterPool(
        1,
        1,
        './tests/worker-files/cluster/testWorker.cjs'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  )
})
217
// Compares pool.opts and the per-strategy options against the expected values,
// first for a pool built without options, then for one built with explicit
// options.
it('Verify that pool options are checked', async () => {
  // Pool built without an options argument.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  await pool.destroy()
  // Pool built with explicit options; events are disabled so no emitter.
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: {
        runTime: { median: true },
        weights: { 0: 300, 1: 200 }
      },
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 2 },
      messageHandler: testHandler,
      errorHandler: testHandler,
      onlineHandler: testHandler,
      exitHandler: testHandler
    }
  )
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    })
  }
  await pool.destroy()
})
298
// Each invalid constructor option below must make the constructor throw the
// matching error type and message.
it('Verify that pool options are validated', () => {
  // Worker choice strategy and its options.
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          workerChoiceStrategy: 'invalidStrategy'
        }
      )
  ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          workerChoiceStrategyOptions: { weights: {} }
        }
      )
  ).toThrow(
    new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  )
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
        }
      )
  ).toThrow(
    new Error(
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    )
  )
  // Tasks queue options: shape.
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: 'invalidTasksQueueOptions'
        }
      )
  ).toThrow(
    new TypeError('Invalid tasks queue options: must be a plain object')
  )
  // Tasks queue options: concurrency.
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { concurrency: 0 }
        }
      )
  ).toThrow(
    new RangeError(
      'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
    )
  )
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { concurrency: -1 }
        }
      )
  ).toThrow(
    new RangeError(
      'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
    )
  )
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { concurrency: 0.2 }
        }
      )
  ).toThrow(
    new TypeError('Invalid worker node tasks concurrency: must be an integer')
  )
  // Tasks queue options: size.
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { size: 0 }
        }
      )
  ).toThrow(
    new RangeError(
      'Invalid worker node tasks queue size: 0 is a negative integer or zero'
    )
  )
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { size: -1 }
        }
      )
  ).toThrow(
    new RangeError(
      'Invalid worker node tasks queue size: -1 is a negative integer or zero'
    )
  )
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          enableTasksQueue: true,
          tasksQueueOptions: { size: 0.2 }
        }
      )
  ).toThrow(
    new TypeError('Invalid worker node tasks queue size: must be an integer')
  )
})
438
// Toggles the runTime/elu median options through
// setWorkerChoiceStrategyOptions() and checks, after each change, pool.opts,
// every strategy's opts and the task statistics requirements. Ends with the
// invalid-argument error paths.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Initial state: no strategy options were passed to the constructor.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  })
  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true }
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: true },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: false,
      median: true
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: true
    }
  })
  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false }
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  })
  // Invalid arguments.
  expect(() =>
    pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
  ).toThrow(
    new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  )
  expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
    new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
  ).toThrow(
    new Error(
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    )
  )
  await pool.destroy()
})
576
// Enables, reconfigures and disables the tasks queue at runtime and checks
// pool.opts after each enableTasksQueue() call.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
607
// Changes the tasks queue options through setTasksQueueOptions() and checks
// pool.opts plus each worker node's back pressure size after every change.
// Ends with the invalid-argument error paths.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  // Every option given explicitly.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  // size and tasksFinishedTimeout omitted from the call.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  // Invalid arguments.
  expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
    new TypeError('Invalid tasks queue options: must be a plain object')
  )
  expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
    new RangeError(
      'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
    )
  )
  expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
    new RangeError(
      'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
    )
  )
  expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
    new TypeError('Invalid worker node tasks concurrency: must be an integer')
  )
  expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
    new RangeError(
      'Invalid worker node tasks queue size: 0 is a negative integer or zero'
    )
  )
  expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
    new RangeError(
      'Invalid worker node tasks queue size: -1 is a negative integer or zero'
    )
  )
  expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
    new TypeError('Invalid worker node tasks queue size: must be an integer')
  )
  await pool.destroy()
})
693
// Checks the pool.info snapshot of a fixed thread pool and of a dynamic
// cluster pool right after construction.
it('Verify that pool info is set', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: Math.floor(numberOfWorkers / 2),
    maxSize: numberOfWorkers,
    workerNodes: Math.floor(numberOfWorkers / 2),
    idleWorkerNodes: Math.floor(numberOfWorkers / 2),
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
739
// Every worker node of a freshly built fixed cluster pool must report zeroed
// task counters and empty CircularArray histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    })
  }
  await pool.destroy()
})
775
// Every worker node must own a Deque tasks queue whose size and maxSize are 0,
// for a fixed cluster pool as well as for a dynamic thread pool.
it('Verify that pool worker tasks queue are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
})
801
// Checks the info object of each worker node for a fixed cluster pool and for
// a dynamic thread pool right after construction.
it('Verify that pool worker info are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true,
      stealing: false
    })
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true,
      stealing: false
    })
  }
  await pool.destroy()
})
835
// Starting an already started pool must throw and destroying an already
// destroyed pool must reject.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )
  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
853
// With startWorkers: false the pool has no worker nodes and rejects tasks
// until start() is called explicitly.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    {
      startWorkers: false
    }
  )
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  // Wait for one ready event before checking the readyEventEmitted flag.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
880
// Invalid name / transferList arguments and an unknown task function name must
// reject, as must any execute() call issued after the pool is destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  await expect(pool.execute(undefined, 0)).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  await expect(pool.execute(undefined, '')).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )
  // This rejection value is compared with toBe() against a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
903
// Submits numberOfWorkers * maxMultiplier tasks and checks each worker node's
// usage twice: before the tasks settle (executing === maxMultiplier) and
// after they settle (executed === maxMultiplier).
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const promises = new Set()
  const maxMultiplier = 2
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // Tasks submitted but not awaited yet.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: maxMultiplier,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: expect.any(CircularArray)
      },
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
  }
  await Promise.all(promises)
  // All tasks settled.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: maxMultiplier,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: expect.any(CircularArray)
      },
      waitTime: {
        history: expect.any(CircularArray)
      },
      elu: {
        idle: {
          history: expect.any(CircularArray)
        },
        active: {
          history: expect.any(CircularArray)
        }
      }
    })
  }
  await pool.destroy()
})
971
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Expected worker usage, parameterized by the executed tasks counter.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of a worker usage is empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // Before the strategy change: tasks were executed on every worker node.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed tasks counter is back to zero.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
1053
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every pool information payload delivered with the event.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1088
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEvents = 0
  let lastBusyInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEvents += 1
    lastBusyInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // `busy` fires once the tasks submitted at once reach the number of fixed pool workers,
  // hence numberOfWorkers + 1 emissions when numberOfWorkers * 2 tasks are submitted in a row.
  expect(busyEvents).toBe(numberOfWorkers + 1)
  expect(lastBusyInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1128
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEvents = 0
  let lastFullInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEvents += 1
    lastFullInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the maximum number of workers.
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullEvents).toBe(1)
  expect(lastFullInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1167
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure on every check.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEvents = 0
  let lastBackPressureInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureEvents += 1
    lastBackPressureInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // One task more than the number of workers.
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureEvents).toBe(1)
  expect(lastBackPressureInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    stealingWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
70a4f5ea 1215
85b2561d
JB
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' events emitted by the worker nodes.
  let finishedCount = 0
  const onTaskFinished = () => {
    finishedCount += 1
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than there are workers so that some of them get queued.
  let remaining = totalTasks
  while (remaining > 0) {
    pool.execute()
    remaining -= 1
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  expect(finishedCount).toBeLessThanOrEqual(totalTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1244
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' events emitted by the worker nodes.
  let finishedCount = 0
  const onTaskFinished = () => {
    finishedCount += 1
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than there are workers so that some of them get queued.
  let remaining = numberOfWorkers * maxMultiplier
  while (remaining > 0) {
    pool.execute()
    remaining -= 1
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  // No 'taskFinished' event is expected to have been counted.
  expect(finishedCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1272
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last `poolifier:task` resource seen by the `init` hook.
  let taskAsyncId
  // Number of times each hook callback matched the tracked task resource.
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Count only promises resolved while running inside the task resource scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always disable the hook, even when the task execution rejects, so that
    // its callbacks do not stay enabled after this test.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1309
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function name -> expected hasTaskFunction() result, in checking order.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Pools are built lazily so that the second one is only created once the
  // first one has been destroyed.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, expected] of expectations) {
      expect(pool.hasTaskFunction(name)).toBe(expected)
    }
    await pool.destroy()
  }
})
1339
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs with the TypeError message each must raise.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The newly added task function must be executable through the pool.
  const payload = { test: 'test' }
  expect(await pool.execute(payload, 'echo')).toStrictEqual(payload)
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await pool.destroy()
})
1411
it('Verify that removeTaskFunction() is working', async () => {
  // Task function names listed by the pool before 'echo' is added.
  const baseNames = [DEFAULT_TASK_NAME, 'test']
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.listTaskFunctionNames()).toStrictEqual(baseNames)
  // 'test' is listed by the pool, but removing it is rejected.
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([...baseNames, 'echo'])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(baseNames)
  await pool.destroy()
})
1450
it('Verify that listTaskFunctionNames() is working', async () => {
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks its task function names, then destroys it.
  const verifyTaskFunctionNames = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await verifyTaskFunctionNames(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyTaskFunctionNames(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
    )
  )
})
1478
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const workerId = pool.workerNodes[0].info.id
  // Invalid names paired with the error message expected from the first worker.
  const rejectedNames = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedNames) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      new Error(message)
    )
  }
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Each promoted task function is expected right after the default task name.
  const promotions = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, expectedNames] of promotions) {
    await expect(pool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1532
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  const input = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    // Checked before any getTaskFunctionWorkerUsage() call, as in the original ordering.
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      const taskFunctionUsage =
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      expect(taskFunctionUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the second listed
    // task function name.
    const secondListedName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(workerNode.getTaskFunctionWorkerUsage(secondListedName))
  }
  await pool.destroy()
})
52a23942
JB
1597
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  const killPromise = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killPromise).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1610
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  // 'add' operation message; the task function travels as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(firstWorkerNodeKey, addOperation)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1630
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // 'add' operation message; the task function travels as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function name.
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
3ec964d6 1653})