refactor: remove unneeded type casting
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
// Package version read from the repository root package.json, two levels
// above this test file.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Pool size shared by the tests of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose isMain() override always returns false.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
afterEach(() => {
  // Undo sinon fakes after every test.
  restore()
})
41
bcf85f7f
JB
it('Verify that pool can be created and destroyed', async () => {
  // Smoke test: construct a fixed thread pool, check its type, tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub's isMain() returns false, so the constructor is expected to throw.
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPool).toThrow(expectedError)
})
c510fea7 67
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Status flags expected right after construction, checked in this order.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expected] of expectedStatuses) {
    expect(pool[status]).toBe(expected)
  }
  await pool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // Each factory passes a different invalid worker file path.
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const withNonStringFilePath = () => new FixedThreadPool(numberOfWorkers, 0)
  const withUnknownFilePath = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')

  expect(withoutFilePath).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(withNonStringFilePath).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(withUnknownFilePath).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
90
it('Verify that numberOfWorkers is checked', () => {
  // The number of workers is left undefined on purpose.
  const createPool = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
7c0ba920 126
26ce26ca
JB
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool is given a fourth argument (a maximum number of workers).
  const createPool = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createPool).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case pairs an invalid (min, max) sizing with the error it must raise.
  const invalidSizings = [
    {
      createPool: () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      createPool: () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(2, 1, threadWorkerFile),
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0, 0, threadWorkerFile),
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      createPool: () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { createPool, error } of invalidSizings) {
    expect(createPool).toThrow(error)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts that the strategy context, then every registered strategy,
  // expose the options built by `buildExpected`.
  const expectStrategiesOpts = (checkedPool, buildExpected) => {
    const { workerChoiceStrategyContext: context } = checkedPool
    expect(context.opts).toStrictEqual(buildExpected())
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(buildExpected())
    }
  }

  // Pool created without any option.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  expectStrategiesOpts(defaultPool, () => ({
    retries: defaultPool.info.maxSize,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }))
  await defaultPool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const handlers = {
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    ...handlers
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    ...handlers
  })
  expectStrategiesOpts(customPool, () => ({
    retries:
      customPool.info.maxSize +
      Object.keys(customPool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }))
  await customPool.destroy()
})
313
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs invalid pool options with the error the constructor
  // must throw for them.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [opts, error] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, opts)
    ).toThrow(error)
  }
})
453
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Builds a fresh options object toggling the runTime and elu median flags.
  const medianOptions = median => ({ runTime: { median }, elu: { median } })
  // Asserts the context options, every strategy's options and the task
  // statistics requirements for the given runTime/elu median setting.
  const expectStrategiesState = median => {
    const buildOpts = () => ({
      retries: pool.info.maxSize,
      runTime: { median },
      waitTime: { median: false },
      elu: { median }
    })
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(buildOpts())
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(buildOpts())
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no explicit strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesState(false)

  // Switch the median on, then back off.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions(medianOptions(median))
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      medianOptions(median)
    )
    expectStrategiesState(median)
  }

  // Invalid options must be rejected.
  expect(() =>
    pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
  ).toThrow(
    new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  )
  expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
    new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
  ).toThrow(
    new Error(
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    )
  )
  await pool.destroy()
})
600
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Asserts the tasks queue is off and carries no options.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Asserts the tasks queue is on with the given concurrency.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    })
  }

  expectTasksQueueDisabled()
  pool.enableTasksQueue(true)
  expectTasksQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectTasksQueueDisabled()
  await pool.destroy()
})
631
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Tasks queue options expected when only `enableTasksQueue` is given.
  const buildDefaultOptions = () => ({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Asserts the pool options, then that each worker node's back pressure
  // size equals the configured queue size.
  const expectTasksQueueOptions = expected => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expected)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectTasksQueueOptions(buildDefaultOptions())
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectTasksQueueOptions(buildDefaultOptions())

  // Invalid options paired with the error they must raise.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, error] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(error)
  }
  await pool.destroy()
})
717
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Info fields asserted identically for both pools below.
  const commonInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const minSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...commonInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize,
    maxSize: numberOfWorkers,
    workerNodes: minSize,
    idleWorkerNodes: minSize
  })
  await dynamicPool.destroy()
})
763
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Usage expected on a worker node that has not run any task yet.
  const buildInitialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual(buildInitialUsage())
  }
  await pool.destroy()
})
799
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts every worker node owns an empty Deque as tasks queue.
  const expectEmptyTasksQueues = checkedPool => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
825
it('Verify that pool worker info are initialized', async () => {
  // Asserts every worker node reports a ready, non dynamic worker of the
  // given type.
  const expectWorkersInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkersInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkersInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
857
711623b8
JB
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Asserts the `started` and `ready` flags exposed by the pool info.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(true)
  const startAgain = () => pool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))
  await pool.destroy()
  expectStartedAndReady(false)
  // A second destroy() must reject.
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
875
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Asserts the `started` and `ready` flags exposed by the pool info.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Before start(): no worker node, no ready event, tasks are rejected.
  expectStartedAndReady(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  pool.start()
  expectStartedAndReady(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const startedWorkerNode of pool.workerNodes) {
    expect(startedWorkerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
902
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // execute() argument lists paired with the error they must reject with.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, error] of invalidCalls) {
    await expect(pool.execute(...args)).rejects.toThrow(error)
  }
  // This case asserts rejection with a plain string, not an Error instance.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
925
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage expected for the given executed/executing task counts.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const maxMultiplier = 2
  // Submit maxMultiplier tasks per worker without awaiting them yet.
  const promises = new Set(
    Array.from({ length: numberOfWorkers * maxMultiplier }, () =>
      pool.execute()
    )
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      buildExpectedUsage(0, maxMultiplier)
    )
  }
  await Promise.all(promises)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      buildExpectedUsage(maxMultiplier, 0)
    )
  }
  await pool.destroy()
})
993
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const tasksPerWorker = 2
  const submittedTasks = numberOfWorkers * tasksPerWorker
  // Builds a fresh expectation for a usage section that only holds a history.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Expected worker node usage with the given `executed` counter expectation.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  // Asserts that every measurement history of the worker node is empty.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
      submittedTasks
    )
    expectEmptyHistories(workerNode)
  }
  // Changing the strategy is expected to zero the executed tasks counter.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1075
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool information payloads received with the 'ready' event, in order.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1110
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool information payloads received with the 'busy' event, in order.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is emitted once the number of tasks submitted at once reaches the number of fixed pool workers,
  // hence numberOfWorkers + 1 emissions when numberOfWorkers * 2 tasks are submitted in a row to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1150
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool information payloads received with the 'full' event, in order.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1189
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Make the pool report back pressure on every check.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool information payloads received with the 'backPressure' event, in order.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1236
85b2561d
JB
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const tasksPerWorker = 4
  const submittedTasks = numberOfWorkers * tasksPerWorker
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' events emitted by the worker nodes.
  let finishedTasks = 0
  const onTaskFinished = () => {
    ++finishedTasks
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', onTaskFinished)
  }
  // The tasks are deliberately not awaited: destroy() is what gets timed.
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  expect(finishedTasks).toBe(submittedTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1265
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const tasksPerWorker = 4
  const submittedTasks = numberOfWorkers * tasksPerWorker
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' events emitted by the worker nodes.
  let finishedTasks = 0
  const onTaskFinished = () => {
    ++finishedTasks
  }
  for (const workerNode of pool.workerNodes) {
    workerNode.on('taskFinished', onTaskFinished)
  }
  // The tasks are deliberately not awaited: destroy() is what gets timed.
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  // No 'taskFinished' event is expected before the timeout elapses.
  expect(finishedTasks).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1293
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last 'poolifier:task' resource seen by the init hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promises resolved while running inside the task resource.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // The hook is process-wide: always disable it, even when the task
    // execution rejects, so it cannot leak into the following tests.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1330
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // The same checks run against a thread pool first, then a cluster pool.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
    expect(pool.hasTaskFunction('jsonIntegerSerialization')).toBe(true)
    expect(pool.hasTaskFunction('factorial')).toBe(true)
    expect(pool.hasTaskFunction('fibonacci')).toBe(true)
    expect(pool.hasTaskFunction('unknown')).toBe(false)
    await pool.destroy()
  }
})
1360
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) arguments paired with the expected TypeError message.
  const invalidArguments = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidArguments) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const taskFunctionData = { test: 'test' }
  expect(
    await dynamicThreadPool.execute(taskFunctionData, 'echo')
  ).toStrictEqual(taskFunctionData)
  // Builds a usage section holding a new, empty history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1432
it('Verify that removeTaskFunction() is working', async () => {
  // Task function names listed before 'echo' is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed but was not added through the pool, so removal is refused.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1471
it('Verify that listTaskFunctionNames() is working', async () => {
  // The same listing is expected from a thread pool first, then a cluster pool.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
})
1499
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const { id: workerId } = dynamicThreadPool.workerNodes[0].info
  // Invalid names paired with the error message each one is rejected with.
  const rejectedNames = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(new Error(message))
  }
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Valid names paired with the listing expected once each one is the default.
  const acceptedNames = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, expectedNames] of acceptedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
      expectedNames
    )
  }
  await dynamicThreadPool.destroy()
})
1553
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, strictly in sequence.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  // Builds a fresh expectation for a usage section that only holds a history.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the second listed
    // task function name.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(
      DEFAULT_TASK_NAME
    )
    const secondNameUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(secondNameUsage)
  }
  await pool.destroy()
})
52a23942
JB
1618
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Killing the worker at the first worker node key resolves with no value.
  const firstWorkerNodeKey = 0
  const killFirstWorker = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killFirstWorker).resolves.toBeUndefined()
  // Using `numberOfWorkers` as the worker node key is rejected as invalid.
  const killInvalidWorker = pool.sendKillMessageToWorker(numberOfWorkers)
  await expect(killInvalidWorker).rejects.toStrictEqual(
    new Error(`Invalid worker node key '${numberOfWorkers}'`)
  )
  await pool.destroy()
})
adee6053
JB
1636
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const targetWorkerNodeKey = 0
  // Operation adding a no-op task function named 'empty', sent as source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      targetWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[targetWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1656
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation adding a no-op task function named 'empty', sent as source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function name.
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
3ec964d6 1679})