refactor: cleanup .gitignore
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
// Package version read from the package.json located two directories above
// this test file.
const testDirectory = dirname(fileURLToPath(import.meta.url))
const packageJsonPath = join(testDirectory, '../..', 'package.json')
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Number of workers requested for the pools created by this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose isMain() always answers `false`, used to
 * simulate creating a pool from a non main thread.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 36
// Call sinon's restore() after every test of the suite.
afterEach(function restoreSinon () {
  restore()
})
40
// Constructing a pool whose isMain() returns false is expected to throw.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolOutsideMain).toThrowError(expectedError)
})
c510fea7 57
bc61cfe6
JB
// Checks the `starting` and `started` flags of a freshly built pool.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.started).toBe(true)
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
67
// Missing, empty, non-string and unresolvable worker file paths must throw.
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Each factory is expected to fail with the same "missing file" error.
  const invalidFilePathFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, ''),
    () => new FixedThreadPool(numberOfWorkers, 0),
    () => new FixedThreadPool(numberOfWorkers, true)
  ]
  for (const createPool of invalidFilePathFactories) {
    expect(createPool).toThrowError(missingFileError)
  }
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
88
// An undefined number of workers must abort the pool construction.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
102
// A negative number of workers must abort the pool construction.
it('Verify that a negative number of workers is checked', () => {
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
113
// A fractional number of workers must abort the pool construction.
it('Verify that a non integer number of workers is checked', () => {
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
7c0ba920 124
// Every invalid (min, max) size combination of a dynamic pool must throw the
// matching error at construction time.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidSizings = [
    {
      createPool: () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      createPool: () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(2, 1, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      createPool: () => new DynamicThreadPool(0, 0, threadWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      createPool: () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { createPool, expectedError } of invalidSizings) {
    expect(createPool).toThrowError(expectedError)
  }
})
199
// Checks the options exposed by a pool built with defaults, then by a pool
// built with explicit options: `pool.opts`, the worker choice strategy context
// options and the options of every registered worker choice strategy.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    const defaultStrategyOptions = {
      retries: 6,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    }
    expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
    expect(defaultPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      workerChoiceStrategyOptions: defaultStrategyOptions
    })
    expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
      defaultStrategyOptions
    )
    for (const [, workerChoiceStrategy] of defaultPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(defaultStrategyOptions)
    }
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await defaultPool.destroy()
  }

  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    const customStrategyOptions = {
      retries: 6,
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    }
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: customStrategyOptions,
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler
    })
    expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
      customStrategyOptions
    )
    for (const [, workerChoiceStrategy] of customPool
      .workerChoiceStrategyContext.workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(customStrategyOptions)
    }
  } finally {
    await customPool.destroy()
  }
})
299
// Every invalid pool option must make the pool constructor throw the matching
// error.
it('Verify that pool options are validated', async () => {
  const createPoolWith = poolOptions => () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      poolOptions
    )
  // Builds pool options enabling the tasks queue with the given queue options.
  const withTasksQueue = tasksQueueOptions => ({
    enableTasksQueue: true,
    tasksQueueOptions
  })
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      withTasksQueue('invalidTasksQueueOptions'),
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      withTasksQueue({ concurrency: 0 }),
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      withTasksQueue({ concurrency: -1 }),
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      withTasksQueue({ concurrency: 0.2 }),
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      withTasksQueue({ size: 0 }),
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      withTasksQueue({ size: -1 }),
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      withTasksQueue({ size: 0.2 }),
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(createPoolWith(poolOptions)).toThrowError(expectedError)
  }
})
473
// Checks that setWorkerChoiceStrategyOptions() updates the pool options, the
// strategy context options, every strategy options and the task statistics
// requirements, and that invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the same options on the pool, the context and each strategy.
  const expectStrategyOptions = expectedOptions => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }
  // Asserts the statistics requirements; only the runTime and elu
  // average/median flags vary between the steps of this test.
  const expectTaskStatisticsRequirements = ({ average, median }) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average, median }
    })
  }
  const averageOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const medianOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: true }
  }
  try {
    expectStrategyOptions(averageOptions)
    expectTaskStatisticsRequirements({ average: true, median: false })

    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: true },
      elu: { median: true }
    })
    expectStrategyOptions(medianOptions)
    expectTaskStatisticsRequirements({ average: false, median: true })

    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: false },
      elu: { median: false }
    })
    expectStrategyOptions(averageOptions)
    expectTaskStatisticsRequirements({ average: true, median: false })

    expect(() =>
      pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
    ).toThrowError(
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    )
    expect(() =>
      pool.setWorkerChoiceStrategyOptions({
        retries: 'invalidChoiceRetries'
      })
    ).toThrowError(
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    )
    expect(() =>
      pool.setWorkerChoiceStrategyOptions({ retries: -1 })
    ).toThrowError(
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    )
    expect(() =>
      pool.setWorkerChoiceStrategyOptions({ weights: {} })
    ).toThrowError(
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    )
    expect(() =>
      pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
    ).toThrowError(
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    )
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
647
// Toggles the tasks queue with enableTasksQueue() and checks the resulting
// pool options and the worker nodes' queue callbacks after each call.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Asserts the queue is off: no queue options, no callbacks on worker nodes.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Asserts the queue is on with the given concurrency and callbacks are set.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }
  try {
    expectTasksQueueDisabled()
    pool.enableTasksQueue(true)
    expectTasksQueueEnabled(1)
    pool.enableTasksQueue(true, { concurrency: 2 })
    expectTasksQueueEnabled(2)
    pool.enableTasksQueue(false)
    expectTasksQueueDisabled()
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
692
// Checks that setTasksQueueOptions() updates the pool options and the worker
// nodes (back pressure size, queue callbacks), and rejects invalid options.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool queue options and the state of every worker node;
  // `callbacksSet` tells whether the queue callbacks are expected to be set.
  const expectTasksQueueState = (expectedOptions, callbacksSet) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksSet) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }
  const defaultQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  try {
    expectTasksQueueState(defaultQueueOptions, true)

    pool.setTasksQueueOptions({
      concurrency: 2,
      size: 2,
      taskStealing: false,
      tasksStealingOnBackPressure: false
    })
    expectTasksQueueState(
      {
        concurrency: 2,
        size: 2,
        taskStealing: false,
        tasksStealingOnBackPressure: false
      },
      false
    )

    pool.setTasksQueueOptions({
      concurrency: 1,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    expectTasksQueueState(defaultQueueOptions, true)

    expect(() =>
      pool.setTasksQueueOptions('invalidTasksQueueOptions')
    ).toThrowError(
      new TypeError('Invalid tasks queue options: must be a plain object')
    )
    expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    )
    expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    )
    expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    )
    expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    )
    expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    )
    expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    )
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
782
6b27d407
JB
// Checks `pool.info` of a fixed thread pool, then of a dynamic cluster pool,
// right after construction.
it('Verify that pool info is set', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(fixedPool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Destroy the pool even when the expectation above throws, so that it is
    // not left alive after a failed test.
    await fixedPool.destroy()
  }

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(dynamicPool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: minimumSize,
      maxSize: numberOfWorkers,
      workerNodes: minimumSize,
      idleWorkerNodes: minimumSize,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    await dynamicPool.destroy()
  }
})
828
// Checks that every worker node of a new pool starts with zeroed task
// counters and empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: { history: new CircularArray() },
        waitTime: { history: new CircularArray() },
        elu: {
          idle: { history: new CircularArray() },
          active: { history: new CircularArray() }
        }
      })
    }
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
863
2431bdb4
JB
// Checks that every worker node of a new pool (fixed cluster, then dynamic
// thread) owns an empty Deque as tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectEmptyTasksQueues(fixedPool)
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectEmptyTasksQueues(dynamicPool)
  } finally {
    await dynamicPool.destroy()
  }
})
889
// Checks the `info` of every worker node of a new pool (fixed cluster, then
// dynamic thread).
it('Verify that pool worker info are initialized', async () => {
  const expectWorkerNodesInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectWorkerNodesInfo(fixedPool, WorkerTypes.cluster)
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectWorkerNodesInfo(dynamicPool, WorkerTypes.thread)
  } finally {
    await dynamicPool.destroy()
  }
})
921
47352846
JB
// Builds a pool with `startWorkers: false`, checks that it is neither started
// nor usable, then starts it explicitly and checks its worker nodes.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  // The guard begins only after start(): the expectations above require the
  // pool to have no worker nodes before that call.
  try {
    expect(pool.info.started).toBe(true)
    expect(pool.info.ready).toBe(true)
    expect(pool.workerNodes.length).toBe(numberOfWorkers)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
    }
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
})
945
9d2d0da1
JB
// Checks that execute() rejects on invalid `name` / `transferList` arguments,
// on an unknown task function name, and once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    await expect(pool.execute(undefined, 0)).rejects.toThrowError(
      new TypeError('name argument must be a string')
    )
    await expect(pool.execute(undefined, '')).rejects.toThrowError(
      new TypeError('name argument must not be an empty string')
    )
    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
      new TypeError('transferList argument must be an array')
    )
    await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
      "Task function 'unknown' not found"
    )
  } finally {
    // Destroy the pool even when an expectation above throws, so that it is
    // not left alive after a failed test.
    await pool.destroy()
  }
  // Reached only when the expectations above passed: the pool is destroyed at
  // this point and must refuse new tasks.
  await expect(pool.execute()).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
})
968
// Submits `numberOfWorkers * maxMultiplier` tasks and checks the per worker
// node task counters while the tasks are in flight, then once they are done.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const promises = new Set()
  const maxMultiplier = 2
  // Asserts the usage of every worker node for the given task counters.
  const expectWorkerNodesUsage = ({ executed, executing }) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed,
          executing,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
    }
  }
  try {
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    expectWorkerNodesUsage({ executed: 0, executing: maxMultiplier })
    await Promise.all(promises)
    expectWorkerNodesUsage({ executed: maxMultiplier, executing: 0 })
  } finally {
    // Let the submitted tasks settle before destroying the pool, so a failed
    // expectation neither leaves the pool alive nor leaves task promises
    // pending.
    await Promise.allSettled(promises)
    await pool.destroy()
  }
})
1034
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Builds the expected worker usage with the given executed tasks matcher.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of a worker node is empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // Before the strategy change: every worker node has executed some tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed tasks counters are back to zero.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1114
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No listener is registered on a freshly created pool emitter.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let readyCount = 0
  let lastInfo
  // Record how many times 'ready' fires and the last payload received.
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    lastInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1149
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // No listener is registered on a freshly created pool emitter.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  // Record how many times 'busy' fires and the last payload received.
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as workers, all at once, then wait for them.
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered once the number of tasks submitted at once
  // reaches the number of fixed pool workers, so it fires numberOfWorkers + 1
  // times while submitting numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1189
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // No listener is registered on a freshly created pool emitter.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  // Record how many times 'full' fires and the last payload received.
  pool.emitter.on(PoolEvents.full, info => {
    fullCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the pool maximum size, all at once.
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1228
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure regardless of its real state.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  // Record how many times 'backPressure' fires and the last payload received.
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  // The stubbed method must have been consulted by the pool.
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1275
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result.
  const expectedByName = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every name, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, expected] of expectedByName) {
      expect(pool.hasTaskFunction(name)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1305
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) pairs with the TypeError each one must reject with.
  const invalidCalls = [
    [[0, () => {}], new TypeError('name argument must be a string')],
    [
      ['', () => {}],
      new TypeError('name argument must not be an empty string')
    ],
    [['test', 0], new TypeError('fn argument must be a function')],
    [['test', ''], new TypeError('fn argument must be a function')]
  ]
  for (const [[name, fn], expectedError] of invalidCalls) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, fn)
    ).rejects.toThrowError(expectedError)
  }
  // The rejected additions must not have changed the task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => data
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The newly added task function must be executable and echo its input.
  const taskFunctionData = { test: 'test' }
  expect(
    await dynamicThreadPool.execute(taskFunctionData, 'echo')
  ).toStrictEqual(taskFunctionData)
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1376
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Task function names exposed by the worker file itself.
  const workerTaskFunctionNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    workerTaskFunctionNames
  )
  // 'test' was not added through the pool, so removing it must be refused.
  await expect(
    dynamicThreadPool.removeTaskFunction('test')
  ).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...workerTaskFunctionNames,
    'echo'
  ])
  // A task function added through the pool can be removed again.
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    workerTaskFunctionNames
  )
  await dynamicThreadPool.destroy()
})
1417
it('Verify that listTaskFunctionNames() is working', async () => {
  // Waits for the pool to be ready, checks the listed names, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1445
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The worker id embedded in the error message depends on how many workers
  // were spawned by the tests that ran before this one, so it is matched with
  // `\d+` instead of a hard-coded value. The patterns are anchored to keep the
  // rest of the message checked in full.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    /^Task function operation 'default' failed on worker \d+ with error: 'TypeError: name parameter is not a string'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function reserved name as the default task function'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function to a non-existing task function'$/
  )
  // The rejected calls must not have changed the task function names order.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The task function set as default is listed right after the default name.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the pool workers, as every other test in this suite does.
  await dynamicThreadPool.destroy()
})
1499
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedTaskFunctionUsage)
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task function usage must equal the usage of the task
    // function listed right after the default name.
    expect(
      node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctionNames[1])
    )
  }
  await pool.destroy()
})
52a23942
JB
1563
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey)
  // The kill request must resolve without any value.
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1576
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  // Task functions are sent to the worker as their source text.
  const emptyTaskFunction = () => {}
  const operation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: emptyTaskFunction.toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, operation)
  ).resolves.toBe(true)
  // The targeted worker node must now list the added task function name.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1596
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Task functions are sent to the workers as their source text.
  const emptyTaskFunction = () => {}
  const operation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: emptyTaskFunction.toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(operation)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function name.
  const expectedTaskFunctionNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
  }
  await pool.destroy()
})
3ec964d6 1619})