test: align randomInt interval
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
d35e5717
JB
17} from '../../lib/index.cjs'
18import { CircularArray } from '../../lib/circular-array.cjs'
19import { Deque } from '../../lib/deque.cjs'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21import { waitPoolEvents } from '../test-utils.cjs'
22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
d35e5717 108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
d35e5717 149 './tests/worker-files/cluster/testWorker.cjs'
a5ed75b7 150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
d35e5717 173 './tests/worker-files/cluster/testWorker.cjs'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
d35e5717 209 './tests/worker-files/cluster/testWorker.cjs'
213cbac6 210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
e44639e9 224 expect(pool.emitter.eventNames()).toStrictEqual([])
47352846
JB
225 expect(pool.opts).toStrictEqual({
226 startWorkers: true,
227 enableEvents: true,
228 restartWorkerOnError: true,
229 enableTasksQueue: false,
26ce26ca 230 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d 231 })
999ef664
JB
232 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
233 .workerChoiceStrategies) {
86cbb766 234 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 241 })
86cbb766 242 })
999ef664 243 }
fd7ebd49 244 await pool.destroy()
73bfd59d 245 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
246 pool = new FixedThreadPool(
247 numberOfWorkers,
b2fd3f4a 248 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 249 {
e4543b14 250 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 251 workerChoiceStrategyOptions: {
932fc8be 252 runTime: { median: true },
fc027381 253 weights: { 0: 300, 1: 200 }
49be33fe 254 },
35cf1c03 255 enableEvents: false,
1f68cede 256 restartWorkerOnError: false,
ff733df7 257 enableTasksQueue: true,
d4aeae5a 258 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
259 messageHandler: testHandler,
260 errorHandler: testHandler,
261 onlineHandler: testHandler,
262 exitHandler: testHandler
7c0ba920
JB
263 }
264 )
7c0ba920 265 expect(pool.emitter).toBeUndefined()
47352846
JB
266 expect(pool.opts).toStrictEqual({
267 startWorkers: true,
268 enableEvents: false,
269 restartWorkerOnError: false,
270 enableTasksQueue: true,
271 tasksQueueOptions: {
272 concurrency: 2,
2324f8c9 273 size: Math.pow(numberOfWorkers, 2),
dbd73092 274 taskStealing: true,
32b141fd 275 tasksStealingOnBackPressure: true,
568d0075 276 tasksFinishedTimeout: 2000
47352846
JB
277 },
278 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
279 workerChoiceStrategyOptions: {
47352846 280 runTime: { median: true },
47352846
JB
281 weights: { 0: 300, 1: 200 }
282 },
283 onlineHandler: testHandler,
284 messageHandler: testHandler,
285 errorHandler: testHandler,
286 exitHandler: testHandler
8990357d 287 })
999ef664
JB
288 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
289 .workerChoiceStrategies) {
290 expect(workerChoiceStrategy.opts).toStrictEqual({
999ef664
JB
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
295 })
296 }
fd7ebd49 297 await pool.destroy()
7c0ba920
JB
298 })
299
fe291b64 300 it('Verify that pool options are validated', () => {
d4aeae5a
JB
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
b2fd3f4a 305 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 306 {
f0d7f803 307 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
308 }
309 )
948faff7 310 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
311 expect(
312 () =>
313 new FixedThreadPool(
314 numberOfWorkers,
b2fd3f4a 315 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
316 {
317 workerChoiceStrategyOptions: { weights: {} }
318 }
319 )
948faff7 320 ).toThrow(
8735b4e5
JB
321 new Error(
322 'Invalid worker choice strategy options: must have a weight for each worker node'
323 )
49be33fe 324 )
f0d7f803
JB
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
b2fd3f4a 329 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
330 {
331 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
332 }
333 )
948faff7 334 ).toThrow(
8735b4e5
JB
335 new Error(
336 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
337 )
f0d7f803 338 )
5c4c2dee
JB
339 expect(
340 () =>
341 new FixedThreadPool(
342 numberOfWorkers,
b2fd3f4a 343 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
344 {
345 enableTasksQueue: true,
346 tasksQueueOptions: 'invalidTasksQueueOptions'
347 }
348 )
948faff7 349 ).toThrow(
5c4c2dee
JB
350 new TypeError('Invalid tasks queue options: must be a plain object')
351 )
f0d7f803
JB
352 expect(
353 () =>
354 new FixedThreadPool(
355 numberOfWorkers,
b2fd3f4a 356 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
357 {
358 enableTasksQueue: true,
359 tasksQueueOptions: { concurrency: 0 }
360 }
361 )
948faff7 362 ).toThrow(
e695d66f 363 new RangeError(
20c6f652 364 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
365 )
366 )
f0d7f803
JB
367 expect(
368 () =>
369 new FixedThreadPool(
370 numberOfWorkers,
b2fd3f4a 371 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
372 {
373 enableTasksQueue: true,
5c4c2dee 374 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
375 }
376 )
948faff7 377 ).toThrow(
5c4c2dee
JB
378 new RangeError(
379 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
380 )
8735b4e5 381 )
f0d7f803
JB
382 expect(
383 () =>
384 new FixedThreadPool(
385 numberOfWorkers,
b2fd3f4a 386 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
387 {
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0.2 }
390 }
391 )
948faff7 392 ).toThrow(
20c6f652 393 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 394 )
5c4c2dee
JB
395 expect(
396 () =>
397 new FixedThreadPool(
398 numberOfWorkers,
b2fd3f4a 399 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
400 {
401 enableTasksQueue: true,
402 tasksQueueOptions: { size: 0 }
403 }
404 )
948faff7 405 ).toThrow(
5c4c2dee
JB
406 new RangeError(
407 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
408 )
409 )
410 expect(
411 () =>
412 new FixedThreadPool(
413 numberOfWorkers,
b2fd3f4a 414 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
415 {
416 enableTasksQueue: true,
417 tasksQueueOptions: { size: -1 }
418 }
419 )
948faff7 420 ).toThrow(
5c4c2dee
JB
421 new RangeError(
422 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
423 )
424 )
425 expect(
426 () =>
427 new FixedThreadPool(
428 numberOfWorkers,
b2fd3f4a 429 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
430 {
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0.2 }
433 }
434 )
948faff7 435 ).toThrow(
5c4c2dee
JB
436 new TypeError('Invalid worker node tasks queue size: must be an integer')
437 )
d4aeae5a
JB
438 })
439
2431bdb4 440 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
441 const pool = new FixedThreadPool(
442 numberOfWorkers,
b2fd3f4a 443 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
444 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
445 )
26ce26ca 446 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
a20f0ba5
JB
447 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
448 .workerChoiceStrategies) {
86cbb766 449 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
450 runTime: { median: false },
451 waitTime: { median: false },
452 elu: { median: false },
453 weights: expect.objectContaining({
454 0: expect.any(Number),
455 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 456 })
86cbb766 457 })
a20f0ba5 458 }
87de9ff5
JB
459 expect(
460 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
461 ).toStrictEqual({
932fc8be
JB
462 runTime: {
463 aggregate: true,
464 average: true,
465 median: false
466 },
467 waitTime: {
468 aggregate: false,
469 average: false,
470 median: false
471 },
5df69fab 472 elu: {
9adcefab
JB
473 aggregate: true,
474 average: true,
5df69fab
JB
475 median: false
476 }
86bf340d 477 })
9adcefab
JB
478 pool.setWorkerChoiceStrategyOptions({
479 runTime: { median: true },
480 elu: { median: true }
481 })
a20f0ba5 482 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 483 runTime: { median: true },
8990357d
JB
484 elu: { median: true }
485 })
a20f0ba5
JB
486 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
487 .workerChoiceStrategies) {
86cbb766 488 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
489 runTime: { median: true },
490 waitTime: { median: false },
491 elu: { median: true },
492 weights: expect.objectContaining({
493 0: expect.any(Number),
494 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 495 })
86cbb766 496 })
a20f0ba5 497 }
87de9ff5
JB
498 expect(
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
500 ).toStrictEqual({
932fc8be
JB
501 runTime: {
502 aggregate: true,
503 average: false,
504 median: true
505 },
506 waitTime: {
507 aggregate: false,
508 average: false,
509 median: false
510 },
5df69fab 511 elu: {
9adcefab 512 aggregate: true,
5df69fab 513 average: false,
9adcefab 514 median: true
5df69fab 515 }
86bf340d 516 })
9adcefab
JB
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: false },
519 elu: { median: false }
520 })
a20f0ba5 521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 522 runTime: { median: false },
8990357d
JB
523 elu: { median: false }
524 })
a20f0ba5
JB
525 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
526 .workerChoiceStrategies) {
86cbb766 527 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
528 runTime: { median: false },
529 waitTime: { median: false },
530 elu: { median: false },
531 weights: expect.objectContaining({
532 0: expect.any(Number),
533 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 534 })
86cbb766 535 })
a20f0ba5 536 }
87de9ff5
JB
537 expect(
538 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
539 ).toStrictEqual({
932fc8be
JB
540 runTime: {
541 aggregate: true,
542 average: true,
543 median: false
544 },
545 waitTime: {
546 aggregate: false,
547 average: false,
548 median: false
549 },
5df69fab 550 elu: {
9adcefab
JB
551 aggregate: true,
552 average: true,
5df69fab
JB
553 median: false
554 }
86bf340d 555 })
1f95d544
JB
556 expect(() =>
557 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 558 ).toThrow(
8735b4e5
JB
559 new TypeError(
560 'Invalid worker choice strategy options: must be a plain object'
561 )
1f95d544 562 )
948faff7 563 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
564 new Error(
565 'Invalid worker choice strategy options: must have a weight for each worker node'
566 )
1f95d544
JB
567 )
568 expect(() =>
569 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 570 ).toThrow(
8735b4e5
JB
571 new Error(
572 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
573 )
1f95d544 574 )
a20f0ba5
JB
575 await pool.destroy()
576 })
577
2431bdb4 578 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
579 const pool = new FixedThreadPool(
580 numberOfWorkers,
b2fd3f4a 581 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
582 )
583 expect(pool.opts.enableTasksQueue).toBe(false)
584 expect(pool.opts.tasksQueueOptions).toBeUndefined()
585 pool.enableTasksQueue(true)
586 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
587 expect(pool.opts.tasksQueueOptions).toStrictEqual({
588 concurrency: 1,
2324f8c9 589 size: Math.pow(numberOfWorkers, 2),
dbd73092 590 taskStealing: true,
32b141fd 591 tasksStealingOnBackPressure: true,
568d0075 592 tasksFinishedTimeout: 2000
20c6f652 593 })
a20f0ba5
JB
594 pool.enableTasksQueue(true, { concurrency: 2 })
595 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
596 expect(pool.opts.tasksQueueOptions).toStrictEqual({
597 concurrency: 2,
2324f8c9 598 size: Math.pow(numberOfWorkers, 2),
dbd73092 599 taskStealing: true,
32b141fd 600 tasksStealingOnBackPressure: true,
568d0075 601 tasksFinishedTimeout: 2000
20c6f652 602 })
a20f0ba5
JB
603 pool.enableTasksQueue(false)
604 expect(pool.opts.enableTasksQueue).toBe(false)
605 expect(pool.opts.tasksQueueOptions).toBeUndefined()
606 await pool.destroy()
607 })
608
2431bdb4 609 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
610 const pool = new FixedThreadPool(
611 numberOfWorkers,
b2fd3f4a 612 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
613 { enableTasksQueue: true }
614 )
20c6f652
JB
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
616 concurrency: 1,
2324f8c9 617 size: Math.pow(numberOfWorkers, 2),
dbd73092 618 taskStealing: true,
32b141fd 619 tasksStealingOnBackPressure: true,
568d0075 620 tasksFinishedTimeout: 2000
20c6f652 621 })
d6ca1416 622 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
623 expect(workerNode.tasksQueueBackPressureSize).toBe(
624 pool.opts.tasksQueueOptions.size
625 )
d6ca1416
JB
626 }
627 pool.setTasksQueueOptions({
628 concurrency: 2,
2324f8c9 629 size: 2,
d6ca1416 630 taskStealing: false,
32b141fd 631 tasksStealingOnBackPressure: false,
568d0075 632 tasksFinishedTimeout: 3000
d6ca1416 633 })
20c6f652
JB
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
635 concurrency: 2,
2324f8c9 636 size: 2,
d6ca1416 637 taskStealing: false,
32b141fd 638 tasksStealingOnBackPressure: false,
568d0075 639 tasksFinishedTimeout: 3000
d6ca1416
JB
640 })
641 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
644 )
d6ca1416
JB
645 }
646 pool.setTasksQueueOptions({
647 concurrency: 1,
648 taskStealing: true,
649 tasksStealingOnBackPressure: true
650 })
651 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 concurrency: 1,
2324f8c9 653 size: Math.pow(numberOfWorkers, 2),
dbd73092 654 taskStealing: true,
32b141fd 655 tasksStealingOnBackPressure: true,
568d0075 656 tasksFinishedTimeout: 2000
20c6f652 657 })
d6ca1416 658 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
659 expect(workerNode.tasksQueueBackPressureSize).toBe(
660 pool.opts.tasksQueueOptions.size
661 )
d6ca1416 662 }
948faff7 663 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
664 new TypeError('Invalid tasks queue options: must be a plain object')
665 )
948faff7 666 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 667 new RangeError(
20c6f652 668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
669 )
670 )
948faff7 671 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 672 new RangeError(
20c6f652 673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 674 )
a20f0ba5 675 )
948faff7 676 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
678 )
948faff7 679 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 680 new RangeError(
68dbcdc0 681 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
682 )
683 )
948faff7 684 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 685 new RangeError(
68dbcdc0 686 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
687 )
688 )
948faff7 689 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 690 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 691 )
a20f0ba5
JB
692 await pool.destroy()
693 })
694
6b27d407
JB
695 it('Verify that pool info is set', async () => {
696 let pool = new FixedThreadPool(
697 numberOfWorkers,
b2fd3f4a 698 './tests/worker-files/thread/testWorker.mjs'
6b27d407 699 )
2dca6cab
JB
700 expect(pool.info).toStrictEqual({
701 version,
702 type: PoolTypes.fixed,
703 worker: WorkerTypes.thread,
47352846 704 started: true,
2dca6cab
JB
705 ready: true,
706 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
707 minSize: numberOfWorkers,
708 maxSize: numberOfWorkers,
709 workerNodes: numberOfWorkers,
710 idleWorkerNodes: numberOfWorkers,
711 busyWorkerNodes: 0,
712 executedTasks: 0,
713 executingTasks: 0,
2dca6cab
JB
714 failedTasks: 0
715 })
6b27d407
JB
716 await pool.destroy()
717 pool = new DynamicClusterPool(
2431bdb4 718 Math.floor(numberOfWorkers / 2),
6b27d407 719 numberOfWorkers,
d35e5717 720 './tests/worker-files/cluster/testWorker.cjs'
6b27d407 721 )
2dca6cab
JB
722 expect(pool.info).toStrictEqual({
723 version,
724 type: PoolTypes.dynamic,
725 worker: WorkerTypes.cluster,
47352846 726 started: true,
2dca6cab
JB
727 ready: true,
728 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
729 minSize: Math.floor(numberOfWorkers / 2),
730 maxSize: numberOfWorkers,
731 workerNodes: Math.floor(numberOfWorkers / 2),
732 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
733 busyWorkerNodes: 0,
734 executedTasks: 0,
735 executingTasks: 0,
2dca6cab
JB
736 failedTasks: 0
737 })
6b27d407
JB
738 await pool.destroy()
739 })
740
2431bdb4 741 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
742 const pool = new FixedClusterPool(
743 numberOfWorkers,
d35e5717 744 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 745 )
f06e48d8 746 for (const workerNode of pool.workerNodes) {
47352846 747 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 748 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
749 tasks: {
750 executed: 0,
751 executing: 0,
752 queued: 0,
df593701 753 maxQueued: 0,
463226a4 754 sequentiallyStolen: 0,
68cbdc84 755 stolen: 0,
a4e07f72
JB
756 failed: 0
757 },
758 runTime: {
4ba4c7f9 759 history: new CircularArray()
a4e07f72
JB
760 },
761 waitTime: {
4ba4c7f9 762 history: new CircularArray()
a4e07f72 763 },
5df69fab
JB
764 elu: {
765 idle: {
4ba4c7f9 766 history: new CircularArray()
5df69fab
JB
767 },
768 active: {
4ba4c7f9 769 history: new CircularArray()
f7510105 770 }
5df69fab 771 }
86bf340d 772 })
f06e48d8
JB
773 }
774 await pool.destroy()
775 })
776
2431bdb4
JB
777 it('Verify that pool worker tasks queue are initialized', async () => {
778 let pool = new FixedClusterPool(
f06e48d8 779 numberOfWorkers,
d35e5717 780 './tests/worker-files/cluster/testWorker.cjs'
f06e48d8
JB
781 )
782 for (const workerNode of pool.workerNodes) {
47352846 783 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 784 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 785 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 786 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 787 }
fd7ebd49 788 await pool.destroy()
2431bdb4
JB
789 pool = new DynamicThreadPool(
790 Math.floor(numberOfWorkers / 2),
791 numberOfWorkers,
b2fd3f4a 792 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
793 )
794 for (const workerNode of pool.workerNodes) {
47352846 795 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 796 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
797 expect(workerNode.tasksQueue.size).toBe(0)
798 expect(workerNode.tasksQueue.maxSize).toBe(0)
799 }
213cbac6 800 await pool.destroy()
2431bdb4
JB
801 })
802
803 it('Verify that pool worker info are initialized', async () => {
804 let pool = new FixedClusterPool(
805 numberOfWorkers,
d35e5717 806 './tests/worker-files/cluster/testWorker.cjs'
2431bdb4 807 )
2dca6cab 808 for (const workerNode of pool.workerNodes) {
47352846 809 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
810 expect(workerNode.info).toStrictEqual({
811 id: expect.any(Number),
812 type: WorkerTypes.cluster,
813 dynamic: false,
5eb72b9e
JB
814 ready: true,
815 stealing: false
2dca6cab
JB
816 })
817 }
2431bdb4
JB
818 await pool.destroy()
819 pool = new DynamicThreadPool(
820 Math.floor(numberOfWorkers / 2),
821 numberOfWorkers,
b2fd3f4a 822 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 823 )
2dca6cab 824 for (const workerNode of pool.workerNodes) {
47352846 825 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
826 expect(workerNode.info).toStrictEqual({
827 id: expect.any(Number),
828 type: WorkerTypes.thread,
829 dynamic: false,
5eb72b9e
JB
830 ready: true,
831 stealing: false
2dca6cab
JB
832 })
833 }
213cbac6 834 await pool.destroy()
bf9549ae
JB
835 })
836
711623b8
JB
837 it('Verify that pool statuses are checked at start or destroy', async () => {
838 const pool = new FixedThreadPool(
839 numberOfWorkers,
840 './tests/worker-files/thread/testWorker.mjs'
841 )
842 expect(pool.info.started).toBe(true)
843 expect(pool.info.ready).toBe(true)
844 expect(() => pool.start()).toThrow(
845 new Error('Cannot start an already started pool')
846 )
847 await pool.destroy()
848 expect(pool.info.started).toBe(false)
849 expect(pool.info.ready).toBe(false)
850 await expect(pool.destroy()).rejects.toThrow(
851 new Error('Cannot destroy an already destroyed pool')
852 )
853 })
854
47352846
JB
855 it('Verify that pool can be started after initialization', async () => {
856 const pool = new FixedClusterPool(
857 numberOfWorkers,
d35e5717 858 './tests/worker-files/cluster/testWorker.cjs',
47352846
JB
859 {
860 startWorkers: false
861 }
862 )
863 expect(pool.info.started).toBe(false)
864 expect(pool.info.ready).toBe(false)
865 expect(pool.workerNodes).toStrictEqual([])
8e8d9101 866 expect(pool.readyEventEmitted).toBe(false)
948faff7 867 await expect(pool.execute()).rejects.toThrow(
47352846
JB
868 new Error('Cannot execute a task on not started pool')
869 )
870 pool.start()
871 expect(pool.info.started).toBe(true)
872 expect(pool.info.ready).toBe(true)
55082af9
JB
873 await waitPoolEvents(pool, PoolEvents.ready, 1)
874 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
875 expect(pool.workerNodes.length).toBe(numberOfWorkers)
876 for (const workerNode of pool.workerNodes) {
877 expect(workerNode).toBeInstanceOf(WorkerNode)
878 }
879 await pool.destroy()
880 })
881
9d2d0da1
JB
882 it('Verify that pool execute() arguments are checked', async () => {
883 const pool = new FixedClusterPool(
884 numberOfWorkers,
d35e5717 885 './tests/worker-files/cluster/testWorker.cjs'
9d2d0da1 886 )
948faff7 887 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
888 new TypeError('name argument must be a string')
889 )
948faff7 890 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
891 new TypeError('name argument must not be an empty string')
892 )
948faff7 893 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
894 new TypeError('transferList argument must be an array')
895 )
896 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
897 "Task function 'unknown' not found"
898 )
899 await pool.destroy()
948faff7 900 await expect(pool.execute()).rejects.toThrow(
47352846 901 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
902 )
903 })
904
2431bdb4 905 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
906 const pool = new FixedClusterPool(
907 numberOfWorkers,
d35e5717 908 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 909 )
09c2d0d3 910 const promises = new Set()
fc027381
JB
911 const maxMultiplier = 2
912 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 913 promises.add(pool.execute())
bf9549ae 914 }
f06e48d8 915 for (const workerNode of pool.workerNodes) {
465b2940 916 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
917 tasks: {
918 executed: 0,
919 executing: maxMultiplier,
920 queued: 0,
df593701 921 maxQueued: 0,
463226a4 922 sequentiallyStolen: 0,
68cbdc84 923 stolen: 0,
a4e07f72
JB
924 failed: 0
925 },
926 runTime: {
a4e07f72
JB
927 history: expect.any(CircularArray)
928 },
929 waitTime: {
a4e07f72
JB
930 history: expect.any(CircularArray)
931 },
5df69fab
JB
932 elu: {
933 idle: {
5df69fab
JB
934 history: expect.any(CircularArray)
935 },
936 active: {
5df69fab 937 history: expect.any(CircularArray)
f7510105 938 }
5df69fab 939 }
86bf340d 940 })
bf9549ae
JB
941 }
942 await Promise.all(promises)
f06e48d8 943 for (const workerNode of pool.workerNodes) {
465b2940 944 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
945 tasks: {
946 executed: maxMultiplier,
947 executing: 0,
948 queued: 0,
df593701 949 maxQueued: 0,
463226a4 950 sequentiallyStolen: 0,
68cbdc84 951 stolen: 0,
a4e07f72
JB
952 failed: 0
953 },
954 runTime: {
a4e07f72
JB
955 history: expect.any(CircularArray)
956 },
957 waitTime: {
a4e07f72
JB
958 history: expect.any(CircularArray)
959 },
5df69fab
JB
960 elu: {
961 idle: {
5df69fab
JB
962 history: expect.any(CircularArray)
963 },
964 active: {
5df69fab 965 history: expect.any(CircularArray)
f7510105 966 }
5df69fab 967 }
86bf340d 968 })
bf9549ae 969 }
fd7ebd49 970 await pool.destroy()
bf9549ae
JB
971 })
972
2431bdb4 973 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 974 const pool = new DynamicThreadPool(
2431bdb4 975 Math.floor(numberOfWorkers / 2),
8f4878b7 976 numberOfWorkers,
b2fd3f4a 977 './tests/worker-files/thread/testWorker.mjs'
9e619829 978 )
09c2d0d3 979 const promises = new Set()
ee9f5295
JB
980 const maxMultiplier = 2
981 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 982 promises.add(pool.execute())
9e619829
JB
983 }
984 await Promise.all(promises)
f06e48d8 985 for (const workerNode of pool.workerNodes) {
465b2940 986 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
987 tasks: {
988 executed: expect.any(Number),
989 executing: 0,
990 queued: 0,
df593701 991 maxQueued: 0,
463226a4 992 sequentiallyStolen: 0,
68cbdc84 993 stolen: 0,
a4e07f72
JB
994 failed: 0
995 },
996 runTime: {
a4e07f72
JB
997 history: expect.any(CircularArray)
998 },
999 waitTime: {
a4e07f72
JB
1000 history: expect.any(CircularArray)
1001 },
5df69fab
JB
1002 elu: {
1003 idle: {
5df69fab
JB
1004 history: expect.any(CircularArray)
1005 },
1006 active: {
5df69fab 1007 history: expect.any(CircularArray)
f7510105 1008 }
5df69fab 1009 }
86bf340d 1010 })
465b2940 1011 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1012 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1013 numberOfWorkers * maxMultiplier
1014 )
b97d82d8
JB
1015 expect(workerNode.usage.runTime.history.length).toBe(0)
1016 expect(workerNode.usage.waitTime.history.length).toBe(0)
1017 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1018 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1019 }
1020 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1021 for (const workerNode of pool.workerNodes) {
465b2940 1022 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1023 tasks: {
1024 executed: 0,
1025 executing: 0,
1026 queued: 0,
df593701 1027 maxQueued: 0,
463226a4 1028 sequentiallyStolen: 0,
68cbdc84 1029 stolen: 0,
a4e07f72
JB
1030 failed: 0
1031 },
1032 runTime: {
a4e07f72
JB
1033 history: expect.any(CircularArray)
1034 },
1035 waitTime: {
a4e07f72
JB
1036 history: expect.any(CircularArray)
1037 },
5df69fab
JB
1038 elu: {
1039 idle: {
5df69fab
JB
1040 history: expect.any(CircularArray)
1041 },
1042 active: {
5df69fab 1043 history: expect.any(CircularArray)
f7510105 1044 }
5df69fab 1045 }
86bf340d 1046 })
465b2940
JB
1047 expect(workerNode.usage.runTime.history.length).toBe(0)
1048 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1049 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1050 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1051 }
fd7ebd49 1052 await pool.destroy()
ee11a4a2
JB
1053 })
1054
a1763c54
JB
1055 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1056 const pool = new DynamicClusterPool(
2431bdb4 1057 Math.floor(numberOfWorkers / 2),
164d950a 1058 numberOfWorkers,
d35e5717 1059 './tests/worker-files/cluster/testWorker.cjs'
164d950a 1060 )
c726f66c 1061 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1062 let poolInfo
a1763c54 1063 let poolReady = 0
041dc05b 1064 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1065 ++poolReady
d46660cd
JB
1066 poolInfo = info
1067 })
a1763c54 1068 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1069 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1070 expect(poolReady).toBe(1)
d46660cd 1071 expect(poolInfo).toStrictEqual({
23ccf9d7 1072 version,
d46660cd 1073 type: PoolTypes.dynamic,
a1763c54 1074 worker: WorkerTypes.cluster,
47352846 1075 started: true,
a1763c54 1076 ready: true,
2431bdb4
JB
1077 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1078 minSize: expect.any(Number),
1079 maxSize: expect.any(Number),
1080 workerNodes: expect.any(Number),
1081 idleWorkerNodes: expect.any(Number),
1082 busyWorkerNodes: expect.any(Number),
1083 executedTasks: expect.any(Number),
1084 executingTasks: expect.any(Number),
2431bdb4
JB
1085 failedTasks: expect.any(Number)
1086 })
1087 await pool.destroy()
1088 })
1089
a1763c54
JB
1090 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1091 const pool = new FixedThreadPool(
2431bdb4 1092 numberOfWorkers,
b2fd3f4a 1093 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1094 )
c726f66c 1095 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1096 const promises = new Set()
1097 let poolBusy = 0
2431bdb4 1098 let poolInfo
041dc05b 1099 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1100 ++poolBusy
2431bdb4
JB
1101 poolInfo = info
1102 })
c726f66c 1103 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1104 for (let i = 0; i < numberOfWorkers * 2; i++) {
1105 promises.add(pool.execute())
1106 }
1107 await Promise.all(promises)
1108 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1109 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1110 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1111 expect(poolInfo).toStrictEqual({
1112 version,
a1763c54
JB
1113 type: PoolTypes.fixed,
1114 worker: WorkerTypes.thread,
47352846
JB
1115 started: true,
1116 ready: true,
2431bdb4 1117 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1118 minSize: expect.any(Number),
1119 maxSize: expect.any(Number),
1120 workerNodes: expect.any(Number),
1121 idleWorkerNodes: expect.any(Number),
1122 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1123 executedTasks: expect.any(Number),
1124 executingTasks: expect.any(Number),
a4e07f72 1125 failedTasks: expect.any(Number)
d46660cd 1126 })
164d950a
JB
1127 await pool.destroy()
1128 })
1129
a1763c54
JB
1130 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1131 const pool = new DynamicThreadPool(
1132 Math.floor(numberOfWorkers / 2),
7c0ba920 1133 numberOfWorkers,
b2fd3f4a 1134 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1135 )
c726f66c 1136 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1137 const promises = new Set()
a1763c54 1138 let poolFull = 0
d46660cd 1139 let poolInfo
041dc05b 1140 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1141 ++poolFull
d46660cd
JB
1142 poolInfo = info
1143 })
c726f66c 1144 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1145 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1146 promises.add(pool.execute())
7c0ba920 1147 }
cf597bc5 1148 await Promise.all(promises)
33e6bb4c 1149 expect(poolFull).toBe(1)
d46660cd 1150 expect(poolInfo).toStrictEqual({
23ccf9d7 1151 version,
a1763c54 1152 type: PoolTypes.dynamic,
d46660cd 1153 worker: WorkerTypes.thread,
47352846
JB
1154 started: true,
1155 ready: true,
2431bdb4 1156 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1157 minSize: expect.any(Number),
1158 maxSize: expect.any(Number),
1159 workerNodes: expect.any(Number),
1160 idleWorkerNodes: expect.any(Number),
1161 busyWorkerNodes: expect.any(Number),
1162 executedTasks: expect.any(Number),
1163 executingTasks: expect.any(Number),
1164 failedTasks: expect.any(Number)
1165 })
1166 await pool.destroy()
1167 })
1168
3e8611a8 1169 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1170 const pool = new FixedThreadPool(
8735b4e5 1171 numberOfWorkers,
b2fd3f4a 1172 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1173 {
1174 enableTasksQueue: true
1175 }
1176 )
a074ffee 1177 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1178 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1179 const promises = new Set()
1180 let poolBackPressure = 0
1181 let poolInfo
041dc05b 1182 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1183 ++poolBackPressure
1184 poolInfo = info
1185 })
c726f66c 1186 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1187 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1188 promises.add(pool.execute())
1189 }
1190 await Promise.all(promises)
033f1776 1191 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1192 expect(poolInfo).toStrictEqual({
1193 version,
3e8611a8 1194 type: PoolTypes.fixed,
8735b4e5 1195 worker: WorkerTypes.thread,
47352846
JB
1196 started: true,
1197 ready: true,
8735b4e5 1198 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1199 minSize: expect.any(Number),
1200 maxSize: expect.any(Number),
1201 workerNodes: expect.any(Number),
1202 idleWorkerNodes: expect.any(Number),
5eb72b9e 1203 stealingWorkerNodes: expect.any(Number),
d46660cd 1204 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1205 executedTasks: expect.any(Number),
1206 executingTasks: expect.any(Number),
3e8611a8
JB
1207 maxQueuedTasks: expect.any(Number),
1208 queuedTasks: expect.any(Number),
1209 backPressure: true,
68cbdc84 1210 stolenTasks: expect.any(Number),
a4e07f72 1211 failedTasks: expect.any(Number)
d46660cd 1212 })
5eb72b9e 1213 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
fd7ebd49 1214 await pool.destroy()
7c0ba920 1215 })
70a4f5ea 1216
85b2561d
JB
1217 it('Verify that destroy() waits for queued tasks to finish', async () => {
1218 const tasksFinishedTimeout = 2500
1219 const pool = new FixedThreadPool(
1220 numberOfWorkers,
1221 './tests/worker-files/thread/asyncWorker.mjs',
1222 {
1223 enableTasksQueue: true,
1224 tasksQueueOptions: { tasksFinishedTimeout }
1225 }
1226 )
1227 const maxMultiplier = 4
1228 let tasksFinished = 0
1229 for (const workerNode of pool.workerNodes) {
1230 workerNode.on('taskFinished', () => {
1231 ++tasksFinished
1232 })
1233 }
1234 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1235 pool.execute()
1236 }
1237 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1238 const startTime = performance.now()
1239 await pool.destroy()
1240 const elapsedTime = performance.now() - startTime
90afa746 1241 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
85b2561d 1242 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
14d5e183 1243 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1244 })
1245
1246 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1247 const tasksFinishedTimeout = 1000
1248 const pool = new FixedThreadPool(
1249 numberOfWorkers,
1250 './tests/worker-files/thread/asyncWorker.mjs',
1251 {
1252 enableTasksQueue: true,
1253 tasksQueueOptions: { tasksFinishedTimeout }
1254 }
1255 )
1256 const maxMultiplier = 4
1257 let tasksFinished = 0
1258 for (const workerNode of pool.workerNodes) {
1259 workerNode.on('taskFinished', () => {
1260 ++tasksFinished
1261 })
1262 }
1263 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1264 pool.execute()
1265 }
1266 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1267 const startTime = performance.now()
1268 await pool.destroy()
1269 const elapsedTime = performance.now() - startTime
1270 expect(tasksFinished).toBe(0)
2885534c 1271 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1272 })
1273
f18fd12b
JB
1274 it('Verify that pool asynchronous resource track tasks execution', async () => {
1275 let taskAsyncId
1276 let initCalls = 0
1277 let beforeCalls = 0
1278 let afterCalls = 0
1279 let resolveCalls = 0
1280 const hook = createHook({
1281 init (asyncId, type) {
1282 if (type === 'poolifier:task') {
1283 initCalls++
1284 taskAsyncId = asyncId
1285 }
1286 },
1287 before (asyncId) {
1288 if (asyncId === taskAsyncId) beforeCalls++
1289 },
1290 after (asyncId) {
1291 if (asyncId === taskAsyncId) afterCalls++
1292 },
1293 promiseResolve () {
1294 if (executionAsyncId() === taskAsyncId) resolveCalls++
1295 }
1296 })
f18fd12b
JB
1297 const pool = new FixedThreadPool(
1298 numberOfWorkers,
1299 './tests/worker-files/thread/testWorker.mjs'
1300 )
8954c0a3 1301 hook.enable()
f18fd12b
JB
1302 await pool.execute()
1303 hook.disable()
1304 expect(initCalls).toBe(1)
1305 expect(beforeCalls).toBe(1)
1306 expect(afterCalls).toBe(1)
1307 expect(resolveCalls).toBe(1)
8954c0a3 1308 await pool.destroy()
f18fd12b
JB
1309 })
1310
9eae3c69
JB
1311 it('Verify that hasTaskFunction() is working', async () => {
1312 const dynamicThreadPool = new DynamicThreadPool(
1313 Math.floor(numberOfWorkers / 2),
1314 numberOfWorkers,
b2fd3f4a 1315 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1316 )
1317 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1318 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1320 true
1321 )
1322 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1324 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1325 await dynamicThreadPool.destroy()
1326 const fixedClusterPool = new FixedClusterPool(
1327 numberOfWorkers,
d35e5717 1328 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
9eae3c69
JB
1329 )
1330 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1331 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1333 true
1334 )
1335 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1338 await fixedClusterPool.destroy()
1339 })
1340
1341 it('Verify that addTaskFunction() is working', async () => {
1342 const dynamicThreadPool = new DynamicThreadPool(
1343 Math.floor(numberOfWorkers / 2),
1344 numberOfWorkers,
b2fd3f4a 1345 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1346 )
1347 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1348 await expect(
1349 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1350 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1351 await expect(
1352 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1353 ).rejects.toThrow(
3feeab69
JB
1354 new TypeError('name argument must not be an empty string')
1355 )
948faff7
JB
1356 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1358 )
1359 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1360 new TypeError('fn argument must be a function')
1361 )
9eae3c69
JB
1362 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1363 DEFAULT_TASK_NAME,
1364 'test'
1365 ])
1366 const echoTaskFunction = data => {
1367 return data
1368 }
1369 await expect(
1370 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1371 ).resolves.toBe(true)
1372 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1373 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1374 echoTaskFunction
1375 )
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 DEFAULT_TASK_NAME,
1378 'test',
1379 'echo'
1380 ])
1381 const taskFunctionData = { test: 'test' }
1382 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1383 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1384 for (const workerNode of dynamicThreadPool.workerNodes) {
1385 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1386 tasks: {
1387 executed: expect.any(Number),
1388 executing: 0,
1389 queued: 0,
463226a4 1390 sequentiallyStolen: 0,
5ad42e34 1391 stolen: 0,
adee6053
JB
1392 failed: 0
1393 },
1394 runTime: {
1395 history: new CircularArray()
1396 },
1397 waitTime: {
1398 history: new CircularArray()
1399 },
1400 elu: {
1401 idle: {
1402 history: new CircularArray()
1403 },
1404 active: {
1405 history: new CircularArray()
1406 }
1407 }
1408 })
1409 }
9eae3c69
JB
1410 await dynamicThreadPool.destroy()
1411 })
1412
1413 it('Verify that removeTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1416 numberOfWorkers,
b2fd3f4a 1417 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1418 )
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1421 DEFAULT_TASK_NAME,
1422 'test'
1423 ])
948faff7 1424 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1425 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1426 )
1427 const echoTaskFunction = data => {
1428 return data
1429 }
1430 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1432 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1433 echoTaskFunction
1434 )
1435 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1436 DEFAULT_TASK_NAME,
1437 'test',
1438 'echo'
1439 ])
1440 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1441 true
1442 )
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1445 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1446 DEFAULT_TASK_NAME,
1447 'test'
1448 ])
1449 await dynamicThreadPool.destroy()
1450 })
1451
30500265 1452 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1453 const dynamicThreadPool = new DynamicThreadPool(
1454 Math.floor(numberOfWorkers / 2),
1455 numberOfWorkers,
b2fd3f4a 1456 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1457 )
1458 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1460 DEFAULT_TASK_NAME,
90d7d101
JB
1461 'jsonIntegerSerialization',
1462 'factorial',
1463 'fibonacci'
1464 ])
9eae3c69 1465 await dynamicThreadPool.destroy()
90d7d101
JB
1466 const fixedClusterPool = new FixedClusterPool(
1467 numberOfWorkers,
d35e5717 1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
90d7d101
JB
1469 )
1470 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1471 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1472 DEFAULT_TASK_NAME,
90d7d101
JB
1473 'jsonIntegerSerialization',
1474 'factorial',
1475 'fibonacci'
1476 ])
0fe39c97 1477 await fixedClusterPool.destroy()
90d7d101
JB
1478 })
1479
9eae3c69 1480 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1481 const dynamicThreadPool = new DynamicThreadPool(
1482 Math.floor(numberOfWorkers / 2),
1483 numberOfWorkers,
b2fd3f4a 1484 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1485 )
1486 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1487 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1488 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1489 new Error(
711623b8 1490 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1491 )
1492 )
1493 await expect(
1494 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1495 ).rejects.toThrow(
b0b55f57 1496 new Error(
711623b8 1497 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1498 )
1499 )
1500 await expect(
1501 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1502 ).rejects.toThrow(
b0b55f57 1503 new Error(
711623b8 1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1505 )
1506 )
9eae3c69
JB
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 DEFAULT_TASK_NAME,
1509 'jsonIntegerSerialization',
1510 'factorial',
1511 'fibonacci'
1512 ])
1513 await expect(
1514 dynamicThreadPool.setDefaultTaskFunction('factorial')
1515 ).resolves.toBe(true)
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 DEFAULT_TASK_NAME,
1518 'factorial',
1519 'jsonIntegerSerialization',
1520 'fibonacci'
1521 ])
1522 await expect(
1523 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1526 DEFAULT_TASK_NAME,
1527 'fibonacci',
1528 'jsonIntegerSerialization',
1529 'factorial'
1530 ])
cda9ba34 1531 await dynamicThreadPool.destroy()
30500265
JB
1532 })
1533
90d7d101 1534 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1535 const pool = new DynamicClusterPool(
2431bdb4 1536 Math.floor(numberOfWorkers / 2),
70a4f5ea 1537 numberOfWorkers,
d35e5717 1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
70a4f5ea
JB
1539 )
1540 const data = { n: 10 }
82888165 1541 const result0 = await pool.execute(data)
30b963d4 1542 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1543 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1544 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1545 const result2 = await pool.execute(data, 'factorial')
1546 expect(result2).toBe(3628800)
1547 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1548 expect(result3).toBe(55)
5bb5be17
JB
1549 expect(pool.info.executingTasks).toBe(0)
1550 expect(pool.info.executedTasks).toBe(4)
b414b84c 1551 for (const workerNode of pool.workerNodes) {
66979634 1552 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1553 DEFAULT_TASK_NAME,
b414b84c
JB
1554 'jsonIntegerSerialization',
1555 'factorial',
1556 'fibonacci'
1557 ])
1558 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1559 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1560 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1561 tasks: {
1562 executed: expect.any(Number),
4ba4c7f9 1563 executing: 0,
5bb5be17 1564 failed: 0,
68cbdc84 1565 queued: 0,
463226a4 1566 sequentiallyStolen: 0,
68cbdc84 1567 stolen: 0
5bb5be17
JB
1568 },
1569 runTime: {
1570 history: expect.any(CircularArray)
1571 },
1572 waitTime: {
1573 history: expect.any(CircularArray)
1574 },
1575 elu: {
1576 idle: {
1577 history: expect.any(CircularArray)
1578 },
1579 active: {
1580 history: expect.any(CircularArray)
1581 }
1582 }
1583 })
1584 expect(
4ba4c7f9
JB
1585 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1586 ).toBeGreaterThan(0)
5bb5be17 1587 }
dfd7ec01
JB
1588 expect(
1589 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1590 ).toStrictEqual(
66979634
JB
1591 workerNode.getTaskFunctionWorkerUsage(
1592 workerNode.info.taskFunctionNames[1]
1593 )
dfd7ec01 1594 )
5bb5be17 1595 }
0fe39c97 1596 await pool.destroy()
70a4f5ea 1597 })
52a23942
JB
1598
1599 it('Verify sendKillMessageToWorker()', async () => {
1600 const pool = new DynamicClusterPool(
1601 Math.floor(numberOfWorkers / 2),
1602 numberOfWorkers,
d35e5717 1603 './tests/worker-files/cluster/testWorker.cjs'
52a23942
JB
1604 )
1605 const workerNodeKey = 0
1606 await expect(
adee6053 1607 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1608 ).resolves.toBeUndefined()
1609 await pool.destroy()
1610 })
adee6053
JB
1611
1612 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1613 const pool = new DynamicClusterPool(
1614 Math.floor(numberOfWorkers / 2),
1615 numberOfWorkers,
d35e5717 1616 './tests/worker-files/cluster/testWorker.cjs'
adee6053
JB
1617 )
1618 const workerNodeKey = 0
1619 await expect(
1620 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1621 taskFunctionOperation: 'add',
1622 taskFunctionName: 'empty',
1623 taskFunction: (() => {}).toString()
1624 })
1625 ).resolves.toBe(true)
1626 expect(
1627 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1628 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1629 await pool.destroy()
1630 })
1631
1632 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1633 const pool = new DynamicClusterPool(
1634 Math.floor(numberOfWorkers / 2),
1635 numberOfWorkers,
d35e5717 1636 './tests/worker-files/cluster/testWorker.cjs'
adee6053 1637 )
adee6053
JB
1638 await expect(
1639 pool.sendTaskFunctionOperationToWorkers({
1640 taskFunctionOperation: 'add',
1641 taskFunctionName: 'empty',
1642 taskFunction: (() => {}).toString()
1643 })
1644 ).resolves.toBe(true)
1645 for (const workerNode of pool.workerNodes) {
1646 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1647 DEFAULT_TASK_NAME,
1648 'test',
1649 'empty'
1650 ])
1651 }
1652 await pool.destroy()
1653 })
3ec964d6 1654})