test: refine expectation at back pressure callback
[poolifier.git] / tests / pools / abstract-pool.test.mjs
Commit | Line | Data
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
// Version string taken from the package.json located two directories above
// this test file.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Number of workers requested by the pools created in the tests below.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose `isMain()` is overridden to answer `false`.
 * It is instantiated by the "non main thread/process" test, which expects the
 * constructor to throw.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
// After every test, call sinon's restore().
afterEach(() => {
  restore()
})
41
bcf85f7f
JB
it('Verify that pool can be created and destroyed', async () => {
  // Create a fixed thread pool, check its class, then destroy it.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub reports isMain() === false, so construction is expected to throw.
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      {
        errorHandler: e => console.error(e)
      }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createStubPool).toThrow(expectedError)
})
c510fea7 67
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status flags expected right after construction, checked in this order.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [statusName, expectedValue] of expectedStatuses) {
    expect(threadPool[statusName]).toBe(expectedValue)
  }
  await threadPool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // Each entry: constructor arguments and the error the constructor must throw.
  const filePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified')
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string')
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'")
    ]
  ]
  for (const [constructorArgs, expectedError] of filePathCases) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(expectedError)
  }
})
90
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  // A size of -1 must be rejected with a RangeError.
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  // A fractional size (0.25) must be rejected with a TypeError.
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrow(expectedError)
})
7c0ba920 126
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: pool class, minimum size, maximum size, worker file and the
  // error the constructor is expected to throw for that combination.
  const sizingCases = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of sizingCases) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
201
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // First pool: built without an options argument.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(defaultStrategyOptions)
  }
  await pool.destroy()

  // Second pool: built with explicit options, including handlers.
  const loggingHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: loggingHandler,
    errorHandler: loggingHandler,
    onlineHandler: loggingHandler,
    exitHandler: loggingHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: loggingHandler,
    messageHandler: loggingHandler,
    errorHandler: loggingHandler,
    exitHandler: loggingHandler
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(customStrategyOptions)
  }
  await pool.destroy()
})
301
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: an invalid options object and the error the constructor is
  // expected to throw for it. Entries are checked in declaration order.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
473
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Checks, in order: the pool options, the strategy context options, every
  // strategy's options, and the task statistics requirements. `medianEnabled`
  // is the runTime/elu median flag expected at that point of the test.
  const expectStrategyState = medianEnabled => {
    const expectedOptions = {
      retries: 6,
      runTime: { median: medianEnabled },
      waitTime: { median: false },
      elu: { median: medianEnabled }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      }
    })
  }

  // Initial state, then median switched on, then switched back off.
  expectStrategyState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  // Invalid arguments and the error each one is expected to raise.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
643
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Builds the tasks queue options expected for a given concurrency.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  // Disabled at creation.
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  // Enabled without explicit options.
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(1))

  // Enabled with an explicit concurrency.
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(2))

  // Disabled again.
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
672
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Checks the pool tasks queue options, then that each worker node's
  // tasksQueueBackPressureSize equals the configured queue size.
  const expectQueueState = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueState({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectQueueState({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  // No size is given here; the expectation below is the squared worker count.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueState({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  // Invalid arguments and the error each one is expected to raise.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
754
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: every size-related field equals numberOfWorkers.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await fixedPool.destroy()

  // Dynamic cluster pool: worker node counts are expected at the minimum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await dynamicPool.destroy()
})
800
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds a fresh expected usage object (all counters at zero, new
  // CircularArray histories) for each worker node checked.
  const buildInitialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(buildInitialUsage())
  }
  await pool.destroy()
})
836
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must hold an empty Deque whose maxSize is 0.
  const expectEmptyTasksQueues = checkedPool => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
862
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must report a numeric id, the given worker type,
  // dynamic: false and ready: true.
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
894
711623b8
JB
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Starting a pool that reports started/ready must throw.
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  const alreadyStartedError = new Error('Cannot start an already started pool')
  expect(() => pool.start()).toThrow(alreadyStartedError)

  // After destroy() the flags flip and a second destroy() must reject.
  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  const alreadyDestroyedError = new Error(
    'Cannot destroy an already destroyed pool'
  )
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyedError)
})
912
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start(): not started, not ready, no worker node, execute() rejects.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start(): started and ready, then the ready event is awaited.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
939
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry: execute() arguments and the TypeError the call must reject
  // with. The calls are awaited one after another, in declaration order.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // For an unknown task function name the rejection value is a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once the pool is destroyed, execute() must reject.
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)
})
962
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Checks every worker node's usage against the given executed/executing
  // task counters; the histories only need to be CircularArray instances.
  const expectUsage = (executed, executing) => {
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual({
        tasks: {
          executed,
          executing,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
    }
  }

  // Submit numberOfWorkers * maxMultiplier tasks without awaiting them.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  expectUsage(0, maxMultiplier)
  await Promise.all(pendingTasks)
  expectUsage(maxMultiplier, 0)
  await pool.destroy()
})
1030
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasksCount = numberOfWorkers * maxMultiplier
  // Expected worker node usage, parameterized by the executed tasks matcher.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  // Asserts that none of the measurement histories holds a value.
  const expectEmptyHistories = workerNode => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  const pendingTasks = Array.from({ length: submittedTasksCount }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // After the run, every worker node must have executed at least one task.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
      submittedTasksCount
    )
    expectEmptyHistories(workerNode)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Switching strategy must bring the executed counter back to zero.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1112
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record every payload delivered to the 'ready' listener.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  // The most recent payload is the one checked against the expected shape.
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1147
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record every payload delivered to the 'busy' listener.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  // Only the last received payload is checked against the expected shape.
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1187
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record every payload delivered to the 'full' listener.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The listener is expected to fire exactly once for this submission burst.
  expect(fullInfos.length).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1226
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to report back pressure regardless of its real state.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record every payload delivered to the 'backPressure' listener.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureInfos.length).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual(
    expectedInfo
  )
  // Number of times the stubbed predicate was consulted during the run.
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1273
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last 'poolifier:task' resource seen by the init hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Only track resources created for pool tasks.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Count promise resolutions happening inside the task async scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Disable the hook even when the task rejects, otherwise it would stay
    // enabled and keep observing the tests that run afterwards.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1310
it('Verify that hasTaskFunction() is working', async () => {
  // Waits for the pool to be ready, checks the task function lookups, then
  // destroys the pool.
  const verifyHasTaskFunction = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
    expect(pool.hasTaskFunction('jsonIntegerSerialization')).toBe(true)
    expect(pool.hasTaskFunction('factorial')).toBe(true)
    expect(pool.hasTaskFunction('fibonacci')).toBe(true)
    expect(pool.hasTaskFunction('unknown')).toBe(false)
    await pool.destroy()
  }
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyHasTaskFunction(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyHasTaskFunction(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1340
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) pairs with the TypeError each one must reject with.
  const invalidRegistrations = [
    {
      name: 0,
      fn: () => {},
      error: new TypeError('name argument must be a string')
    },
    {
      name: '',
      fn: () => {},
      error: new TypeError('name argument must not be an empty string')
    },
    {
      name: 'test',
      fn: 0,
      error: new TypeError('fn argument must be a function')
    },
    {
      name: 'test',
      fn: '',
      error: new TypeError('fn argument must be a function')
    }
  ]
  for (const { name, fn, error } of invalidRegistrations) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, fn)
    ).rejects.toThrow(error)
  }
  // Rejected registrations must leave the task function names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function must be executable and echo its input back.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // Fresh expected usage for the 'echo' task function: empty histories.
  const expectedEchoUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual(
      expectedEchoUsage()
    )
  }
  await dynamicThreadPool.destroy()
})
1412
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Names expected before 'echo' is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  // 'test' was not added through the pool, so removing it must be rejected.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // Removing 'echo' must bring the pool back to its initial names.
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  await dynamicThreadPool.destroy()
})
1451
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both pool flavours are expected to list the same names in the same order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for readiness, checks the listed names, then destroys the pool.
  const verifyListedNames = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyListedNames(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyListedNames(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1479
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The expected error messages embed the id of the first worker node.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  // Invalid default task function names with the error each must reject with.
  const rejectedNames = [
    {
      name: 0,
      error: new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
      )
    },
    {
      name: DEFAULT_TASK_NAME,
      error: new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
      )
    },
    {
      name: 'unknown',
      error: new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
      )
    }
  ]
  for (const { name, error } of rejectedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(error)
  }
  // Rejected calls must not have reordered the task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Each accepted name with the listing order expected right afterwards.
  const acceptedNames = [
    {
      name: 'factorial',
      namesAfter: [
        DEFAULT_TASK_NAME,
        'factorial',
        'jsonIntegerSerialization',
        'fibonacci'
      ]
    },
    {
      name: 'fibonacci',
      namesAfter: [
        DEFAULT_TASK_NAME,
        'fibonacci',
        'jsonIntegerSerialization',
        'factorial'
      ]
    }
  ]
  for (const { name, namesAfter } of acceptedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
      namesAfter
    )
  }
  await dynamicThreadPool.destroy()
})
1533
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, one after the other.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(data, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  // Usage shape expected for every task function on every worker node.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedTaskFunctionUsage
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default name must report the same usage as the first named task function.
    const firstNamedTaskFunction = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await pool.destroy()
})
52a23942
JB
1598
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey)
  // The kill request must settle successfully without any value.
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1611
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  // 'add' operation carrying the source text of an empty task function.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addEmptyTaskFunction)
  ).resolves.toBe(true)
  // The targeted worker node must now list the new name last.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1631
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation carrying the source text of an empty task function.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node of the pool must now list the new name last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1654})