docs: refine code comment
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
a5ed75b7 173 './tests/worker-files/cluster/testWorker.js'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
26ce26ca 229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d
JB
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
26ce26ca 232 retries: pool.info.maxSize,
932fc8be 233 runTime: { median: false },
5df69fab
JB
234 waitTime: { median: false },
235 elu: { median: false }
da309861 236 })
999ef664
JB
237 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
238 .workerChoiceStrategies) {
239 expect(workerChoiceStrategy.opts).toStrictEqual({
26ce26ca 240 retries: pool.info.maxSize,
999ef664
JB
241 runTime: { median: false },
242 waitTime: { median: false },
243 elu: { median: false }
244 })
245 }
fd7ebd49 246 await pool.destroy()
73bfd59d 247 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
248 pool = new FixedThreadPool(
249 numberOfWorkers,
b2fd3f4a 250 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 251 {
e4543b14 252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 253 workerChoiceStrategyOptions: {
932fc8be 254 runTime: { median: true },
fc027381 255 weights: { 0: 300, 1: 200 }
49be33fe 256 },
35cf1c03 257 enableEvents: false,
1f68cede 258 restartWorkerOnError: false,
ff733df7 259 enableTasksQueue: true,
d4aeae5a 260 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
7c0ba920
JB
265 }
266 )
7c0ba920 267 expect(pool.emitter).toBeUndefined()
47352846
JB
268 expect(pool.opts).toStrictEqual({
269 startWorkers: true,
270 enableEvents: false,
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
273 tasksQueueOptions: {
274 concurrency: 2,
2324f8c9 275 size: Math.pow(numberOfWorkers, 2),
dbd73092 276 taskStealing: true,
32b141fd 277 tasksStealingOnBackPressure: true,
568d0075 278 tasksFinishedTimeout: 2000
47352846
JB
279 },
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
47352846 282 runTime: { median: true },
47352846
JB
283 weights: { 0: 300, 1: 200 }
284 },
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
8990357d
JB
289 })
290 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
26ce26ca 291 retries: pool.info.maxSize,
8990357d
JB
292 runTime: { median: true },
293 waitTime: { median: false },
294 elu: { median: false },
fc027381 295 weights: { 0: 300, 1: 200 }
da309861 296 })
999ef664
JB
297 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
298 .workerChoiceStrategies) {
299 expect(workerChoiceStrategy.opts).toStrictEqual({
26ce26ca 300 retries: pool.info.maxSize,
999ef664
JB
301 runTime: { median: true },
302 waitTime: { median: false },
303 elu: { median: false },
304 weights: { 0: 300, 1: 200 }
305 })
306 }
fd7ebd49 307 await pool.destroy()
7c0ba920
JB
308 })
309
fe291b64 310 it('Verify that pool options are validated', () => {
d4aeae5a
JB
311 expect(
312 () =>
313 new FixedThreadPool(
314 numberOfWorkers,
b2fd3f4a 315 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 316 {
f0d7f803 317 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
318 }
319 )
948faff7 320 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
321 expect(
322 () =>
323 new FixedThreadPool(
324 numberOfWorkers,
b2fd3f4a 325 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
326 {
327 workerChoiceStrategyOptions: { weights: {} }
328 }
329 )
948faff7 330 ).toThrow(
8735b4e5
JB
331 new Error(
332 'Invalid worker choice strategy options: must have a weight for each worker node'
333 )
49be33fe 334 )
f0d7f803
JB
335 expect(
336 () =>
337 new FixedThreadPool(
338 numberOfWorkers,
b2fd3f4a 339 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
340 {
341 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
342 }
343 )
948faff7 344 ).toThrow(
8735b4e5
JB
345 new Error(
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
347 )
f0d7f803 348 )
5c4c2dee
JB
349 expect(
350 () =>
351 new FixedThreadPool(
352 numberOfWorkers,
b2fd3f4a 353 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
354 {
355 enableTasksQueue: true,
356 tasksQueueOptions: 'invalidTasksQueueOptions'
357 }
358 )
948faff7 359 ).toThrow(
5c4c2dee
JB
360 new TypeError('Invalid tasks queue options: must be a plain object')
361 )
f0d7f803
JB
362 expect(
363 () =>
364 new FixedThreadPool(
365 numberOfWorkers,
b2fd3f4a 366 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
367 {
368 enableTasksQueue: true,
369 tasksQueueOptions: { concurrency: 0 }
370 }
371 )
948faff7 372 ).toThrow(
e695d66f 373 new RangeError(
20c6f652 374 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
375 )
376 )
f0d7f803
JB
377 expect(
378 () =>
379 new FixedThreadPool(
380 numberOfWorkers,
b2fd3f4a 381 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
382 {
383 enableTasksQueue: true,
5c4c2dee 384 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
385 }
386 )
948faff7 387 ).toThrow(
5c4c2dee
JB
388 new RangeError(
389 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
390 )
8735b4e5 391 )
f0d7f803
JB
392 expect(
393 () =>
394 new FixedThreadPool(
395 numberOfWorkers,
b2fd3f4a 396 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
397 {
398 enableTasksQueue: true,
399 tasksQueueOptions: { concurrency: 0.2 }
400 }
401 )
948faff7 402 ).toThrow(
20c6f652 403 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 404 )
5c4c2dee
JB
405 expect(
406 () =>
407 new FixedThreadPool(
408 numberOfWorkers,
b2fd3f4a 409 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
410 {
411 enableTasksQueue: true,
412 tasksQueueOptions: { size: 0 }
413 }
414 )
948faff7 415 ).toThrow(
5c4c2dee
JB
416 new RangeError(
417 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
418 )
419 )
420 expect(
421 () =>
422 new FixedThreadPool(
423 numberOfWorkers,
b2fd3f4a 424 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
425 {
426 enableTasksQueue: true,
427 tasksQueueOptions: { size: -1 }
428 }
429 )
948faff7 430 ).toThrow(
5c4c2dee
JB
431 new RangeError(
432 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
433 )
434 )
435 expect(
436 () =>
437 new FixedThreadPool(
438 numberOfWorkers,
b2fd3f4a 439 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
440 {
441 enableTasksQueue: true,
442 tasksQueueOptions: { size: 0.2 }
443 }
444 )
948faff7 445 ).toThrow(
5c4c2dee
JB
446 new TypeError('Invalid worker node tasks queue size: must be an integer')
447 )
d4aeae5a
JB
448 })
449
2431bdb4 450 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
451 const pool = new FixedThreadPool(
452 numberOfWorkers,
b2fd3f4a 453 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
454 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
455 )
26ce26ca 456 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
8990357d 457 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
26ce26ca 458 retries: pool.info.maxSize,
932fc8be 459 runTime: { median: false },
5df69fab
JB
460 waitTime: { median: false },
461 elu: { median: false }
a20f0ba5
JB
462 })
463 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
464 .workerChoiceStrategies) {
86bf340d 465 expect(workerChoiceStrategy.opts).toStrictEqual({
26ce26ca 466 retries: pool.info.maxSize,
932fc8be 467 runTime: { median: false },
5df69fab
JB
468 waitTime: { median: false },
469 elu: { median: false }
86bf340d 470 })
a20f0ba5 471 }
87de9ff5
JB
472 expect(
473 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
474 ).toStrictEqual({
932fc8be
JB
475 runTime: {
476 aggregate: true,
477 average: true,
478 median: false
479 },
480 waitTime: {
481 aggregate: false,
482 average: false,
483 median: false
484 },
5df69fab 485 elu: {
9adcefab
JB
486 aggregate: true,
487 average: true,
5df69fab
JB
488 median: false
489 }
86bf340d 490 })
9adcefab
JB
491 pool.setWorkerChoiceStrategyOptions({
492 runTime: { median: true },
493 elu: { median: true }
494 })
a20f0ba5 495 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 496 runTime: { median: true },
8990357d
JB
497 elu: { median: true }
498 })
499 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
26ce26ca 500 retries: pool.info.maxSize,
8990357d
JB
501 runTime: { median: true },
502 waitTime: { median: false },
9adcefab 503 elu: { median: true }
a20f0ba5
JB
504 })
505 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
506 .workerChoiceStrategies) {
932fc8be 507 expect(workerChoiceStrategy.opts).toStrictEqual({
26ce26ca 508 retries: pool.info.maxSize,
9adcefab 509 runTime: { median: true },
8990357d 510 waitTime: { median: false },
9adcefab 511 elu: { median: true }
932fc8be 512 })
a20f0ba5 513 }
87de9ff5
JB
514 expect(
515 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
516 ).toStrictEqual({
932fc8be
JB
517 runTime: {
518 aggregate: true,
519 average: false,
520 median: true
521 },
522 waitTime: {
523 aggregate: false,
524 average: false,
525 median: false
526 },
5df69fab 527 elu: {
9adcefab 528 aggregate: true,
5df69fab 529 average: false,
9adcefab 530 median: true
5df69fab 531 }
86bf340d 532 })
9adcefab
JB
533 pool.setWorkerChoiceStrategyOptions({
534 runTime: { median: false },
535 elu: { median: false }
536 })
a20f0ba5 537 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 538 runTime: { median: false },
8990357d
JB
539 elu: { median: false }
540 })
541 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
26ce26ca 542 retries: pool.info.maxSize,
9adcefab 543 runTime: { median: false },
8990357d 544 waitTime: { median: false },
9adcefab 545 elu: { median: false }
a20f0ba5
JB
546 })
547 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
548 .workerChoiceStrategies) {
932fc8be 549 expect(workerChoiceStrategy.opts).toStrictEqual({
26ce26ca 550 retries: pool.info.maxSize,
9adcefab 551 runTime: { median: false },
8990357d 552 waitTime: { median: false },
9adcefab 553 elu: { median: false }
932fc8be 554 })
a20f0ba5 555 }
87de9ff5
JB
556 expect(
557 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
558 ).toStrictEqual({
932fc8be
JB
559 runTime: {
560 aggregate: true,
561 average: true,
562 median: false
563 },
564 waitTime: {
565 aggregate: false,
566 average: false,
567 median: false
568 },
5df69fab 569 elu: {
9adcefab
JB
570 aggregate: true,
571 average: true,
5df69fab
JB
572 median: false
573 }
86bf340d 574 })
1f95d544
JB
575 expect(() =>
576 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 577 ).toThrow(
8735b4e5
JB
578 new TypeError(
579 'Invalid worker choice strategy options: must be a plain object'
580 )
1f95d544 581 )
948faff7 582 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
583 new Error(
584 'Invalid worker choice strategy options: must have a weight for each worker node'
585 )
1f95d544
JB
586 )
587 expect(() =>
588 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 589 ).toThrow(
8735b4e5
JB
590 new Error(
591 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
592 )
1f95d544 593 )
a20f0ba5
JB
594 await pool.destroy()
595 })
596
2431bdb4 597 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
598 const pool = new FixedThreadPool(
599 numberOfWorkers,
b2fd3f4a 600 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
601 )
602 expect(pool.opts.enableTasksQueue).toBe(false)
603 expect(pool.opts.tasksQueueOptions).toBeUndefined()
604 pool.enableTasksQueue(true)
605 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
606 expect(pool.opts.tasksQueueOptions).toStrictEqual({
607 concurrency: 1,
2324f8c9 608 size: Math.pow(numberOfWorkers, 2),
dbd73092 609 taskStealing: true,
32b141fd 610 tasksStealingOnBackPressure: true,
568d0075 611 tasksFinishedTimeout: 2000
20c6f652 612 })
a20f0ba5
JB
613 pool.enableTasksQueue(true, { concurrency: 2 })
614 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
616 concurrency: 2,
2324f8c9 617 size: Math.pow(numberOfWorkers, 2),
dbd73092 618 taskStealing: true,
32b141fd 619 tasksStealingOnBackPressure: true,
568d0075 620 tasksFinishedTimeout: 2000
20c6f652 621 })
a20f0ba5
JB
622 pool.enableTasksQueue(false)
623 expect(pool.opts.enableTasksQueue).toBe(false)
624 expect(pool.opts.tasksQueueOptions).toBeUndefined()
625 await pool.destroy()
626 })
627
2431bdb4 628 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
629 const pool = new FixedThreadPool(
630 numberOfWorkers,
b2fd3f4a 631 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
632 { enableTasksQueue: true }
633 )
20c6f652
JB
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
635 concurrency: 1,
2324f8c9 636 size: Math.pow(numberOfWorkers, 2),
dbd73092 637 taskStealing: true,
32b141fd 638 tasksStealingOnBackPressure: true,
568d0075 639 tasksFinishedTimeout: 2000
20c6f652 640 })
d6ca1416 641 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
644 )
d6ca1416
JB
645 }
646 pool.setTasksQueueOptions({
647 concurrency: 2,
2324f8c9 648 size: 2,
d6ca1416 649 taskStealing: false,
32b141fd 650 tasksStealingOnBackPressure: false,
568d0075 651 tasksFinishedTimeout: 3000
d6ca1416 652 })
20c6f652
JB
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
654 concurrency: 2,
2324f8c9 655 size: 2,
d6ca1416 656 taskStealing: false,
32b141fd 657 tasksStealingOnBackPressure: false,
568d0075 658 tasksFinishedTimeout: 3000
d6ca1416
JB
659 })
660 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
661 expect(workerNode.tasksQueueBackPressureSize).toBe(
662 pool.opts.tasksQueueOptions.size
663 )
d6ca1416
JB
664 }
665 pool.setTasksQueueOptions({
666 concurrency: 1,
667 taskStealing: true,
668 tasksStealingOnBackPressure: true
669 })
670 expect(pool.opts.tasksQueueOptions).toStrictEqual({
671 concurrency: 1,
2324f8c9 672 size: Math.pow(numberOfWorkers, 2),
dbd73092 673 taskStealing: true,
32b141fd 674 tasksStealingOnBackPressure: true,
568d0075 675 tasksFinishedTimeout: 2000
20c6f652 676 })
d6ca1416 677 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
678 expect(workerNode.tasksQueueBackPressureSize).toBe(
679 pool.opts.tasksQueueOptions.size
680 )
d6ca1416 681 }
948faff7 682 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
683 new TypeError('Invalid tasks queue options: must be a plain object')
684 )
948faff7 685 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 686 new RangeError(
20c6f652 687 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
688 )
689 )
948faff7 690 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 691 new RangeError(
20c6f652 692 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 693 )
a20f0ba5 694 )
948faff7 695 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
696 new TypeError('Invalid worker node tasks concurrency: must be an integer')
697 )
948faff7 698 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 699 new RangeError(
68dbcdc0 700 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
701 )
702 )
948faff7 703 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 704 new RangeError(
68dbcdc0 705 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
706 )
707 )
948faff7 708 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 709 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 710 )
a20f0ba5
JB
711 await pool.destroy()
712 })
713
6b27d407
JB
714 it('Verify that pool info is set', async () => {
715 let pool = new FixedThreadPool(
716 numberOfWorkers,
b2fd3f4a 717 './tests/worker-files/thread/testWorker.mjs'
6b27d407 718 )
2dca6cab
JB
719 expect(pool.info).toStrictEqual({
720 version,
721 type: PoolTypes.fixed,
722 worker: WorkerTypes.thread,
47352846 723 started: true,
2dca6cab
JB
724 ready: true,
725 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
726 minSize: numberOfWorkers,
727 maxSize: numberOfWorkers,
728 workerNodes: numberOfWorkers,
729 idleWorkerNodes: numberOfWorkers,
730 busyWorkerNodes: 0,
731 executedTasks: 0,
732 executingTasks: 0,
2dca6cab
JB
733 failedTasks: 0
734 })
6b27d407
JB
735 await pool.destroy()
736 pool = new DynamicClusterPool(
2431bdb4 737 Math.floor(numberOfWorkers / 2),
6b27d407 738 numberOfWorkers,
ecdfbdc0 739 './tests/worker-files/cluster/testWorker.js'
6b27d407 740 )
2dca6cab
JB
741 expect(pool.info).toStrictEqual({
742 version,
743 type: PoolTypes.dynamic,
744 worker: WorkerTypes.cluster,
47352846 745 started: true,
2dca6cab
JB
746 ready: true,
747 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
748 minSize: Math.floor(numberOfWorkers / 2),
749 maxSize: numberOfWorkers,
750 workerNodes: Math.floor(numberOfWorkers / 2),
751 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
752 busyWorkerNodes: 0,
753 executedTasks: 0,
754 executingTasks: 0,
2dca6cab
JB
755 failedTasks: 0
756 })
6b27d407
JB
757 await pool.destroy()
758 })
759
2431bdb4 760 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
761 const pool = new FixedClusterPool(
762 numberOfWorkers,
763 './tests/worker-files/cluster/testWorker.js'
764 )
f06e48d8 765 for (const workerNode of pool.workerNodes) {
47352846 766 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 767 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
768 tasks: {
769 executed: 0,
770 executing: 0,
771 queued: 0,
df593701 772 maxQueued: 0,
463226a4 773 sequentiallyStolen: 0,
68cbdc84 774 stolen: 0,
a4e07f72
JB
775 failed: 0
776 },
777 runTime: {
4ba4c7f9 778 history: new CircularArray()
a4e07f72
JB
779 },
780 waitTime: {
4ba4c7f9 781 history: new CircularArray()
a4e07f72 782 },
5df69fab
JB
783 elu: {
784 idle: {
4ba4c7f9 785 history: new CircularArray()
5df69fab
JB
786 },
787 active: {
4ba4c7f9 788 history: new CircularArray()
f7510105 789 }
5df69fab 790 }
86bf340d 791 })
f06e48d8
JB
792 }
793 await pool.destroy()
794 })
795
2431bdb4
JB
796 it('Verify that pool worker tasks queue are initialized', async () => {
797 let pool = new FixedClusterPool(
f06e48d8
JB
798 numberOfWorkers,
799 './tests/worker-files/cluster/testWorker.js'
800 )
801 for (const workerNode of pool.workerNodes) {
47352846 802 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 803 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 804 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 805 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 806 }
fd7ebd49 807 await pool.destroy()
2431bdb4
JB
808 pool = new DynamicThreadPool(
809 Math.floor(numberOfWorkers / 2),
810 numberOfWorkers,
b2fd3f4a 811 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
812 )
813 for (const workerNode of pool.workerNodes) {
47352846 814 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 815 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
816 expect(workerNode.tasksQueue.size).toBe(0)
817 expect(workerNode.tasksQueue.maxSize).toBe(0)
818 }
213cbac6 819 await pool.destroy()
2431bdb4
JB
820 })
821
822 it('Verify that pool worker info are initialized', async () => {
823 let pool = new FixedClusterPool(
824 numberOfWorkers,
825 './tests/worker-files/cluster/testWorker.js'
826 )
2dca6cab 827 for (const workerNode of pool.workerNodes) {
47352846 828 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
829 expect(workerNode.info).toStrictEqual({
830 id: expect.any(Number),
831 type: WorkerTypes.cluster,
832 dynamic: false,
833 ready: true
834 })
835 }
2431bdb4
JB
836 await pool.destroy()
837 pool = new DynamicThreadPool(
838 Math.floor(numberOfWorkers / 2),
839 numberOfWorkers,
b2fd3f4a 840 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 841 )
2dca6cab 842 for (const workerNode of pool.workerNodes) {
47352846 843 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
844 expect(workerNode.info).toStrictEqual({
845 id: expect.any(Number),
846 type: WorkerTypes.thread,
847 dynamic: false,
7884d183 848 ready: true
2dca6cab
JB
849 })
850 }
213cbac6 851 await pool.destroy()
bf9549ae
JB
852 })
853
711623b8
JB
854 it('Verify that pool statuses are checked at start or destroy', async () => {
855 const pool = new FixedThreadPool(
856 numberOfWorkers,
857 './tests/worker-files/thread/testWorker.mjs'
858 )
859 expect(pool.info.started).toBe(true)
860 expect(pool.info.ready).toBe(true)
861 expect(() => pool.start()).toThrow(
862 new Error('Cannot start an already started pool')
863 )
864 await pool.destroy()
865 expect(pool.info.started).toBe(false)
866 expect(pool.info.ready).toBe(false)
867 await expect(pool.destroy()).rejects.toThrow(
868 new Error('Cannot destroy an already destroyed pool')
869 )
870 })
871
47352846
JB
872 it('Verify that pool can be started after initialization', async () => {
873 const pool = new FixedClusterPool(
874 numberOfWorkers,
875 './tests/worker-files/cluster/testWorker.js',
876 {
877 startWorkers: false
878 }
879 )
880 expect(pool.info.started).toBe(false)
881 expect(pool.info.ready).toBe(false)
55082af9 882 expect(pool.readyEventEmitted).toBe(false)
47352846 883 expect(pool.workerNodes).toStrictEqual([])
948faff7 884 await expect(pool.execute()).rejects.toThrow(
47352846
JB
885 new Error('Cannot execute a task on not started pool')
886 )
887 pool.start()
888 expect(pool.info.started).toBe(true)
889 expect(pool.info.ready).toBe(true)
55082af9
JB
890 await waitPoolEvents(pool, PoolEvents.ready, 1)
891 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
892 expect(pool.workerNodes.length).toBe(numberOfWorkers)
893 for (const workerNode of pool.workerNodes) {
894 expect(workerNode).toBeInstanceOf(WorkerNode)
895 }
896 await pool.destroy()
897 })
898
9d2d0da1
JB
899 it('Verify that pool execute() arguments are checked', async () => {
900 const pool = new FixedClusterPool(
901 numberOfWorkers,
902 './tests/worker-files/cluster/testWorker.js'
903 )
948faff7 904 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
905 new TypeError('name argument must be a string')
906 )
948faff7 907 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
908 new TypeError('name argument must not be an empty string')
909 )
948faff7 910 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
911 new TypeError('transferList argument must be an array')
912 )
913 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
914 "Task function 'unknown' not found"
915 )
916 await pool.destroy()
948faff7 917 await expect(pool.execute()).rejects.toThrow(
47352846 918 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
919 )
920 })
921
it('Verify that pool worker tasks usage are computed', async () => {
  // Every worker is expected to be handed this many tasks.
  const maxMultiplier = 2
  // Expected worker usage for the given executed/executing task counters.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // Right after submission, the tasks are accounted as executing.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(buildExpectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once all tasks are settled, they are accounted as executed.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(buildExpectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
989
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const maxMultiplier = 2
  const taskCount = numberOfWorkers * maxMultiplier
  // Expected worker usage for the given executed tasks matcher or value.
  const buildExpectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // This test expects every measurement history to be empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await Promise.all(Array.from({ length: taskCount }, () => pool.execute()))
  // Before the strategy change: each worker has executed at least one task.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(buildExpectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(taskCount)
    expectEmptyHistories(usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed tasks counter is back to zero.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(buildExpectedUsage(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
1071
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Each 'ready' payload is recorded; the most recent one is asserted below.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1106
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Each 'busy' payload is recorded; the most recent one is asserted below.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1146
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Each 'full' payload is recorded; the most recent one is asserted below.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1185
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Each 'backPressure' payload is recorded; the most recent one is asserted below.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureInfos.length).toBe(1)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1232
85b2561d
JB
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const taskCount = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let tasksFinished = 0
  const countFinishedTask = () => {
    ++tasksFinished
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', countFinishedTask)
  }
  // The tasks are submitted without being awaited: destroy() is what is timed below.
  for (let submitted = 0; submitted < taskCount; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(tasksFinished).toBe(taskCount)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1261
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const taskCount = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let tasksFinished = 0
  const countFinishedTask = () => {
    ++tasksFinished
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', countFinishedTask)
  }
  // The tasks are submitted without being awaited: destroy() is what is timed below.
  for (let submitted = 0; submitted < taskCount; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  // No 'taskFinished' event is expected to have been observed here.
  expect(tasksFinished).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1289
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the 'poolifier:task' resource seen by the hook, if any.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promises resolved while running in the task resource scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    hook.enable()
    try {
      await pool.execute()
    } finally {
      // The hook is process-wide: never leave it enabled for the next tests,
      // even when the task execution fails.
      hook.disable()
    }
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Always release the workers, including when an assertion above fails.
    await pool.destroy()
  }
})
1326
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every expectation, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, expected] of expectations) {
      expect(pool.hasTaskFunction(name)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1356
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) arguments paired with the TypeError message they must trigger.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([...initialNames, 'echo'])
  const taskFunctionData = { test: 'test' }
  expect(await pool.execute(taskFunctionData, 'echo')).toStrictEqual(
    taskFunctionData
  )
  // Usage expected for the 'echo' task function on every worker node.
  const expectedEchoUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedEchoUsage
    )
  }
  await pool.destroy()
})
1428
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed by the pool before adding and after removing 'echo'.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed but was not added through the pool: removal must be refused.
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([...initialNames, 'echo'])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1467
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from both multiple task functions workers.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listed names, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1495
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const { id: workerId } = pool.workerNodes[0].info
  // Error expected from the pool when the worker refuses the operation.
  const operationError = reason =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${reason}'`
    )
  // Refused names paired with the reason reported by the worker.
  const refusedNames = [
    [0, 'TypeError: name parameter is not a string'],
    [
      DEFAULT_TASK_NAME,
      'Error: Cannot set the default task function reserved name as the default task function'
    ],
    [
      'unknown',
      'Error: Cannot set the default task function to a non-existing task function'
    ]
  ]
  for (const [name, reason] of refusedNames) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      operationError(reason)
    )
  }
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Accepted names paired with the names listed once they became the default.
  const acceptedNames = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, listedNames] of acceptedNames) {
    await expect(pool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(pool.listTaskFunctionNames()).toStrictEqual(listedNames)
  }
  await pool.destroy()
})
1549
it('Verify that multiple task functions worker is working', async () => {
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Usage expected for every task function on every worker node.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedTaskFunctionUsage
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the second listed name.
    const secondListedName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(workerNode.getTaskFunctionWorkerUsage(secondListedName))
  }
  await pool.destroy()
})
52a23942
JB
1614
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const validWorkerNodeKey = 0
  // The pool maximum size is expected to be refused as a worker node key.
  const invalidWorkerNodeKey = numberOfWorkers
  await expect(
    pool.sendKillMessageToWorker(validWorkerNodeKey)
  ).resolves.toBeUndefined()
  await expect(
    pool.sendKillMessageToWorker(invalidWorkerNodeKey)
  ).rejects.toStrictEqual(
    new Error(`Invalid worker node key '${invalidWorkerNodeKey}'`)
  )
  await pool.destroy()
})
adee6053
JB
1632
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // Operation message adding an empty task function, sent as source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addEmptyTaskFunction)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1652
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation message adding an empty task function, sent as source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1675})