Merge pull request #1553 from poolifier/combined-prs-branch
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
// Number of workers used to size the pools created by the tests below.
const numberOfWorkers = 2
// Version string read from the package.json located two directories above this file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
/**
 * Fixed thread pool whose `isMain()` always answers `false`.
 * Used by the 'Simulate pool creation from a non main thread/process' test.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsOnMain = false
    return runsOnMain
  }
}
3ec964d6 36
// Undo every sinon fake after each test.
afterEach(function restoreSinonFakes () {
  restore()
})
40
it('Simulate pool creation from a non main thread/process', () => {
  // The stub pool reports isMain() === false, so its construction must throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrow(expectedError)
})
c510fea7 57
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Right after construction the pool is expected to be started, not starting.
  expect(threadPool.starting).toBe(false)
  expect(threadPool.started).toBe(true)
  await threadPool.destroy()
})
67
it('Verify that filePath is checked', () => {
  // Each entry pairs a pool factory with the error message it must produce.
  const invalidFilePathCases = [
    [
      () => new FixedThreadPool(numberOfWorkers),
      "Cannot find the worker file 'undefined'"
    ],
    [
      () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts'),
      "Cannot find the worker file './dummyWorker.ts'"
    ]
  ]
  for (const [createPool, message] of invalidFilePathCases) {
    expect(createPool).toThrow(new Error(message))
  }
})
76
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be refused by the constructor.
  const createPoolWithoutSize = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
90
it('Verify that a negative number of workers is checked', () => {
  // A negative number of workers must be refused by the constructor.
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrow(expectedError)
})
101
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers must be refused by the constructor.
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrow(expectedError)
})
7c0ba920 112
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: pool class, minimum size, maximum size, worker file, expected error.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
187
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Asserts the options held by the strategy context and by each of its strategies.
  const expectStrategyOptions = (checkedPool, expectedOptions) => {
    expect(checkedPool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of checkedPool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // First pass: pool built without any explicit option.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(pool, defaultStrategyOptions)
  await pool.destroy()

  // Second pass: pool built with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(pool, customStrategyOptions)
  await pool.destroy()
})
287
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs invalid pool options with the error the constructor must throw.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrow(expectedError)
  }
})
459
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the options seen by the pool, the strategy context and each strategy,
  // then the task statistics requirements, for the given runTime/elu median flag.
  const expectStrategyState = medianEnabled => {
    const expectedOptions = {
      retries: 6,
      runTime: { median: medianEnabled },
      waitTime: { median: false },
      elu: { median: medianEnabled }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      }
    })
  }

  // Initial state, then median switched on, then switched back off.
  expectStrategyState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  // Each entry pairs an invalid argument with the error the setter must throw.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
629
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Expected tasks queue options for a given concurrency.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  // Queue is off initially.
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  // Enabling without options.
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(1))

  // Enabling with an explicit concurrency.
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(2))

  // Disabling removes the queue options.
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
658
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )

  // Asserts the pool tasks queue options, then that each worker node back
  // pressure size equals the configured queue size.
  const expectQueueState = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueState({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  const explicitOptions = {
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  }
  pool.setTasksQueueOptions({ ...explicitOptions })
  expectQueueState(explicitOptions)

  // No size given here: the expected size is numberOfWorkers squared again.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueState({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })

  // Each entry pairs an invalid argument with the error the setter must throw.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
740
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both pools checked below.
  const sharedInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    maxSize: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await pool.destroy()

  // Dynamic cluster pool whose minimum size is half of the maximum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await pool.destroy()
})
786
it('Verify that pool worker tasks usage are initialized', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage expected on a worker node that has not run any task.
  const buildPristineUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of clusterPool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(buildPristineUsage())
  }
  await clusterPool.destroy()
})
821
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that every worker node of the pool holds an empty Deque as tasks queue.
  const expectEmptyTasksQueues = checkedPool => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
847
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of every worker node of the pool for the given worker type.
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
879
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start(): not started, not ready, no worker node, execute() rejects.
  expect(clusterPool.info.started).toBe(false)
  expect(clusterPool.info.ready).toBe(false)
  expect(clusterPool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(clusterPool.execute()).rejects.toThrow(notStartedError)

  // After start(): started, ready, and one worker node per requested worker.
  clusterPool.start()
  expect(clusterPool.info.started).toBe(true)
  expect(clusterPool.info.ready).toBe(true)
  expect(clusterPool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of clusterPool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await clusterPool.destroy()
})
903
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry pairs execute() arguments with the error the call must reject with.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(clusterPool.execute(...executeArgs)).rejects.toThrow(
      expectedError
    )
  }
  // An unknown task function name rejects with a plain string.
  await expect(clusterPool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await clusterPool.destroy()
  // After destroy(), execute() rejects with the not started error.
  await expect(clusterPool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
926
it('Verify that pool worker tasks usage are computed', async () => {
  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the expected usage of a worker node for the given task counters.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  // Submit maxMultiplier tasks per worker without awaiting them yet.
  const pendingTasks = new Set()
  const maxMultiplier = 2
  const totalTasks = numberOfWorkers * maxMultiplier
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(clusterPool.execute())
  }

  // While in flight, each worker node must report maxMultiplier executing tasks.
  for (const node of clusterPool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)

  // Once settled, each worker node must report maxMultiplier executed tasks.
  for (const node of clusterPool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(maxMultiplier, 0))
  }
  await clusterPool.destroy()
})
992
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  // Expected worker node usage shape; only the executed tasks matcher varies.
  const buildExpectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of the given usage is empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)

  // Before the strategy change: each worker node has executed at least one task.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node.usage)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // After the strategy change: the executed tasks counter is back to zero.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1072
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Track how many times the event fired and the payload of the last emission.
  let readyCount = 0
  let lastInfo
  const onReady = info => {
    readyCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.ready, onReady)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1107
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Track how many times the event fired and the payload of the last emission.
  let busyCount = 0
  let lastInfo
  const onBusy = info => {
    busyCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.busy, onBusy)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1147
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Track how many times the event fired and the payload of the last emission.
  let fullCount = 0
  let lastInfo
  const onFull = info => {
    fullCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1186
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Track how many times the event fired and the payload of the last emission.
  let backPressureCount = 0
  let lastInfo
  const onBackPressure = info => {
    backPressureCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const submittedTasks = numberOfWorkers + 1
  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(backPressureCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1233
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, runs the checks, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }

  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1263
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Invalid (name, fn) pairs and the TypeError message each one must be rejected with.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  const namesBeforeAdd = dynamicThreadPool.listTaskFunctionNames()
  expect(namesBeforeAdd).toStrictEqual([DEFAULT_TASK_NAME, 'test'])

  // Register a valid task function and check it is tracked on the pool side.
  const echoTaskFunction = data => {
    return data
  }
  const addition = dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  await expect(addition).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  const namesAfterAdd = dynamicThreadPool.listTaskFunctionNames()
  expect(namesAfterAdd).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'echo'])

  // The added task function must be executable and echo its input back.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  for (const node of dynamicThreadPool.workerNodes) {
    const echoUsage = node.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1334
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)

  // 'test' was not added through the pool, so its removal is expected to be rejected.
  const invalidRemoval = dynamicThreadPool.removeTaskFunction('test')
  await expect(invalidRemoval).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  // Add a task function through the pool, then remove it again.
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  const validRemoval = dynamicThreadPool.removeTaskFunction('echo')
  await expect(validRemoval).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  await dynamicThreadPool.destroy()
})
1373
it('Verify that listTaskFunctionNames() is working', async () => {
  // Waits for the pool to be ready, checks the listed names, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    const listedNames = pool.listTaskFunctionNames()
    expect(listedNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }

  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1401
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The rejection message embeds the id of the worker that failed. That id depends
  // on how many workers were created earlier in the same test process, so the
  // expected messages match any numeric id instead of hard-coding one. The
  // patterns are anchored so the rest of the message is still checked exactly.
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'TypeError: name parameter is not a string'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function reserved name as the default task function'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function to a non-existing task function'$/
  )
  // Failed attempts must leave the task function names order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The new default task function is listed right after the default task name.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1454
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the first listed task function.
    const defaultUsage = node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = node.getTaskFunctionWorkerUsage(
      node.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
52a23942
JB
1518
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1531
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  // 'add' operation message carrying the stringified source of a no-op function.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationRequest = pool.sendTaskFunctionOperationToWorker(
    workerNodeKey,
    addOperation
  )
  await expect(operationRequest).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1551
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation message carrying the stringified source of a no-op function.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationRequest = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationRequest).resolves.toBe(true)
  // Every worker node must now list the added task function name.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1574})