build(ci): reduce coverage requirements for node 18.x
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
// Version read from the package.json located two directories above this test
// file; the pool info assertions compare it with the version the pool reports.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const version = JSON.parse(readFileSync(packageJsonPath, 'utf8')).version
// Number of workers requested for the pools created in this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 * Used to check that constructing a pool is rejected in that case.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
// Call sinon's `restore()` after every test.
afterEach(() => {
  restore()
})
41
bcf85f7f
JB
// A fixed thread pool can be constructed and then destroyed.
it('Verify that pool can be created and destroyed', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
50
// Constructing the stub pool, whose isMain() returns false, must throw.
it('Verify that pool cannot be created from a non main thread/process', () => {
  expect(
    () =>
      new StubPoolWithIsMain(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          errorHandler: e => console.error(e)
        }
      )
  ).toThrow(
    new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  )
})
c510fea7 67
bc61cfe6
JB
// Right after construction the pool reports started, not starting and not
// destroying.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.started).toBe(true)
  expect(pool.starting).toBe(false)
  expect(pool.destroying).toBe(false)
  await pool.destroy()
})
78
// A missing worker file path and a non existing worker file are both rejected.
it('Verify that filePath is checked', () => {
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
})
87
// An undefined number of workers is rejected.
it('Verify that numberOfWorkers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(
        undefined,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
101
// A negative number of workers is rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  expect(
    () =>
      new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  ).toThrow(
    new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  )
})
112
// A fractional number of workers is rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
})
7c0ba920 123
// Invalid minimum/maximum size combinations are rejected by the dynamic pool
// constructors.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: [pool class, min size, max size, worker file, expected error].
  // Entries are asserted in order, so the first mismatch fails the test.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
198
// Checks the options exposed by a pool built without options, then by a pool
// built with explicit options. In the second case the fields that were not
// supplied are expected to hold the same values as in the first case.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts the options held by the strategy context and by every worker
  // choice strategy registered in it.
  const expectStrategyOptions = (checkedPool, expected) => {
    expect(checkedPool.workerChoiceStrategyContext.opts).toStrictEqual(expected)
    for (const [, workerChoiceStrategy] of checkedPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expected)
    }
  }

  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(pool, defaultStrategyOptions)
  await pool.destroy()

  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  // With events disabled no emitter is exposed.
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(pool, customStrategyOptions)
  await pool.destroy()
})
298
// Each invalid option object must make the FixedThreadPool constructor throw
// the listed error.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: [pool options, expected error]. Entries are asserted in order.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
470
// Toggles the runTime/elu median options through
// setWorkerChoiceStrategyOptions() and checks, after each change, the options
// and the task statistics requirements; then checks that invalid options are
// rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the whole observable strategy state for a given runTime/elu
  // `median` flag; the expected `average` flag is its negation.
  const expectStrategyState = median => {
    const expectedOptions = {
      retries: 6,
      runTime: { median },
      waitTime: { median: false },
      elu: { median }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !median,
        median
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: true,
        average: !median,
        median
      }
    })
  }

  expectStrategyState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  // Each entry: [invalid options, expected error]. Entries are asserted in
  // order.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
640
// enableTasksQueue() switches the queue on (filling in tasks queue options)
// and off (leaving the tasks queue options undefined).
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
669
// setTasksQueueOptions() updates the pool options and each worker node's back
// pressure size, and rejects invalid values.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool tasks queue options, then that every worker node's back
  // pressure size equals the configured queue size.
  const expectTasksQueueOptions = expected => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expected)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectTasksQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  // No size is given here; the expected size is numberOfWorkers squared again.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectTasksQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  // Each entry: [invalid options, expected error]. Entries are asserted in
  // order.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
751
6b27d407
JB
// Checks the `info` snapshot of a fixed thread pool and of a dynamic cluster
// pool right after construction.
it('Verify that pool info is set', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The dynamic pool is expected to report its minimum size as node count.
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: Math.floor(numberOfWorkers / 2),
    maxSize: numberOfWorkers,
    workerNodes: Math.floor(numberOfWorkers / 2),
    idleWorkerNodes: Math.floor(numberOfWorkers / 2),
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
797
// Every worker node of a new pool has zeroed task counters and histories
// equal to a new CircularArray.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    })
  }
  await pool.destroy()
})
833
2431bdb4
JB
// Every worker node of a new pool owns an empty Deque as tasks queue, for a
// fixed cluster pool and for a dynamic thread pool.
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that each worker node of the given pool has an empty tasks queue.
  const expectEmptyTasksQueues = checkedPool => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
859
// Every worker node of a new pool reports a numeric id, the pool's worker
// type, `dynamic: false` and `ready: true`.
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node of the given pool.
  const expectWorkerNodesInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
891
711623b8
JB
// Starting an already started pool throws; destroying an already destroyed
// pool rejects.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )
  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
909
47352846
JB
// A pool built with `startWorkers: false` has no worker nodes and rejects
// tasks until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  // Wait for one `ready` pool event before checking the emitted flag.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
936
9d2d0da1
JB
// execute() rejects on invalid `name` / `transferList` arguments, on an
// unknown task function name, and once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  await expect(pool.execute(undefined, 0)).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  await expect(pool.execute(undefined, '')).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )
  // This rejection value is asserted as a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
959
// Submits `maxMultiplier` tasks per worker and checks each worker node's
// counters while the tasks are in flight and after they have all settled.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the expected usage for the given executed/executing counters; the
  // histories only need to be CircularArray instances.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })

  const promises = new Set()
  const maxMultiplier = 2
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // Nothing has been awaited yet: every submitted task counts as executing.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(promises)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1027
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  // Runs tasks, checks the resulting usage, then switches the worker choice
  // strategy and checks that the task counters are back to zero.
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const tasksPerWorker = 2
  const submittedTasks = numberOfWorkers * tasksPerWorker
  // Expected usage shape; only the executed counter varies.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // All measurement histories are expected to be empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => threadPool.execute())
  )
  for (const node of threadPool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node)
  }
  threadPool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of threadPool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(node)
  }
  await threadPool.destroy()
})
1109
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  // Registers a 'ready' listener and checks how many times it fired and the
  // pool information payload it received.
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(clusterPool.emitter.eventNames()).toStrictEqual([])
  // One entry per emitted event; the last entry is the latest payload.
  const receivedInfos = []
  clusterPool.emitter.on(PoolEvents.ready, info => {
    receivedInfos.push(info)
  })
  await waitPoolEvents(clusterPool, PoolEvents.ready, 1)
  expect(clusterPool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(receivedInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await clusterPool.destroy()
})
1144
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  // Registers a 'busy' listener, floods the pool with tasks and checks the
  // number of emitted events and the last pool information payload.
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // One entry per emitted event; the last entry is the latest payload.
  const receivedInfos = []
  threadPool.emitter.on(PoolEvents.busy, info => {
    receivedInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => threadPool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(receivedInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await threadPool.destroy()
})
1184
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // Registers a 'full' listener on a dynamic pool, submits more tasks than
  // workers and checks the event count and the last payload.
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // One entry per emitted event; the last entry is the latest payload.
  const receivedInfos = []
  threadPool.emitter.on(PoolEvents.full, info => {
    receivedInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => threadPool.execute())
  )
  expect(receivedInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await threadPool.destroy()
})
1223
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // Forces hasBackPressure() to report back pressure through a stub, then
  // checks the 'backPressure' event count and the last payload.
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  stub(threadPool, 'hasBackPressure').returns(true)
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // One entry per emitted event; the last entry is the latest payload.
  const receivedInfos = []
  threadPool.emitter.on(PoolEvents.backPressure, info => {
    receivedInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.backPressure
  ])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => threadPool.execute())
  )
  expect(receivedInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(threadPool.hasBackPressure.called).toBe(true)
  await threadPool.destroy()
})
70a4f5ea 1270
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Counts the async hook callbacks observed for the 'poolifier:task' async
  // resource while a single task is executed.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the id of the task resource so the other callbacks can match it.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always disable the hook, even when execute() rejects, so that it does
    // not stay enabled while the following tests run.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1307
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Runs the same hasTaskFunction() expectations against a dynamic thread pool
  // and then a fixed cluster pool, one pool at a time.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  // Task function name -> expected hasTaskFunction() result.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }
})
1337
it('Verify that addTaskFunction() is working', async () => {
  // Checks addTaskFunction() argument validation, then adds an 'echo' task
  // function, executes it and inspects its per worker node usage.
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  // Each case pairs an addTaskFunction() argument list with the expected error.
  const invalidArgumentCases = [
    {
      args: [0, () => {}],
      error: new TypeError('name argument must be a string')
    },
    {
      args: ['', () => {}],
      error: new TypeError('name argument must not be an empty string')
    },
    {
      args: ['test', 0],
      error: new TypeError('fn argument must be a function')
    },
    {
      args: ['test', ''],
      error: new TypeError('fn argument must be a function')
    }
  ]
  for (const { args, error } of invalidArgumentCases) {
    await expect(threadPool.addTaskFunction(...args)).rejects.toThrow(error)
  }
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  const echo = data => {
    return data
  }
  await expect(threadPool.addTaskFunction('echo', echo)).resolves.toBe(true)
  expect(threadPool.taskFunctions.size).toBe(1)
  expect(threadPool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(threadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  const payload = { test: 'test' }
  expect(await threadPool.execute(payload, 'echo')).toStrictEqual(payload)
  for (const node of threadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
  }
  await threadPool.destroy()
})
1409
it('Verify that removeTaskFunction() is working', async () => {
  // Checks that a task function not added through the pool cannot be removed,
  // then adds and removes an 'echo' task function.
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await expect(threadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echo = data => {
    return data
  }
  await threadPool.addTaskFunction('echo', echo)
  expect(threadPool.taskFunctions.size).toBe(1)
  expect(threadPool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(threadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(threadPool.removeTaskFunction('echo')).resolves.toBe(true)
  // After removal the pool side registry and the names list are back to the initial state.
  expect(threadPool.taskFunctions.size).toBe(0)
  expect(threadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await threadPool.destroy()
})
1448
it('Verify that listTaskFunctionNames() is working', async () => {
  // Checks the listed task function names on a dynamic thread pool and then on
  // a fixed cluster pool, one pool at a time.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
})
1476
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Checks the rejections for invalid names, then checks how the listed task
  // function names are reordered after each successful default change.
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  const workerId = threadPool.workerNodes[0].info.id
  // Rejected name -> expected error message.
  const rejectedNames = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedNames) {
    await expect(threadPool.setDefaultTaskFunction(name)).rejects.toThrow(
      new Error(message)
    )
  }
  expect(threadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // New default name -> expected names list once the change has resolved.
  const acceptedNames = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, expectedNames] of acceptedNames) {
    await expect(threadPool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(threadPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  }
  await threadPool.destroy()
})
1530
it('Verify that multiple task functions worker is working', async () => {
  // Executes every task function exposed by the worker once, then checks the
  // pool counters and the per task function usage of each worker node.
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  expect(await clusterPool.execute(input)).toStrictEqual({ ok: 1 })
  expect(
    await clusterPool.execute(input, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await clusterPool.execute(input, 'factorial')).toBe(3628800)
  expect(await clusterPool.execute(input, 'fibonacci')).toBe(55)
  expect(clusterPool.info.executingTasks).toBe(0)
  expect(clusterPool.info.executedTasks).toBe(4)
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  for (const node of clusterPool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of clusterPool.listTaskFunctionNames()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedUsage)
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the second listed name.
    expect(
      node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctionNames[1])
    )
  }
  await clusterPool.destroy()
})
52a23942
JB
1595
it('Verify sendKillMessageToWorker()', async () => {
  // Sends the kill message to the first worker node and expects the returned
  // promise to resolve with undefined.
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const killed = clusterPool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killed).resolves.toBeUndefined()
  await clusterPool.destroy()
})
adee6053
JB
1608
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Sends an 'add' task function operation to the first worker node and checks
  // that the new name shows up in that worker node's task function names.
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    // The task function is sent as its source text.
    taskFunction: (() => {}).toString()
  }
  await expect(
    clusterPool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addOperation
    )
  ).resolves.toBe(true)
  const { taskFunctionNames } = clusterPool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await clusterPool.destroy()
})
1628
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Sends an 'add' task function operation to the pool's workers and checks
  // that every worker node lists the new task function name.
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    // The task function is sent as its source text.
    taskFunction: (() => {}).toString()
  }
  await expect(
    clusterPool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of clusterPool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await clusterPool.destroy()
})
3ec964d6 1651})