refactor: allow to disable tasks end wait time
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
// Version string read from the package.json located two directories above
// this test file; compared against pool.info.version in the tests.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Number of workers requested for the pools created in this suite.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose isMain() always answers `false`.
 * Used by the suite to check that pool creation is refused in that case.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
// Undo every sinon stub installed by a test before the next test runs.
afterEach(function restoreSinonSandbox () {
  restore()
})
41
bcf85f7f
JB
// Smoke test: a fixed thread pool can be instantiated and then destroyed.
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
50
// Uses the isMain() === false stub to check that the constructor throws.
it('Verify that pool cannot be created from a non main thread/process', () => {
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPool).toThrow(expectedError)
})
c510fea7 67
bc61cfe6
JB
// Checks the started/starting/destroying flags right after construction.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.started).toBe(true)
    expect(pool.starting).toBe(false)
    expect(pool.destroying).toBe(false)
  } finally {
    // Destroy the pool even when an assertion fails so that its workers are
    // not left running after the test.
    await pool.destroy()
  }
})
78
// The worker file path must be given, be a string and point to an existing file.
it('Verify that filePath is checked', () => {
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const withNonStringFilePath = () => new FixedThreadPool(numberOfWorkers, 0)
  const withMissingFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')

  expect(withoutFilePath).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(withNonStringFilePath).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(withMissingFile).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
90
// An undefined number of workers is rejected by the constructor.
it('Verify that numberOfWorkers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
104
// A negative number of workers is rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
115
// A fractional number of workers is rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
7c0ba920 126
// Each entry is an invalid (min, max) combination and the error the dynamic
// pool constructor is expected to throw for it. Entries run in order.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidSizings = [
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: undefined,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0.5,
      max: 1,
      file: threadWorkerFile,
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 0,
      max: 0.5,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 2,
      max: 1,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0,
      max: 0,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: 1,
      file: clusterWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { Pool, min, max, file, error } of invalidSizings) {
    expect(() => new Pool(min, max, file)).toThrow(error)
  }
})
201
// Checks the resolved pool options, first with defaults and then with
// user-provided options, including their propagation to the worker choice
// strategy context and to every registered strategy.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts the strategy options held by the context and by each strategy.
  const expectStrategyOptions = (checkedPool, expected) => {
    const context = checkedPool.workerChoiceStrategyContext
    expect(context.opts).toStrictEqual(expected)
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expected)
    }
  }

  // Pool created without options: defaults are expected everywhere.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(pool, defaultStrategyOptions)
  await pool.destroy()

  // Pool created with explicit options: they are merged with the defaults.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(pool, customStrategyOptions)
  await pool.destroy()
})
302
// Each entry pairs an invalid options object with the error the pool
// constructor is expected to throw for it. Entries run in order.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, options)
    ).toThrow(expectedError)
  }
})
474
// Toggles the runTime/elu median options at runtime and checks that the
// pool options, the strategy context, every strategy and the task
// statistics requirements follow. Then checks invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected resolved options for a given runTime/elu median setting.
  const resolvedOptions = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Asserts the options on the pool, the context and each strategy, then
  // the task statistics requirements derived from them.
  const expectPoolState = median => {
    const expected = resolvedOptions(median)
    const context = pool.workerChoiceStrategyContext
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expected)
    expect(context.opts).toStrictEqual(expected)
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expected)
    }
    expect(context.getTaskStatisticsRequirements()).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  expectPoolState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolState(false)

  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
644
// Enables, reconfigures and disables the tasks queue at runtime and checks
// the resulting pool options after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Expected resolved tasks queue options for a given concurrency.
  const queueOptionsWith = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  try {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    pool.enableTasksQueue(true)
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(1))
    pool.enableTasksQueue(true, { concurrency: 2 })
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(2))
    pool.enableTasksQueue(false)
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  } finally {
    // Destroy the pool even when an assertion fails so that its workers are
    // not left running after the test.
    await pool.destroy()
  }
})
675
// Changes the tasks queue options at runtime, checks that each worker
// node's back pressure size follows the configured queue size, then checks
// that invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  const defaultQueueOptions = {
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
  // Asserts the resolved queue options and the per-node back pressure size.
  const expectQueueOptions = expected => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expected)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueOptions(defaultQueueOptions)
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(defaultQueueOptions)

  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(options)).toThrow(expectedError)
  }
  await pool.destroy()
})
761
6b27d407
JB
// Checks the pool.info snapshot of a fixed thread pool and of a dynamic
// cluster pool right after creation.
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both freshly created pools.
  const idleCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...idleCounters
  })
  await pool.destroy()

  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumWorkers,
    maxSize: numberOfWorkers,
    workerNodes: minimumWorkers,
    idleWorkerNodes: minimumWorkers,
    ...idleCounters
  })
  await pool.destroy()
})
807
// Every worker node of a fresh pool starts with zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: new CircularArray()
        },
        waitTime: {
          history: new CircularArray()
        },
        elu: {
          idle: {
            history: new CircularArray()
          },
          active: {
            history: new CircularArray()
          }
        }
      })
    }
  } finally {
    // Destroy the pool even when an assertion fails so that the cluster
    // workers are not left running after the test.
    await pool.destroy()
  }
})
843
2431bdb4
JB
// Every worker node of a fresh pool owns an empty Deque as tasks queue,
// for both a fixed cluster pool and a dynamic thread pool.
it('Verify that pool worker tasks queue are initialized', async () => {
  const expectEmptyTasksQueues = checkedPool => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
869
// Every worker node of a fresh pool reports a numeric id, the worker type
// of its pool, and is flagged as ready and non dynamic.
it('Verify that pool worker info are initialized', async () => {
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
901
711623b8
JB
// Starting a started pool throws; destroying a destroyed pool rejects.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)

  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  const startAgain = () => pool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))

  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  const secondDestroy = pool.destroy()
  await expect(secondDestroy).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
919
47352846
JB
// With startWorkers disabled the pool is created without worker nodes and
// refuses tasks until start() is called explicitly.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Asserts the started and ready flags reported by pool.info.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  pool.start()
  expectStartedAndReady(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
946
9d2d0da1
JB
// Invalid execute() arguments reject, an unknown task function name rejects
// with a message string, and a destroyed pool refuses new tasks.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, expectedError] of invalidCalls) {
    await expect(pool.execute(...args)).rejects.toThrow(expectedError)
  }
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
969
// Submits maxMultiplier tasks per worker and checks the per-node task
// counters while the tasks are executing and again once they have settled.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage for the given executed/executing counters; the history
  // containers are only checked for their type.
  const usageWith = ({ executed, executing }) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const expectUsageOnEveryNode = counters => {
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual(usageWith(counters))
    }
  }

  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  expectUsageOnEveryNode({ executed: 0, executing: maxMultiplier })
  await Promise.all(pendingTasks)
  expectUsageOnEveryNode({ executed: maxMultiplier, executing: 0 })
  await pool.destroy()
})
1037
// Runs tasks on a dynamic thread pool, then switches the worker choice
// strategy and asserts that every worker node usage is back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Expected usage; only the executed counter differs before/after the change.
  const usageWith = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  // Asserts that all measurement histories of a worker node are empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(expect.any(Number)))
    // Every worker node ran at least one task and at most all of them.
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1119
// Registers a 'ready' listener on a dynamic cluster pool and checks that it
// fires once with the expected pool information payload.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let readyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    lastInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1154
// Registers a 'busy' listener on a fixed thread pool, floods the pool with
// tasks and checks the number of emissions and the last payload received.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1194
// Registers a 'full' listener on a dynamic thread pool, submits more tasks
// than the pool maximum size and checks the single emission and its payload.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1233
// Forces hasBackPressure() to report true through a stub on a fixed thread
// pool with the tasks queue enabled, then checks the 'backPressure' emission,
// its payload and how many times the stub was consulted.
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1280
// Installs an async hook that watches for the 'poolifier:task' async resource
// type and counts its init/before/after/promiseResolve callbacks while one
// task is executed on a fixed thread pool.
it('Verify that pool asynchronous resource track tasks execution', async () => {
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the id of the task resource so the other callbacks can match it.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Count only promise resolutions happening inside the task execution context.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always disable the hook: if execute() rejects, leaving it enabled would
    // keep its callbacks running during the tests that follow.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1317
// Checks hasTaskFunction() against the multiple task functions worker, first
// on a dynamic thread pool and then on a fixed cluster pool.
it('Verify that hasTaskFunction() is working', async () => {
  // Waits for the pool to be ready, runs the shared expectations, then destroys it.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
    expect(pool.hasTaskFunction('jsonIntegerSerialization')).toBe(true)
    expect(pool.hasTaskFunction('factorial')).toBe(true)
    expect(pool.hasTaskFunction('fibonacci')).toBe(true)
    expect(pool.hasTaskFunction('unknown')).toBe(false)
    await pool.destroy()
  }
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await checkPool(dynamicThreadPool)
  // The second pool is only created once the first one has been destroyed.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await checkPool(fixedClusterPool)
})
1347
// Checks addTaskFunction() argument validation, registration of an 'echo'
// task function on a dynamic thread pool, its execution, and the resulting
// per task function worker usage.
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid name arguments.
  const nonStringName = dynamicThreadPool.addTaskFunction(0, () => {})
  await expect(nonStringName).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  const emptyName = dynamicThreadPool.addTaskFunction('', () => {})
  await expect(emptyName).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  // Invalid fn arguments.
  for (const notAFunction of [0, '']) {
    await expect(
      dynamicThreadPool.addTaskFunction('test', notAFunction)
    ).rejects.toThrow(new TypeError('fn argument must be a function'))
  }
  // The rejected calls must not have changed the known task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  const added = dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  await expect(added).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    const expectedUsage = {
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    }
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedUsage
    )
  }
  await dynamicThreadPool.destroy()
})
1419
// Checks that removeTaskFunction() rejects for a task function not handled on
// the pool side and removes a task function previously added through the pool.
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  // 'test' is listed but was not added through the pool, so removal is refused.
  const removeUnhandled = dynamicThreadPool.removeTaskFunction('test')
  await expect(removeUnhandled).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const removeEcho = dynamicThreadPool.removeTaskFunction('echo')
  await expect(removeEcho).resolves.toBe(true)
  // After removal the pool is back to its initial task functions state.
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  await dynamicThreadPool.destroy()
})
1458
// Checks the task function names listed by a dynamic thread pool and by a
// fixed cluster pool, both running the multiple task functions worker.
it('Verify that listTaskFunctionNames() is working', async () => {
  // Waits for readiness, asserts the listed names, then destroys the pool.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await checkPool(dynamicThreadPool)
  // The second pool is only created once the first one has been destroyed.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await checkPool(fixedClusterPool)
})
1486
// Checks setDefaultTaskFunction() failure cases and that a successful call
// moves the chosen task function right after the default name in the list.
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The expected error messages embed the id of the first worker node.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  const rejectedCases = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedCases) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(new Error(message))
  }
  // The failed attempts leave the original ordering untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Each entry pairs a new default with the ordering expected afterwards.
  const acceptedCases = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, expectedNames] of acceptedCases) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
      expectedNames
    )
  }
  await dynamicThreadPool.destroy()
})
1540
// Executes every task function of the multiple task functions worker on a
// dynamic cluster pool, then checks the results, the pool counters and the
// per task function usage of each worker node.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Tasks are executed one after the other, starting with the default task function.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(data, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskName)).toStrictEqual(
        expectedUsage
      )
      expect(
        node.getTaskFunctionWorkerUsage(taskName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the task function
    // listed right after it.
    const defaultUsage = node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    expect(defaultUsage).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctionNames[1])
    )
  }
  await pool.destroy()
})
52a23942
JB
1605
// Checks that sending the kill message to the first worker node of a dynamic
// cluster pool resolves with undefined.
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  const killed = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killed).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1618
// Sends an 'add' task function operation to the first worker node of a
// dynamic cluster pool and checks that the node lists the new task function.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // The task function is passed as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addOperation)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1638
// Sends an 'add' task function operation to all workers of a dynamic cluster
// pool and checks that every worker node lists the new task function.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The task function is passed as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
3ec964d6 1661})