chore: v3.0.5
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
// Package version, read from the package.json found two directories above this test file.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Number of workers used when instantiating the pools under test.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose `isMain()` always answers `false`.
 * The suite instantiates it to check the constructor error raised when a
 * pool is not created from the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 36
// Call sinon's restore() after every test so stubs do not leak between tests.
afterEach(() => {
  restore()
})
40
bcf85f7f
JB
it('Verify that pool can be created and destroyed', async () => {
  // Smoke test: build a fixed thread pool, check its type, then destroy it.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
49
it('Verify that pool cannot be created from a non main thread/process', () => {
  // StubPoolWithIsMain answers false to isMain(), so construction must throw.
  expect(
    () =>
      new StubPoolWithIsMain(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          errorHandler: e => console.error(e)
        }
      )
  ).toThrow(
    new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  )
})
c510fea7 66
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Right after construction the pool is expected to be started, not starting.
  expect(pool.starting).toBe(false)
  expect(pool.started).toBe(true)
  await pool.destroy()
})
76
it('Verify that filePath is checked', () => {
  // Missing worker file path.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  // Worker file path that does not exist.
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
})
85
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  expect(
    () =>
      new FixedThreadPool(
        undefined,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
99
it('Verify that a negative number of workers is checked', () => {
  // A negative number of workers must raise a RangeError.
  expect(
    () =>
      new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  ).toThrow(
    new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  )
})
110
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers must raise a TypeError.
  expect(
    () =>
      new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
})
7c0ba920 121
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs a dynamic pool factory given invalid (min, max) sizes
  // with the error its constructor is expected to throw.
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrow(expectedError)
  }
})
196
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts that the worker choice strategy context and every strategy it
  // holds expose exactly the given options.
  const expectStrategyOpts = (checkedPool, expectedOpts) => {
    expect(checkedPool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOpts
    )
    for (const [, workerChoiceStrategy] of checkedPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOpts)
    }
  }

  // Pool created without options: defaults are expected.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: {
      retries: 6,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    }
  })
  expectStrategyOpts(pool, {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  await pool.destroy()

  // Pool created with explicit options: they are expected to be merged with
  // the remaining defaults.
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      retries: 6,
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOpts(pool, {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  await pool.destroy()
})
296
it('Verify that pool options are validated', () => {
  // Each entry pairs invalid pool options with the error the FixedThreadPool
  // constructor is expected to throw for them.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs',
          poolOptions
        )
    ).toThrow(expectedError)
  }
})
468
it('Verify that pool worker choice strategy options can be set', async () => {
  // Expected strategy options when runTime and elu share the same `median` flag.
  const strategyOptions = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Expected task statistics requirements for the same `median` flag:
  // runTime and elu are aggregated and use either the average or the median.
  const statisticsRequirements = median => ({
    runTime: {
      aggregate: true,
      average: !median,
      median
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: !median,
      median
    }
  })
  // Checks the options on the pool, on the strategy context and on every
  // strategy, then the statistics requirements derived from them.
  const expectStrategyState = (checkedPool, median) => {
    expect(checkedPool.opts.workerChoiceStrategyOptions).toStrictEqual(
      strategyOptions(median)
    )
    expect(checkedPool.workerChoiceStrategyContext.opts).toStrictEqual(
      strategyOptions(median)
    )
    for (const [, workerChoiceStrategy] of checkedPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(strategyOptions(median))
    }
    expect(
      checkedPool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(statisticsRequirements(median))
  }

  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  expectStrategyState(pool, false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(pool, true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(pool, false)

  // Invalid options must be rejected by the setter.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [workerChoiceStrategyOptions, expectedError] of invalidOptions) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    ).toThrow(expectedError)
  }
  await pool.destroy()
})
638
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue is disabled by default.
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  // Enabling without options populates the default tasks queue options.
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  // Enabling with options overrides only the given ones.
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  // Disabling clears the tasks queue options.
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
667
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Every worker node back pressure size must match the pool tasks queue size.
  const expectBackPressureSizeInSync = () => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  // Defaults applied when only enableTasksQueue is given.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectBackPressureSizeInSync()

  // Every option set explicitly.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectBackPressureSizeInSync()

  // Omitting `size` is expected to bring back its default value.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectBackPressureSizeInSync()

  // Invalid options must be rejected by the setter.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
749
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max sizes both equal the number of workers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  // Dynamic cluster pool: only the minimum number of worker nodes is expected
  // right after construction.
  pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: Math.floor(numberOfWorkers / 2),
    maxSize: numberOfWorkers,
    workerNodes: Math.floor(numberOfWorkers / 2),
    idleWorkerNodes: Math.floor(numberOfWorkers / 2),
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
795
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Before any task is submitted, every counter is zero and every history is
  // an empty CircularArray.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    })
  }
  await pool.destroy()
})
830
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque whose maxSize is still zero.
  const expectEmptyTasksQueues = checkedPool => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
856
it('Verify that pool worker info are initialized', async () => {
  // Every worker node info must describe a ready, non dynamic worker of the
  // given worker type.
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
888
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  // With startWorkers disabled, the pool has no worker node until start().
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
912
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Invalid `name` argument.
  await expect(pool.execute(undefined, 0)).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  await expect(pool.execute(undefined, '')).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  // Invalid `transferList` argument.
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )
  // Unknown task function name: the rejection value is a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool must refuse to execute tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
935
it('Verify that pool worker tasks usage are computed', async () => {
  // Expected worker node usage for the given executed/executing task counts.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })

  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const promises = new Set()
  const maxMultiplier = 2
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // While the tasks are in flight, each worker node is expected to be
  // executing maxMultiplier of them.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(promises)
  // Once every task has settled, they are all accounted as executed.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1001
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Builds the usage expected on a worker node; only the executed counter
  // differs between the two checks below.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that none of the measurement histories holds an entry.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const pendingTasks = []
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  // Once all tasks are done, every worker node has executed at least one task
  // and at most all of them.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change, the executed counter must be back to zero.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
1081
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the `ready` event, in emission order.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  // The last received payload must describe a started and ready dynamic cluster pool.
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1116
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the `busy` event, in emission order.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = []
  for (let submitted = 0; submitted < numberOfWorkers * 2; submitted++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1156
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the `full` event, in emission order.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the maximum number of workers, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1195
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the `backPressure` event, in emission order.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = []
  for (let submitted = 0; submitted < numberOfWorkers + 1; submitted++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(backPressureInfos.length).toBe(1)
  // With the tasks queue enabled, the pool info also carries the queue related counters.
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1242
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Expected hasTaskFunction() result per task function name, identical for both pools.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(name)).toBe(expected)
  }
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(name)).toBe(expected)
  }
  await fixedClusterPool.destroy()
})
1272
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) pairs with the TypeError message each one must be rejected with.
  const invalidArguments = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidArguments) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // None of the rejected calls may have changed the task function names list.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function must be executable and return its input.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // A fresh CircularArray is built for each expected history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1343
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Task function names listed by the pool before 'echo' is added and after it is removed.
  const initialTaskFunctionNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialTaskFunctionNames
  )
  // 'test' is listed by the pool but was not added through addTaskFunction():
  // removing it must be rejected.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialTaskFunctionNames,
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialTaskFunctionNames
  )
  await dynamicThreadPool.destroy()
})
1382
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from both pools, in this exact order.
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedTaskFunctionNames
  )
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(
    expectedTaskFunctionNames
  )
  await fixedClusterPool.destroy()
})
1410
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The error messages below embed the id of the failing worker. That id is
  // assigned at runtime, so it is read from the pool instead of being
  // hard-coded, which would tie this test to the number of workers created
  // before it runs.
  // NOTE(review): presumably the first worker node is the only one at this
  // point (no task has been submitted yet) - confirm against the pool implementation.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  // Builds the error expected when the 'default' operation fails on the worker.
  const defaultOperationError = workerError =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${workerError}'`
    )
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    defaultOperationError('TypeError: name parameter is not a string')
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(
    defaultOperationError(
      'Error: Cannot set the default task function reserved name as the default task function'
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(
    defaultOperationError(
      'Error: Cannot set the default task function to a non-existing task function'
    )
  )
  // None of the rejected calls may have changed the task function names order.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The new default task function is listed right after the default task name.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1463
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Tasks are run one after the other: the default task function first,
  // then each task function by name.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Usage expected for any task function once all the tasks are done.
  const expectedTaskFunctionUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    // Checked before any getTaskFunctionWorkerUsage() call made by this test.
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedTaskFunctionUsage())
      expect(
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the first listed
    // task function name after it.
    const defaultTaskFunctionUsage =
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedTaskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionNames[1]
    )
    expect(defaultTaskFunctionUsage).toStrictEqual(firstNamedTaskFunctionUsage)
  }
  await pool.destroy()
})
52a23942
JB
1527
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  const killMessageSent = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  // The returned promise must settle successfully, without any value.
  await expect(killMessageSent).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1540
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  // Operation asking the worker to add an empty task function named 'empty'.
  // The function is sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  // The targeted worker node must now list the added task function name last.
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1560
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation asking the workers to add an empty task function named 'empty'.
  // The function is sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function name last.
  const expectedTaskFunctionNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedTaskFunctionNames)
  }
  await pool.destroy()
})
3ec964d6 1583})