refactor: cleanup eslint configuration
[poolifier.git] / tests / pools / abstract-pool.test.mjs
Commit Line Data
ded253e2 1import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
2import { EventEmitterAsyncResource } from 'node:events'
3import { readFileSync } from 'node:fs'
ded253e2 4import { dirname, join } from 'node:path'
2eb14889 5import { fileURLToPath } from 'node:url'
ded253e2 6
a074ffee
JB
7import { expect } from 'expect'
8import { restore, stub } from 'sinon'
ded253e2
JB
9
10import { CircularArray } from '../../lib/circular-array.cjs'
11import { Deque } from '../../lib/deque.cjs'
a074ffee 12import {
70a4f5ea 13 DynamicClusterPool,
9e619829 14 DynamicThreadPool,
aee46736 15 FixedClusterPool,
e843b904 16 FixedThreadPool,
aee46736 17 PoolEvents,
184855e6 18 PoolTypes,
3d6dd312 19 WorkerChoiceStrategies,
184855e6 20 WorkerTypes
d35e5717 21} from '../../lib/index.cjs'
ded253e2 22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
d35e5717
JB
23import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24import { waitPoolEvents } from '../test-utils.cjs'
e1ffb94f
JB
25
26describe('Abstract pool test suite', () => {
2eb14889
JB
// Path of the package manifest, resolved two directories above this test file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
// Version declared in package.json; compared against `pool.info.version` below.
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Number of workers used by the pools created in this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * Used by the suite to exercise pool construction when the pool does not
 * consider itself to be running from the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
3ec964d6 39
// Undo every sinon fake after each test so stubs never leak between tests.
afterEach(function restoreSinonFakes () {
  restore()
})
43
bcf85f7f
JB
// Smoke test: a fixed thread pool can be instantiated and then torn down.
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
52
// A pool whose isMain() answers false must refuse to be constructed.
it('Verify that pool cannot be created from a non main thread/process', () => {
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createStubPool).toThrow(expectedError)
})
c510fea7 69
bc61cfe6
JB
// Checks the lifecycle flags exposed by a pool right after construction.
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.started).toBe(true)
  expect(pool.starting).toBe(false)
  expect(pool.destroying).toBe(false)
  await pool.destroy()
})
80
// The worker file path must be given, be a string, and point to a file.
it('Verify that filePath is checked', () => {
  // Deliberately called with a single argument: no file path at all.
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  expect(withoutFilePath).toThrow(
    new TypeError('The worker file path must be specified')
  )

  const withNonStringFilePath = () => new FixedThreadPool(numberOfWorkers, 0)
  expect(withNonStringFilePath).toThrow(
    new TypeError('The worker file path must be a string')
  )

  const withMissingFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(withMissingFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
92
// Omitting the number of workers must be rejected at construction time.
it('Verify that numberOfWorkers is checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  const createPool = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  expect(createPool).toThrow(expectedError)
})
106
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  expect(createPool).toThrow(expectedError)
})
117
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  expect(createPool).toThrow(expectedError)
})
7c0ba920 128
26ce26ca
JB
// A fixed pool must reject a fourth (maximum size) constructor argument.
it('Verify that pool arguments number and pool type are checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  const createPool = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  expect(createPool).toThrow(expectedError)
})
144
// Each invalid (min, max) size combination must be rejected with its own error.
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'
  // One row per case: pool class, min size, max size, worker file, expected error.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
219
// Checks the resolved options, first for a pool created without options,
// then for a pool created with explicit ones.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Pool created without options.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.emitter.eventNames()).toStrictEqual([])
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  for (const [, strategy] of defaultPool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      // Only the first and last worker node weights are asserted.
      weights: expect.objectContaining({
        0: expect.any(Number),
        [defaultPool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  await defaultPool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  for (const [, strategy] of customPool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    })
  }
  await customPool.destroy()
})
301
// Each invalid option set must make the pool constructor throw its own error.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // One row per case: the options passed to the constructor, the expected error.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
441
// Toggles the runTime/elu median options at runtime and checks that strategy
// options and task statistics requirements follow; then checks invalid options.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the options held by every registered worker choice strategy.
  const expectStrategiesOptions = useMedian => {
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({
        runTime: { median: useMedian },
        waitTime: { median: false },
        elu: { median: useMedian },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number)
        })
      })
    }
  }
  // Asserts the statistics requirements: average and median are mutually exclusive.
  const expectStatisticsRequirements = useMedian => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !useMedian, median: useMedian },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !useMedian, median: useMedian }
    })
  }

  // Initial state: no explicit strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesOptions(false)
  expectStatisticsRequirements(false)

  // Switch to median based statistics.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategiesOptions(true)
  expectStatisticsRequirements(true)

  // Switch back to average based statistics.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategiesOptions(false)
  expectStatisticsRequirements(false)

  // Invalid options must be rejected.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
579
// Enabling the tasks queue populates its options; disabling it clears them.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Options expected when the queue is enabled without explicit options.
  const defaultQueueOptions = {
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }

  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions)

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    ...defaultQueueOptions,
    concurrency: 2
  })

  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
610
// Tasks queue options can be changed at runtime, worker nodes follow the
// configured queue size, and invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Options expected whenever a setting is not explicitly given.
  const defaultQueueOptions = {
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
  // Every worker node back pressure size must equal the pool queue size.
  const expectBackPressureSizeSynced = () => {
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions)
  expectBackPressureSizeSynced()

  // Every option set explicitly.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectBackPressureSizeSynced()

  // Partial options: the omitted ones are expected to read as the defaults.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions)
  expectBackPressureSizeSynced()

  // Invalid options must be rejected.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(expectedError)
  }
  await pool.destroy()
})
696
6b27d407
JB
// Checks `pool.info` for an idle fixed thread pool and an idle dynamic cluster pool.
it('Verify that pool info is set', async () => {
  // Fields expected to be the same for both freshly created pools.
  const idlePoolInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: 0,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await dynamicPool.destroy()
})
744
// Every worker node of a new pool starts with zeroed counters and empty histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Builds a measurement entry holding a fresh, empty history.
  const emptyMeasurement = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyMeasurement(),
      waitTime: emptyMeasurement(),
      elu: {
        idle: emptyMeasurement(),
        active: emptyMeasurement()
      }
    })
  }
  await pool.destroy()
})
780
2431bdb4
JB
// Every worker node of a new pool owns an empty tasks queue (a Deque).
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that each worker node of the given pool has an empty Deque.
  const expectEmptyTasksQueues = poolUnderTest => {
    for (const node of poolUnderTest.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
806
// Every worker node of a new pool reports the expected initial info.
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node for the given expected worker type.
  const expectInitialWorkerInfo = (poolUnderTest, workerType) => {
    for (const node of poolUnderTest.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true,
        stealing: false
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectInitialWorkerInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectInitialWorkerInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
840
711623b8
JB
// Starting a started pool and destroying a destroyed pool must both fail.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)

  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  const startAgain = () => pool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))

  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
858
47352846
JB
// A pool created with `startWorkers: false` stays idle until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const poolOptions = { startWorkers: false }
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    poolOptions
  )

  // Before start(): no worker nodes and task execution is refused.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  // After start(): worker nodes exist and the ready event gets emitted.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
885
9d2d0da1
JB
// execute() must reject invalid arguments, unknown task functions, and any
// call made once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // One row per case: the execute() arguments, the expected rejection error.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
908
// Submits `maxMultiplier` tasks per worker and checks the per worker node task
// counters while the tasks are in flight and after they have all completed.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const maxMultiplier = 2
  // Expected usage for the given executed/executing counters; histories are
  // only checked for their type.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  const pendingTasks = new Set()
  const tasksCount = numberOfWorkers * maxMultiplier
  for (let submitted = 0; submitted < tasksCount; submitted++) {
    pendingTasks.add(pool.execute())
  }
  // Tasks submitted but not awaited yet: all of them count as executing.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }

  await Promise.all(pendingTasks)
  // Tasks completed: all of them count as executed.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
976
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  // Expected shape of a worker node usage; only the executed tasks matcher varies.
  const buildExpectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that none of the usage histories of the worker node holds a measurement.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Submit all the tasks synchronously, then wait for every one of them.
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      buildExpectedUsage(expect.any(Number))
    )
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
      submittedTasks
    )
    expectEmptyHistories(workerNode)
  }
  // Changing the strategy is expected to zero the executed tasks counter.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(buildExpectedUsage(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1058
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every emitted info is recorded: the length gives the number of events,
  // the last entry gives the most recent payload.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1094
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every emitted info is recorded: the length gives the number of events,
  // the last entry gives the most recent payload.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total when submitting numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1135
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every emitted info is recorded: the length gives the number of events,
  // the last entry gives the most recent payload.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1175
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const anyNumber = expect.any(Number)
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure whatever its real load is.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every emitted info is recorded: the length gives the number of events,
  // the last entry gives the most recent payload.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureInfos.length).toBe(1)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    stealingWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
70a4f5ea 1224
85b2561d
JB
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' notifications emitted by all the worker nodes.
  let tasksFinished = 0
  const onTaskFinished = () => {
    ++tasksFinished
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', onTaskFinished)
  })
  // The returned promises are not awaited: the test only observes how
  // destroy() behaves while tasks are still queued.
  for (let submitted = 0; submitted < submittedTasks; ++submitted) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  expect(tasksFinished).toBeLessThanOrEqual(submittedTasks)
  expect(elapsedTime).toBeGreaterThanOrEqual(2000)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1253
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' notifications emitted by all the worker nodes.
  let tasksFinished = 0
  const onTaskFinished = () => {
    ++tasksFinished
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', onTaskFinished)
  })
  // The returned promises are not awaited: the test only observes how
  // destroy() behaves while tasks are still queued.
  for (let submitted = 0; submitted < submittedTasks; ++submitted) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  // No task is expected to have finished before destroy() returns.
  expect(tasksFinished).toBe(0)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1281
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the 'poolifier:task' resource seen by the init hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Only the resource created for the pool task is tracked.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Counts promise resolutions happening while the current execution
      // async id is the tracked task resource.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    hook.enable()
    try {
      await pool.execute()
    } finally {
      // The hook must be disabled even when the task execution rejects,
      // otherwise it stays enabled for the rest of the process.
      hook.disable()
    }
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Release the pool even when the execution or an assertion above fails.
    await pool.destroy()
  }
})
1318
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every expectation, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, expected] of expectations) {
      expect(pool.hasTaskFunction(name)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
    )
  )
})
1348
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid (name, fn) pairs with the TypeError message each one must reject with.
  const invalidArguments = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidArguments) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected additions must leave the task function names untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echo = data => {
    return data
  }
  await expect(pool.addTaskFunction('echo', echo)).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const payload = { test: 'test' }
  expect(await pool.execute(payload, 'echo')).toStrictEqual(payload)
  // Per task function usage expected on every worker node once 'echo' ran.
  const expectedEchoUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedEchoUsage
    )
  }
  await pool.destroy()
})
1420
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed by the pool before 'echo' is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' was not added through the pool, so removing it must be refused.
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echo = data => {
    return data
  }
  await pool.addTaskFunction('echo', echo)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1459
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from the multiple task functions worker files.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listed names, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
    )
  )
})
1487
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const workerId = pool.workerNodes[0].info.id
  // Builds the error expected when the 'default' operation fails on the first worker.
  const operationError = reason =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${reason}'`
    )
  // Invalid names with the worker-side failure each one must reject with.
  const rejections = [
    [0, 'TypeError: name parameter is not a string'],
    [
      DEFAULT_TASK_NAME,
      'Error: Cannot set the default task function reserved name as the default task function'
    ],
    [
      'unknown',
      'Error: Cannot set the default task function to a non-existing task function'
    ]
  ]
  for (const [name, reason] of rejections) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      operationError(reason)
    )
  }
  // The rejected operations must leave the names order untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Each accepted default name is expected right after DEFAULT_TASK_NAME.
  await expect(pool.setDefaultTaskFunction('factorial')).resolves.toBe(true)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(pool.setDefaultTaskFunction('fibonacci')).resolves.toBe(true)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await pool.destroy()
})
1541
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, strictly in sequence.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(data, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  // Per task function usage expected on every worker node.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedTaskFunctionUsage
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage reported for the default name must equal the usage of the
    // task function listed right after it.
    const firstNamedTaskFunction = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await pool.destroy()
})
52a23942
JB
1606
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1619
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  // Operation adding an empty task function, passed as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1639
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Operation adding an empty task function, passed as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1662})