build(deps-dev): apply updates
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
5d5885ee 1// eslint-disable-next-line n/no-unsupported-features/node-builtins
ded253e2 2import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
3import { EventEmitterAsyncResource } from 'node:events'
4import { readFileSync } from 'node:fs'
ded253e2 5import { dirname, join } from 'node:path'
2eb14889 6import { fileURLToPath } from 'node:url'
ded253e2 7
a074ffee
JB
8import { expect } from 'expect'
9import { restore, stub } from 'sinon'
ded253e2
JB
10
11import { CircularArray } from '../../lib/circular-array.cjs'
12import { Deque } from '../../lib/deque.cjs'
a074ffee 13import {
70a4f5ea 14 DynamicClusterPool,
9e619829 15 DynamicThreadPool,
aee46736 16 FixedClusterPool,
e843b904 17 FixedThreadPool,
aee46736 18 PoolEvents,
184855e6 19 PoolTypes,
3d6dd312 20 WorkerChoiceStrategies,
184855e6 21 WorkerTypes
d35e5717 22} from '../../lib/index.cjs'
ded253e2 23import { WorkerNode } from '../../lib/pools/worker-node.cjs'
d35e5717
JB
24import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25import { waitPoolEvents } from '../test-utils.cjs'
e1ffb94f
JB
26
27describe('Abstract pool test suite', () => {
2eb14889
JB
28 const version = JSON.parse(
29 readFileSync(
a5e5599c 30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
31 'utf8'
32 )
33 ).version
fc027381 34 const numberOfWorkers = 2
a8884ffd 35 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
36 isMain () {
37 return false
38 }
3ec964d6 39 }
3ec964d6 40
dc021bcc 41 afterEach(() => {
a074ffee 42 restore()
dc021bcc
JB
43 })
44
bcf85f7f
JB
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
47 numberOfWorkers,
48 './tests/worker-files/thread/testWorker.mjs'
49 )
50 expect(pool).toBeInstanceOf(FixedThreadPool)
51 await pool.destroy()
52 })
53
54 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
55 expect(
56 () =>
a8884ffd 57 new StubPoolWithIsMain(
7c0ba920 58 numberOfWorkers,
b2fd3f4a 59 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 60 {
041dc05b 61 errorHandler: e => console.error(e)
8d3782fa
JB
62 }
63 )
948faff7 64 ).toThrow(
e695d66f
JB
65 new Error(
66 'Cannot start a pool from a worker with the same type as the pool'
67 )
04f45163 68 )
3ec964d6 69 })
c510fea7 70
bc61cfe6
JB
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
73 numberOfWorkers,
b2fd3f4a 74 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 75 )
bc61cfe6 76 expect(pool.started).toBe(true)
711623b8
JB
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
bc61cfe6 79 await pool.destroy()
bc61cfe6
JB
80 })
81
c510fea7 82 it('Verify that filePath is checked', () => {
948faff7 83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
84 new TypeError('The worker file path must be specified')
85 )
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
3d6dd312
JB
88 )
89 expect(
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
92 })
93
94 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
95 expect(
96 () =>
97 new FixedThreadPool(
98 undefined,
b2fd3f4a 99 './tests/worker-files/thread/testWorker.mjs'
8003c026 100 )
948faff7 101 ).toThrow(
e695d66f
JB
102 new Error(
103 'Cannot instantiate a pool without specifying the number of workers'
104 )
8d3782fa
JB
105 )
106 })
107
108 it('Verify that a negative number of workers is checked', () => {
109 expect(
110 () =>
d35e5717 111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
948faff7 112 ).toThrow(
473c717a
JB
113 new RangeError(
114 'Cannot instantiate a pool with a negative number of workers'
115 )
8d3782fa
JB
116 )
117 })
118
119 it('Verify that a non integer number of workers is checked', () => {
120 expect(
121 () =>
b2fd3f4a 122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 123 ).toThrow(
473c717a 124 new TypeError(
0d80593b 125 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
126 )
127 )
c510fea7 128 })
7c0ba920 129
26ce26ca
JB
130 it('Verify that pool arguments number and pool type are checked', () => {
131 expect(
132 () =>
133 new FixedThreadPool(
134 numberOfWorkers,
135 './tests/worker-files/thread/testWorker.mjs',
136 undefined,
137 numberOfWorkers * 2
138 )
139 ).toThrow(
140 new Error(
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
142 )
143 )
144 })
145
216541b6 146 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
147 expect(
148 () =>
149 new DynamicClusterPool(
150 1,
151 undefined,
d35e5717 152 './tests/worker-files/cluster/testWorker.cjs'
a5ed75b7 153 )
948faff7 154 ).toThrow(
a5ed75b7
JB
155 new TypeError(
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
157 )
158 )
2761efb4
JB
159 expect(
160 () =>
161 new DynamicThreadPool(
162 0.5,
163 1,
b2fd3f4a 164 './tests/worker-files/thread/testWorker.mjs'
2761efb4 165 )
948faff7 166 ).toThrow(
2761efb4
JB
167 new TypeError(
168 'Cannot instantiate a pool with a non safe integer number of workers'
169 )
170 )
171 expect(
172 () =>
173 new DynamicClusterPool(
174 0,
175 0.5,
d35e5717 176 './tests/worker-files/cluster/testWorker.cjs'
2761efb4 177 )
948faff7 178 ).toThrow(
2761efb4
JB
179 new TypeError(
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
181 )
182 )
2431bdb4
JB
183 expect(
184 () =>
b2fd3f4a
JB
185 new DynamicThreadPool(
186 2,
187 1,
188 './tests/worker-files/thread/testWorker.mjs'
189 )
948faff7 190 ).toThrow(
2431bdb4
JB
191 new RangeError(
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
193 )
194 )
195 expect(
196 () =>
b2fd3f4a
JB
197 new DynamicThreadPool(
198 0,
199 0,
200 './tests/worker-files/thread/testWorker.mjs'
201 )
948faff7 202 ).toThrow(
2431bdb4 203 new RangeError(
213cbac6 204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
205 )
206 )
21f710aa
JB
207 expect(
208 () =>
213cbac6
JB
209 new DynamicClusterPool(
210 1,
211 1,
d35e5717 212 './tests/worker-files/cluster/testWorker.cjs'
213cbac6 213 )
948faff7 214 ).toThrow(
21f710aa 215 new RangeError(
213cbac6 216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
217 )
218 )
2431bdb4
JB
219 })
220
fd7ebd49 221 it('Verify that pool options are checked', async () => {
7c0ba920
JB
222 let pool = new FixedThreadPool(
223 numberOfWorkers,
b2fd3f4a 224 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 225 )
b5604034 226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
e44639e9 227 expect(pool.emitter.eventNames()).toStrictEqual([])
47352846
JB
228 expect(pool.opts).toStrictEqual({
229 startWorkers: true,
230 enableEvents: true,
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
26ce26ca 233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d 234 })
999ef664
JB
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
236 .workerChoiceStrategies) {
86cbb766 237 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 244 })
86cbb766 245 })
999ef664 246 }
fd7ebd49 247 await pool.destroy()
73bfd59d 248 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
249 pool = new FixedThreadPool(
250 numberOfWorkers,
b2fd3f4a 251 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 252 {
e4543b14 253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 254 workerChoiceStrategyOptions: {
932fc8be 255 runTime: { median: true },
fc027381 256 weights: { 0: 300, 1: 200 }
49be33fe 257 },
35cf1c03 258 enableEvents: false,
1f68cede 259 restartWorkerOnError: false,
ff733df7 260 enableTasksQueue: true,
d4aeae5a 261 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
7c0ba920
JB
266 }
267 )
7c0ba920 268 expect(pool.emitter).toBeUndefined()
47352846
JB
269 expect(pool.opts).toStrictEqual({
270 startWorkers: true,
271 enableEvents: false,
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: {
275 concurrency: 2,
2324f8c9 276 size: Math.pow(numberOfWorkers, 2),
dbd73092 277 taskStealing: true,
32b141fd 278 tasksStealingOnBackPressure: true,
568d0075 279 tasksFinishedTimeout: 2000
47352846
JB
280 },
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
47352846 283 runTime: { median: true },
47352846
JB
284 weights: { 0: 300, 1: 200 }
285 },
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
8990357d 290 })
999ef664
JB
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
999ef664
JB
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
298 })
299 }
fd7ebd49 300 await pool.destroy()
7c0ba920
JB
301 })
302
fe291b64 303 it('Verify that pool options are validated', () => {
d4aeae5a
JB
304 expect(
305 () =>
306 new FixedThreadPool(
307 numberOfWorkers,
b2fd3f4a 308 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 309 {
f0d7f803 310 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
311 }
312 )
948faff7 313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
314 expect(
315 () =>
316 new FixedThreadPool(
317 numberOfWorkers,
b2fd3f4a 318 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
319 {
320 workerChoiceStrategyOptions: { weights: {} }
321 }
322 )
948faff7 323 ).toThrow(
8735b4e5
JB
324 new Error(
325 'Invalid worker choice strategy options: must have a weight for each worker node'
326 )
49be33fe 327 )
f0d7f803
JB
328 expect(
329 () =>
330 new FixedThreadPool(
331 numberOfWorkers,
b2fd3f4a 332 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
333 {
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
335 }
336 )
948faff7 337 ).toThrow(
8735b4e5
JB
338 new Error(
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
340 )
f0d7f803 341 )
5c4c2dee
JB
342 expect(
343 () =>
344 new FixedThreadPool(
345 numberOfWorkers,
b2fd3f4a 346 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
347 {
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
350 }
351 )
948faff7 352 ).toThrow(
5c4c2dee
JB
353 new TypeError('Invalid tasks queue options: must be a plain object')
354 )
f0d7f803
JB
355 expect(
356 () =>
357 new FixedThreadPool(
358 numberOfWorkers,
b2fd3f4a 359 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
360 {
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
363 }
364 )
948faff7 365 ).toThrow(
e695d66f 366 new RangeError(
20c6f652 367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
368 )
369 )
f0d7f803
JB
370 expect(
371 () =>
372 new FixedThreadPool(
373 numberOfWorkers,
b2fd3f4a 374 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
375 {
376 enableTasksQueue: true,
5c4c2dee 377 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
378 }
379 )
948faff7 380 ).toThrow(
5c4c2dee
JB
381 new RangeError(
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
383 )
8735b4e5 384 )
f0d7f803
JB
385 expect(
386 () =>
387 new FixedThreadPool(
388 numberOfWorkers,
b2fd3f4a 389 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
390 {
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
393 }
394 )
948faff7 395 ).toThrow(
20c6f652 396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 397 )
5c4c2dee
JB
398 expect(
399 () =>
400 new FixedThreadPool(
401 numberOfWorkers,
b2fd3f4a 402 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
403 {
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
406 }
407 )
948faff7 408 ).toThrow(
5c4c2dee
JB
409 new RangeError(
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
411 )
412 )
413 expect(
414 () =>
415 new FixedThreadPool(
416 numberOfWorkers,
b2fd3f4a 417 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
418 {
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
421 }
422 )
948faff7 423 ).toThrow(
5c4c2dee
JB
424 new RangeError(
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
426 )
427 )
428 expect(
429 () =>
430 new FixedThreadPool(
431 numberOfWorkers,
b2fd3f4a 432 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
433 {
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
436 }
437 )
948faff7 438 ).toThrow(
5c4c2dee
JB
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
440 )
d4aeae5a
JB
441 })
442
2431bdb4 443 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
444 const pool = new FixedThreadPool(
445 numberOfWorkers,
b2fd3f4a 446 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
448 )
26ce26ca 449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
a20f0ba5
JB
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
451 .workerChoiceStrategies) {
86cbb766 452 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 459 })
86cbb766 460 })
a20f0ba5 461 }
87de9ff5
JB
462 expect(
463 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
464 ).toStrictEqual({
932fc8be
JB
465 runTime: {
466 aggregate: true,
467 average: true,
468 median: false
469 },
470 waitTime: {
471 aggregate: false,
472 average: false,
473 median: false
474 },
5df69fab 475 elu: {
9adcefab
JB
476 aggregate: true,
477 average: true,
5df69fab
JB
478 median: false
479 }
86bf340d 480 })
9adcefab
JB
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
484 })
a20f0ba5 485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 486 runTime: { median: true },
8990357d
JB
487 elu: { median: true }
488 })
a20f0ba5
JB
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
490 .workerChoiceStrategies) {
86cbb766 491 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 498 })
86cbb766 499 })
a20f0ba5 500 }
87de9ff5
JB
501 expect(
502 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
503 ).toStrictEqual({
932fc8be
JB
504 runTime: {
505 aggregate: true,
506 average: false,
507 median: true
508 },
509 waitTime: {
510 aggregate: false,
511 average: false,
512 median: false
513 },
5df69fab 514 elu: {
9adcefab 515 aggregate: true,
5df69fab 516 average: false,
9adcefab 517 median: true
5df69fab 518 }
86bf340d 519 })
9adcefab
JB
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
523 })
a20f0ba5 524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 525 runTime: { median: false },
8990357d
JB
526 elu: { median: false }
527 })
a20f0ba5
JB
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
529 .workerChoiceStrategies) {
86cbb766 530 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 537 })
86cbb766 538 })
a20f0ba5 539 }
87de9ff5
JB
540 expect(
541 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
542 ).toStrictEqual({
932fc8be
JB
543 runTime: {
544 aggregate: true,
545 average: true,
546 median: false
547 },
548 waitTime: {
549 aggregate: false,
550 average: false,
551 median: false
552 },
5df69fab 553 elu: {
9adcefab
JB
554 aggregate: true,
555 average: true,
5df69fab
JB
556 median: false
557 }
86bf340d 558 })
1f95d544
JB
559 expect(() =>
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 561 ).toThrow(
8735b4e5
JB
562 new TypeError(
563 'Invalid worker choice strategy options: must be a plain object'
564 )
1f95d544 565 )
948faff7 566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
567 new Error(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
569 )
1f95d544
JB
570 )
571 expect(() =>
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 573 ).toThrow(
8735b4e5
JB
574 new Error(
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
576 )
1f95d544 577 )
a20f0ba5
JB
578 await pool.destroy()
579 })
580
2431bdb4 581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
582 const pool = new FixedThreadPool(
583 numberOfWorkers,
b2fd3f4a 584 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
585 )
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
591 concurrency: 1,
2324f8c9 592 size: Math.pow(numberOfWorkers, 2),
dbd73092 593 taskStealing: true,
32b141fd 594 tasksStealingOnBackPressure: true,
568d0075 595 tasksFinishedTimeout: 2000
20c6f652 596 })
a20f0ba5
JB
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
600 concurrency: 2,
2324f8c9 601 size: Math.pow(numberOfWorkers, 2),
dbd73092 602 taskStealing: true,
32b141fd 603 tasksStealingOnBackPressure: true,
568d0075 604 tasksFinishedTimeout: 2000
20c6f652 605 })
a20f0ba5
JB
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
609 await pool.destroy()
610 })
611
2431bdb4 612 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
613 const pool = new FixedThreadPool(
614 numberOfWorkers,
b2fd3f4a 615 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
616 { enableTasksQueue: true }
617 )
20c6f652
JB
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
619 concurrency: 1,
2324f8c9 620 size: Math.pow(numberOfWorkers, 2),
dbd73092 621 taskStealing: true,
32b141fd 622 tasksStealingOnBackPressure: true,
568d0075 623 tasksFinishedTimeout: 2000
20c6f652 624 })
d6ca1416 625 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
628 )
d6ca1416
JB
629 }
630 pool.setTasksQueueOptions({
631 concurrency: 2,
2324f8c9 632 size: 2,
d6ca1416 633 taskStealing: false,
32b141fd 634 tasksStealingOnBackPressure: false,
568d0075 635 tasksFinishedTimeout: 3000
d6ca1416 636 })
20c6f652
JB
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
638 concurrency: 2,
2324f8c9 639 size: 2,
d6ca1416 640 taskStealing: false,
32b141fd 641 tasksStealingOnBackPressure: false,
568d0075 642 tasksFinishedTimeout: 3000
d6ca1416
JB
643 })
644 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
647 )
d6ca1416
JB
648 }
649 pool.setTasksQueueOptions({
650 concurrency: 1,
651 taskStealing: true,
652 tasksStealingOnBackPressure: true
653 })
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
655 concurrency: 1,
2324f8c9 656 size: Math.pow(numberOfWorkers, 2),
dbd73092 657 taskStealing: true,
32b141fd 658 tasksStealingOnBackPressure: true,
568d0075 659 tasksFinishedTimeout: 2000
20c6f652 660 })
d6ca1416 661 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
664 )
d6ca1416 665 }
948faff7 666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
667 new TypeError('Invalid tasks queue options: must be a plain object')
668 )
948faff7 669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 670 new RangeError(
20c6f652 671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
672 )
673 )
948faff7 674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 675 new RangeError(
20c6f652 676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 677 )
a20f0ba5 678 )
948faff7 679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
681 )
948faff7 682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 683 new RangeError(
68dbcdc0 684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
685 )
686 )
948faff7 687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 688 new RangeError(
68dbcdc0 689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
690 )
691 )
948faff7 692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 693 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 694 )
a20f0ba5
JB
695 await pool.destroy()
696 })
697
6b27d407
JB
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
700 numberOfWorkers,
b2fd3f4a 701 './tests/worker-files/thread/testWorker.mjs'
6b27d407 702 )
2dca6cab
JB
703 expect(pool.info).toStrictEqual({
704 version,
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
47352846 707 started: true,
2dca6cab
JB
708 ready: true,
709 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 710 strategyRetries: 0,
2dca6cab
JB
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
715 busyWorkerNodes: 0,
716 executedTasks: 0,
717 executingTasks: 0,
2dca6cab
JB
718 failedTasks: 0
719 })
6b27d407
JB
720 await pool.destroy()
721 pool = new DynamicClusterPool(
2431bdb4 722 Math.floor(numberOfWorkers / 2),
6b27d407 723 numberOfWorkers,
d35e5717 724 './tests/worker-files/cluster/testWorker.cjs'
6b27d407 725 )
2dca6cab
JB
726 expect(pool.info).toStrictEqual({
727 version,
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
47352846 730 started: true,
2dca6cab
JB
731 ready: true,
732 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 733 strategyRetries: 0,
2dca6cab
JB
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
738 busyWorkerNodes: 0,
739 executedTasks: 0,
740 executingTasks: 0,
2dca6cab
JB
741 failedTasks: 0
742 })
6b27d407
JB
743 await pool.destroy()
744 })
745
2431bdb4 746 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
747 const pool = new FixedClusterPool(
748 numberOfWorkers,
d35e5717 749 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 750 )
f06e48d8 751 for (const workerNode of pool.workerNodes) {
47352846 752 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 753 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
754 tasks: {
755 executed: 0,
756 executing: 0,
757 queued: 0,
df593701 758 maxQueued: 0,
463226a4 759 sequentiallyStolen: 0,
68cbdc84 760 stolen: 0,
a4e07f72
JB
761 failed: 0
762 },
763 runTime: {
4ba4c7f9 764 history: new CircularArray()
a4e07f72
JB
765 },
766 waitTime: {
4ba4c7f9 767 history: new CircularArray()
a4e07f72 768 },
5df69fab
JB
769 elu: {
770 idle: {
4ba4c7f9 771 history: new CircularArray()
5df69fab
JB
772 },
773 active: {
4ba4c7f9 774 history: new CircularArray()
f7510105 775 }
5df69fab 776 }
86bf340d 777 })
f06e48d8
JB
778 }
779 await pool.destroy()
780 })
781
2431bdb4
JB
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
f06e48d8 784 numberOfWorkers,
d35e5717 785 './tests/worker-files/cluster/testWorker.cjs'
f06e48d8
JB
786 )
787 for (const workerNode of pool.workerNodes) {
47352846 788 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 789 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 790 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 791 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 792 }
fd7ebd49 793 await pool.destroy()
2431bdb4
JB
794 pool = new DynamicThreadPool(
795 Math.floor(numberOfWorkers / 2),
796 numberOfWorkers,
b2fd3f4a 797 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
798 )
799 for (const workerNode of pool.workerNodes) {
47352846 800 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 801 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
802 expect(workerNode.tasksQueue.size).toBe(0)
803 expect(workerNode.tasksQueue.maxSize).toBe(0)
804 }
213cbac6 805 await pool.destroy()
2431bdb4
JB
806 })
807
808 it('Verify that pool worker info are initialized', async () => {
809 let pool = new FixedClusterPool(
810 numberOfWorkers,
d35e5717 811 './tests/worker-files/cluster/testWorker.cjs'
2431bdb4 812 )
2dca6cab 813 for (const workerNode of pool.workerNodes) {
47352846 814 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
815 expect(workerNode.info).toStrictEqual({
816 id: expect.any(Number),
817 type: WorkerTypes.cluster,
818 dynamic: false,
5eb72b9e
JB
819 ready: true,
820 stealing: false
2dca6cab
JB
821 })
822 }
2431bdb4
JB
823 await pool.destroy()
824 pool = new DynamicThreadPool(
825 Math.floor(numberOfWorkers / 2),
826 numberOfWorkers,
b2fd3f4a 827 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 828 )
2dca6cab 829 for (const workerNode of pool.workerNodes) {
47352846 830 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
831 expect(workerNode.info).toStrictEqual({
832 id: expect.any(Number),
833 type: WorkerTypes.thread,
834 dynamic: false,
5eb72b9e
JB
835 ready: true,
836 stealing: false
2dca6cab
JB
837 })
838 }
213cbac6 839 await pool.destroy()
bf9549ae
JB
840 })
841
711623b8
JB
842 it('Verify that pool statuses are checked at start or destroy', async () => {
843 const pool = new FixedThreadPool(
844 numberOfWorkers,
845 './tests/worker-files/thread/testWorker.mjs'
846 )
847 expect(pool.info.started).toBe(true)
848 expect(pool.info.ready).toBe(true)
849 expect(() => pool.start()).toThrow(
850 new Error('Cannot start an already started pool')
851 )
852 await pool.destroy()
853 expect(pool.info.started).toBe(false)
854 expect(pool.info.ready).toBe(false)
855 await expect(pool.destroy()).rejects.toThrow(
856 new Error('Cannot destroy an already destroyed pool')
857 )
858 })
859
47352846
JB
860 it('Verify that pool can be started after initialization', async () => {
861 const pool = new FixedClusterPool(
862 numberOfWorkers,
d35e5717 863 './tests/worker-files/cluster/testWorker.cjs',
47352846
JB
864 {
865 startWorkers: false
866 }
867 )
868 expect(pool.info.started).toBe(false)
869 expect(pool.info.ready).toBe(false)
870 expect(pool.workerNodes).toStrictEqual([])
8e8d9101 871 expect(pool.readyEventEmitted).toBe(false)
948faff7 872 await expect(pool.execute()).rejects.toThrow(
47352846
JB
873 new Error('Cannot execute a task on not started pool')
874 )
875 pool.start()
876 expect(pool.info.started).toBe(true)
877 expect(pool.info.ready).toBe(true)
55082af9
JB
878 await waitPoolEvents(pool, PoolEvents.ready, 1)
879 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
880 expect(pool.workerNodes.length).toBe(numberOfWorkers)
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
883 }
884 await pool.destroy()
885 })
886
9d2d0da1
JB
887 it('Verify that pool execute() arguments are checked', async () => {
888 const pool = new FixedClusterPool(
889 numberOfWorkers,
d35e5717 890 './tests/worker-files/cluster/testWorker.cjs'
9d2d0da1 891 )
948faff7 892 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
893 new TypeError('name argument must be a string')
894 )
948faff7 895 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
896 new TypeError('name argument must not be an empty string')
897 )
948faff7 898 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
899 new TypeError('transferList argument must be an array')
900 )
901 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
902 "Task function 'unknown' not found"
903 )
904 await pool.destroy()
948faff7 905 await expect(pool.execute()).rejects.toThrow(
47352846 906 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
907 )
908 })
909
2431bdb4 910 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
911 const pool = new FixedClusterPool(
912 numberOfWorkers,
d35e5717 913 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 914 )
09c2d0d3 915 const promises = new Set()
fc027381
JB
916 const maxMultiplier = 2
917 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 918 promises.add(pool.execute())
bf9549ae 919 }
f06e48d8 920 for (const workerNode of pool.workerNodes) {
465b2940 921 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
922 tasks: {
923 executed: 0,
924 executing: maxMultiplier,
925 queued: 0,
df593701 926 maxQueued: 0,
463226a4 927 sequentiallyStolen: 0,
68cbdc84 928 stolen: 0,
a4e07f72
JB
929 failed: 0
930 },
931 runTime: {
a4e07f72
JB
932 history: expect.any(CircularArray)
933 },
934 waitTime: {
a4e07f72
JB
935 history: expect.any(CircularArray)
936 },
5df69fab
JB
937 elu: {
938 idle: {
5df69fab
JB
939 history: expect.any(CircularArray)
940 },
941 active: {
5df69fab 942 history: expect.any(CircularArray)
f7510105 943 }
5df69fab 944 }
86bf340d 945 })
bf9549ae
JB
946 }
947 await Promise.all(promises)
f06e48d8 948 for (const workerNode of pool.workerNodes) {
465b2940 949 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
950 tasks: {
951 executed: maxMultiplier,
952 executing: 0,
953 queued: 0,
df593701 954 maxQueued: 0,
463226a4 955 sequentiallyStolen: 0,
68cbdc84 956 stolen: 0,
a4e07f72
JB
957 failed: 0
958 },
959 runTime: {
a4e07f72
JB
960 history: expect.any(CircularArray)
961 },
962 waitTime: {
a4e07f72
JB
963 history: expect.any(CircularArray)
964 },
5df69fab
JB
965 elu: {
966 idle: {
5df69fab
JB
967 history: expect.any(CircularArray)
968 },
969 active: {
5df69fab 970 history: expect.any(CircularArray)
f7510105 971 }
5df69fab 972 }
86bf340d 973 })
bf9549ae 974 }
fd7ebd49 975 await pool.destroy()
bf9549ae
JB
976 })
977
2431bdb4 978 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 979 const pool = new DynamicThreadPool(
2431bdb4 980 Math.floor(numberOfWorkers / 2),
8f4878b7 981 numberOfWorkers,
b2fd3f4a 982 './tests/worker-files/thread/testWorker.mjs'
9e619829 983 )
09c2d0d3 984 const promises = new Set()
ee9f5295
JB
985 const maxMultiplier = 2
986 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 987 promises.add(pool.execute())
9e619829
JB
988 }
989 await Promise.all(promises)
f06e48d8 990 for (const workerNode of pool.workerNodes) {
465b2940 991 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
992 tasks: {
993 executed: expect.any(Number),
994 executing: 0,
995 queued: 0,
df593701 996 maxQueued: 0,
463226a4 997 sequentiallyStolen: 0,
68cbdc84 998 stolen: 0,
a4e07f72
JB
999 failed: 0
1000 },
1001 runTime: {
a4e07f72
JB
1002 history: expect.any(CircularArray)
1003 },
1004 waitTime: {
a4e07f72
JB
1005 history: expect.any(CircularArray)
1006 },
5df69fab
JB
1007 elu: {
1008 idle: {
5df69fab
JB
1009 history: expect.any(CircularArray)
1010 },
1011 active: {
5df69fab 1012 history: expect.any(CircularArray)
f7510105 1013 }
5df69fab 1014 }
86bf340d 1015 })
465b2940 1016 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1017 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1018 numberOfWorkers * maxMultiplier
1019 )
b97d82d8
JB
1020 expect(workerNode.usage.runTime.history.length).toBe(0)
1021 expect(workerNode.usage.waitTime.history.length).toBe(0)
1022 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1023 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1024 }
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1026 for (const workerNode of pool.workerNodes) {
465b2940 1027 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1028 tasks: {
1029 executed: 0,
1030 executing: 0,
1031 queued: 0,
df593701 1032 maxQueued: 0,
463226a4 1033 sequentiallyStolen: 0,
68cbdc84 1034 stolen: 0,
a4e07f72
JB
1035 failed: 0
1036 },
1037 runTime: {
a4e07f72
JB
1038 history: expect.any(CircularArray)
1039 },
1040 waitTime: {
a4e07f72
JB
1041 history: expect.any(CircularArray)
1042 },
5df69fab
JB
1043 elu: {
1044 idle: {
5df69fab
JB
1045 history: expect.any(CircularArray)
1046 },
1047 active: {
5df69fab 1048 history: expect.any(CircularArray)
f7510105 1049 }
5df69fab 1050 }
86bf340d 1051 })
465b2940
JB
1052 expect(workerNode.usage.runTime.history.length).toBe(0)
1053 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1054 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1055 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1056 }
fd7ebd49 1057 await pool.destroy()
ee11a4a2
JB
1058 })
1059
a1763c54
JB
1060 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1061 const pool = new DynamicClusterPool(
2431bdb4 1062 Math.floor(numberOfWorkers / 2),
164d950a 1063 numberOfWorkers,
d35e5717 1064 './tests/worker-files/cluster/testWorker.cjs'
164d950a 1065 )
c726f66c 1066 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1067 let poolInfo
a1763c54 1068 let poolReady = 0
041dc05b 1069 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1070 ++poolReady
d46660cd
JB
1071 poolInfo = info
1072 })
a1763c54 1073 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1074 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1075 expect(poolReady).toBe(1)
d46660cd 1076 expect(poolInfo).toStrictEqual({
23ccf9d7 1077 version,
d46660cd 1078 type: PoolTypes.dynamic,
a1763c54 1079 worker: WorkerTypes.cluster,
47352846 1080 started: true,
a1763c54 1081 ready: true,
2431bdb4 1082 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1083 strategyRetries: expect.any(Number),
2431bdb4
JB
1084 minSize: expect.any(Number),
1085 maxSize: expect.any(Number),
1086 workerNodes: expect.any(Number),
1087 idleWorkerNodes: expect.any(Number),
1088 busyWorkerNodes: expect.any(Number),
1089 executedTasks: expect.any(Number),
1090 executingTasks: expect.any(Number),
2431bdb4
JB
1091 failedTasks: expect.any(Number)
1092 })
1093 await pool.destroy()
1094 })
1095
a1763c54
JB
1096 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1097 const pool = new FixedThreadPool(
2431bdb4 1098 numberOfWorkers,
b2fd3f4a 1099 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1100 )
c726f66c 1101 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1102 const promises = new Set()
1103 let poolBusy = 0
2431bdb4 1104 let poolInfo
041dc05b 1105 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1106 ++poolBusy
2431bdb4
JB
1107 poolInfo = info
1108 })
c726f66c 1109 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1110 for (let i = 0; i < numberOfWorkers * 2; i++) {
1111 promises.add(pool.execute())
1112 }
1113 await Promise.all(promises)
1114 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1115 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1116 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1117 expect(poolInfo).toStrictEqual({
1118 version,
a1763c54
JB
1119 type: PoolTypes.fixed,
1120 worker: WorkerTypes.thread,
47352846
JB
1121 started: true,
1122 ready: true,
2431bdb4 1123 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1124 strategyRetries: expect.any(Number),
d46660cd
JB
1125 minSize: expect.any(Number),
1126 maxSize: expect.any(Number),
1127 workerNodes: expect.any(Number),
1128 idleWorkerNodes: expect.any(Number),
1129 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1130 executedTasks: expect.any(Number),
1131 executingTasks: expect.any(Number),
a4e07f72 1132 failedTasks: expect.any(Number)
d46660cd 1133 })
164d950a
JB
1134 await pool.destroy()
1135 })
1136
a1763c54
JB
1137 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1138 const pool = new DynamicThreadPool(
1139 Math.floor(numberOfWorkers / 2),
7c0ba920 1140 numberOfWorkers,
b2fd3f4a 1141 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1142 )
c726f66c 1143 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1144 const promises = new Set()
a1763c54 1145 let poolFull = 0
d46660cd 1146 let poolInfo
041dc05b 1147 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1148 ++poolFull
d46660cd
JB
1149 poolInfo = info
1150 })
c726f66c 1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1153 promises.add(pool.execute())
7c0ba920 1154 }
cf597bc5 1155 await Promise.all(promises)
33e6bb4c 1156 expect(poolFull).toBe(1)
d46660cd 1157 expect(poolInfo).toStrictEqual({
23ccf9d7 1158 version,
a1763c54 1159 type: PoolTypes.dynamic,
d46660cd 1160 worker: WorkerTypes.thread,
47352846
JB
1161 started: true,
1162 ready: true,
2431bdb4 1163 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1164 strategyRetries: expect.any(Number),
8735b4e5
JB
1165 minSize: expect.any(Number),
1166 maxSize: expect.any(Number),
1167 workerNodes: expect.any(Number),
1168 idleWorkerNodes: expect.any(Number),
1169 busyWorkerNodes: expect.any(Number),
1170 executedTasks: expect.any(Number),
1171 executingTasks: expect.any(Number),
1172 failedTasks: expect.any(Number)
1173 })
1174 await pool.destroy()
1175 })
1176
3e8611a8 1177 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1178 const pool = new FixedThreadPool(
8735b4e5 1179 numberOfWorkers,
b2fd3f4a 1180 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1181 {
1182 enableTasksQueue: true
1183 }
1184 )
a074ffee 1185 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1186 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1187 const promises = new Set()
1188 let poolBackPressure = 0
1189 let poolInfo
041dc05b 1190 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1191 ++poolBackPressure
1192 poolInfo = info
1193 })
c726f66c 1194 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1195 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1196 promises.add(pool.execute())
1197 }
1198 await Promise.all(promises)
033f1776 1199 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1200 expect(poolInfo).toStrictEqual({
1201 version,
3e8611a8 1202 type: PoolTypes.fixed,
8735b4e5 1203 worker: WorkerTypes.thread,
47352846
JB
1204 started: true,
1205 ready: true,
8735b4e5 1206 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1207 strategyRetries: expect.any(Number),
d46660cd
JB
1208 minSize: expect.any(Number),
1209 maxSize: expect.any(Number),
1210 workerNodes: expect.any(Number),
1211 idleWorkerNodes: expect.any(Number),
5eb72b9e 1212 stealingWorkerNodes: expect.any(Number),
d46660cd 1213 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1214 executedTasks: expect.any(Number),
1215 executingTasks: expect.any(Number),
3e8611a8
JB
1216 maxQueuedTasks: expect.any(Number),
1217 queuedTasks: expect.any(Number),
1218 backPressure: true,
68cbdc84 1219 stolenTasks: expect.any(Number),
a4e07f72 1220 failedTasks: expect.any(Number)
d46660cd 1221 })
5eb72b9e 1222 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
fd7ebd49 1223 await pool.destroy()
7c0ba920 1224 })
70a4f5ea 1225
85b2561d
JB
1226 it('Verify that destroy() waits for queued tasks to finish', async () => {
1227 const tasksFinishedTimeout = 2500
1228 const pool = new FixedThreadPool(
1229 numberOfWorkers,
1230 './tests/worker-files/thread/asyncWorker.mjs',
1231 {
1232 enableTasksQueue: true,
1233 tasksQueueOptions: { tasksFinishedTimeout }
1234 }
1235 )
1236 const maxMultiplier = 4
1237 let tasksFinished = 0
1238 for (const workerNode of pool.workerNodes) {
1239 workerNode.on('taskFinished', () => {
1240 ++tasksFinished
1241 })
1242 }
1243 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1244 pool.execute()
1245 }
1246 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1247 const startTime = performance.now()
1248 await pool.destroy()
1249 const elapsedTime = performance.now() - startTime
90afa746 1250 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
85b2561d 1251 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
14d5e183 1252 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1253 })
1254
1255 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1256 const tasksFinishedTimeout = 1000
1257 const pool = new FixedThreadPool(
1258 numberOfWorkers,
1259 './tests/worker-files/thread/asyncWorker.mjs',
1260 {
1261 enableTasksQueue: true,
1262 tasksQueueOptions: { tasksFinishedTimeout }
1263 }
1264 )
1265 const maxMultiplier = 4
1266 let tasksFinished = 0
1267 for (const workerNode of pool.workerNodes) {
1268 workerNode.on('taskFinished', () => {
1269 ++tasksFinished
1270 })
1271 }
1272 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1273 pool.execute()
1274 }
1275 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1276 const startTime = performance.now()
1277 await pool.destroy()
1278 const elapsedTime = performance.now() - startTime
1279 expect(tasksFinished).toBe(0)
2885534c 1280 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1281 })
1282
f18fd12b
JB
1283 it('Verify that pool asynchronous resource track tasks execution', async () => {
1284 let taskAsyncId
1285 let initCalls = 0
1286 let beforeCalls = 0
1287 let afterCalls = 0
1288 let resolveCalls = 0
1289 const hook = createHook({
1290 init (asyncId, type) {
1291 if (type === 'poolifier:task') {
1292 initCalls++
1293 taskAsyncId = asyncId
1294 }
1295 },
1296 before (asyncId) {
1297 if (asyncId === taskAsyncId) beforeCalls++
1298 },
1299 after (asyncId) {
1300 if (asyncId === taskAsyncId) afterCalls++
1301 },
1302 promiseResolve () {
1303 if (executionAsyncId() === taskAsyncId) resolveCalls++
1304 }
1305 })
f18fd12b
JB
1306 const pool = new FixedThreadPool(
1307 numberOfWorkers,
1308 './tests/worker-files/thread/testWorker.mjs'
1309 )
8954c0a3 1310 hook.enable()
f18fd12b
JB
1311 await pool.execute()
1312 hook.disable()
1313 expect(initCalls).toBe(1)
1314 expect(beforeCalls).toBe(1)
1315 expect(afterCalls).toBe(1)
1316 expect(resolveCalls).toBe(1)
8954c0a3 1317 await pool.destroy()
f18fd12b
JB
1318 })
1319
9eae3c69
JB
1320 it('Verify that hasTaskFunction() is working', async () => {
1321 const dynamicThreadPool = new DynamicThreadPool(
1322 Math.floor(numberOfWorkers / 2),
1323 numberOfWorkers,
b2fd3f4a 1324 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1325 )
1326 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1327 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1328 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1329 true
1330 )
1331 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1333 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1334 await dynamicThreadPool.destroy()
1335 const fixedClusterPool = new FixedClusterPool(
1336 numberOfWorkers,
d35e5717 1337 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
9eae3c69
JB
1338 )
1339 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1340 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1341 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1342 true
1343 )
1344 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1346 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1347 await fixedClusterPool.destroy()
1348 })
1349
1350 it('Verify that addTaskFunction() is working', async () => {
1351 const dynamicThreadPool = new DynamicThreadPool(
1352 Math.floor(numberOfWorkers / 2),
1353 numberOfWorkers,
b2fd3f4a 1354 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1355 )
1356 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1357 await expect(
1358 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1359 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1360 await expect(
1361 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1362 ).rejects.toThrow(
3feeab69
JB
1363 new TypeError('name argument must not be an empty string')
1364 )
948faff7
JB
1365 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1366 new TypeError('fn argument must be a function')
1367 )
1368 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1369 new TypeError('fn argument must be a function')
1370 )
9eae3c69
JB
1371 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1372 DEFAULT_TASK_NAME,
1373 'test'
1374 ])
1375 const echoTaskFunction = data => {
1376 return data
1377 }
1378 await expect(
1379 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1380 ).resolves.toBe(true)
1381 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1382 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1383 echoTaskFunction
1384 )
1385 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1386 DEFAULT_TASK_NAME,
1387 'test',
1388 'echo'
1389 ])
1390 const taskFunctionData = { test: 'test' }
1391 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1392 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1393 for (const workerNode of dynamicThreadPool.workerNodes) {
1394 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1395 tasks: {
1396 executed: expect.any(Number),
1397 executing: 0,
1398 queued: 0,
463226a4 1399 sequentiallyStolen: 0,
5ad42e34 1400 stolen: 0,
adee6053
JB
1401 failed: 0
1402 },
1403 runTime: {
1404 history: new CircularArray()
1405 },
1406 waitTime: {
1407 history: new CircularArray()
1408 },
1409 elu: {
1410 idle: {
1411 history: new CircularArray()
1412 },
1413 active: {
1414 history: new CircularArray()
1415 }
1416 }
1417 })
1418 }
9eae3c69
JB
1419 await dynamicThreadPool.destroy()
1420 })
1421
1422 it('Verify that removeTaskFunction() is working', async () => {
1423 const dynamicThreadPool = new DynamicThreadPool(
1424 Math.floor(numberOfWorkers / 2),
1425 numberOfWorkers,
b2fd3f4a 1426 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1427 )
1428 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1429 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1430 DEFAULT_TASK_NAME,
1431 'test'
1432 ])
948faff7 1433 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1434 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1435 )
1436 const echoTaskFunction = data => {
1437 return data
1438 }
1439 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1440 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1441 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1442 echoTaskFunction
1443 )
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1445 DEFAULT_TASK_NAME,
1446 'test',
1447 'echo'
1448 ])
1449 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1450 true
1451 )
1452 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1453 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1454 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1455 DEFAULT_TASK_NAME,
1456 'test'
1457 ])
1458 await dynamicThreadPool.destroy()
1459 })
1460
30500265 1461 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1462 const dynamicThreadPool = new DynamicThreadPool(
1463 Math.floor(numberOfWorkers / 2),
1464 numberOfWorkers,
b2fd3f4a 1465 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1466 )
1467 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1469 DEFAULT_TASK_NAME,
90d7d101
JB
1470 'jsonIntegerSerialization',
1471 'factorial',
1472 'fibonacci'
1473 ])
9eae3c69 1474 await dynamicThreadPool.destroy()
90d7d101
JB
1475 const fixedClusterPool = new FixedClusterPool(
1476 numberOfWorkers,
d35e5717 1477 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
90d7d101
JB
1478 )
1479 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1480 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1481 DEFAULT_TASK_NAME,
90d7d101
JB
1482 'jsonIntegerSerialization',
1483 'factorial',
1484 'fibonacci'
1485 ])
0fe39c97 1486 await fixedClusterPool.destroy()
90d7d101
JB
1487 })
1488
9eae3c69 1489 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1490 const dynamicThreadPool = new DynamicThreadPool(
1491 Math.floor(numberOfWorkers / 2),
1492 numberOfWorkers,
b2fd3f4a 1493 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1494 )
1495 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1496 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1497 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1498 new Error(
711623b8 1499 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1500 )
1501 )
1502 await expect(
1503 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1504 ).rejects.toThrow(
b0b55f57 1505 new Error(
711623b8 1506 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1507 )
1508 )
1509 await expect(
1510 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1511 ).rejects.toThrow(
b0b55f57 1512 new Error(
711623b8 1513 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1514 )
1515 )
9eae3c69
JB
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 DEFAULT_TASK_NAME,
1518 'jsonIntegerSerialization',
1519 'factorial',
1520 'fibonacci'
1521 ])
1522 await expect(
1523 dynamicThreadPool.setDefaultTaskFunction('factorial')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1526 DEFAULT_TASK_NAME,
1527 'factorial',
1528 'jsonIntegerSerialization',
1529 'fibonacci'
1530 ])
1531 await expect(
1532 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1533 ).resolves.toBe(true)
1534 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1535 DEFAULT_TASK_NAME,
1536 'fibonacci',
1537 'jsonIntegerSerialization',
1538 'factorial'
1539 ])
cda9ba34 1540 await dynamicThreadPool.destroy()
30500265
JB
1541 })
1542
90d7d101 1543 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1544 const pool = new DynamicClusterPool(
2431bdb4 1545 Math.floor(numberOfWorkers / 2),
70a4f5ea 1546 numberOfWorkers,
d35e5717 1547 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
70a4f5ea
JB
1548 )
1549 const data = { n: 10 }
82888165 1550 const result0 = await pool.execute(data)
30b963d4 1551 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1552 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1553 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1554 const result2 = await pool.execute(data, 'factorial')
1555 expect(result2).toBe(3628800)
1556 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1557 expect(result3).toBe(55)
5bb5be17
JB
1558 expect(pool.info.executingTasks).toBe(0)
1559 expect(pool.info.executedTasks).toBe(4)
b414b84c 1560 for (const workerNode of pool.workerNodes) {
66979634 1561 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1562 DEFAULT_TASK_NAME,
b414b84c
JB
1563 'jsonIntegerSerialization',
1564 'factorial',
1565 'fibonacci'
1566 ])
1567 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1568 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1569 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1570 tasks: {
1571 executed: expect.any(Number),
4ba4c7f9 1572 executing: 0,
5bb5be17 1573 failed: 0,
68cbdc84 1574 queued: 0,
463226a4 1575 sequentiallyStolen: 0,
68cbdc84 1576 stolen: 0
5bb5be17
JB
1577 },
1578 runTime: {
1579 history: expect.any(CircularArray)
1580 },
1581 waitTime: {
1582 history: expect.any(CircularArray)
1583 },
1584 elu: {
1585 idle: {
1586 history: expect.any(CircularArray)
1587 },
1588 active: {
1589 history: expect.any(CircularArray)
1590 }
1591 }
1592 })
1593 expect(
4ba4c7f9
JB
1594 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1595 ).toBeGreaterThan(0)
5bb5be17 1596 }
dfd7ec01
JB
1597 expect(
1598 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1599 ).toStrictEqual(
66979634
JB
1600 workerNode.getTaskFunctionWorkerUsage(
1601 workerNode.info.taskFunctionNames[1]
1602 )
dfd7ec01 1603 )
5bb5be17 1604 }
0fe39c97 1605 await pool.destroy()
70a4f5ea 1606 })
52a23942
JB
1607
1608 it('Verify sendKillMessageToWorker()', async () => {
1609 const pool = new DynamicClusterPool(
1610 Math.floor(numberOfWorkers / 2),
1611 numberOfWorkers,
d35e5717 1612 './tests/worker-files/cluster/testWorker.cjs'
52a23942
JB
1613 )
1614 const workerNodeKey = 0
1615 await expect(
adee6053 1616 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1617 ).resolves.toBeUndefined()
1618 await pool.destroy()
1619 })
adee6053
JB
1620
1621 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1622 const pool = new DynamicClusterPool(
1623 Math.floor(numberOfWorkers / 2),
1624 numberOfWorkers,
d35e5717 1625 './tests/worker-files/cluster/testWorker.cjs'
adee6053
JB
1626 )
1627 const workerNodeKey = 0
1628 await expect(
1629 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1630 taskFunctionOperation: 'add',
1631 taskFunctionName: 'empty',
1632 taskFunction: (() => {}).toString()
1633 })
1634 ).resolves.toBe(true)
1635 expect(
1636 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1637 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1638 await pool.destroy()
1639 })
1640
1641 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1642 const pool = new DynamicClusterPool(
1643 Math.floor(numberOfWorkers / 2),
1644 numberOfWorkers,
d35e5717 1645 './tests/worker-files/cluster/testWorker.cjs'
adee6053 1646 )
adee6053
JB
1647 await expect(
1648 pool.sendTaskFunctionOperationToWorkers({
1649 taskFunctionOperation: 'add',
1650 taskFunctionName: 'empty',
1651 taskFunction: (() => {}).toString()
1652 })
1653 ).resolves.toBe(true)
1654 for (const workerNode of pool.workerNodes) {
1655 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1656 DEFAULT_TASK_NAME,
1657 'test',
1658 'empty'
1659 ])
1660 }
1661 await pool.destroy()
1662 })
3ec964d6 1663})