build: silence sourcemap enablement warning
[poolifier.git] / tests / pools / abstract-pool.test.js
CommitLineData
b5604034 1const { EventEmitterAsyncResource } = require('node:events')
a61a0724 2const { expect } = require('expect')
b1aae695 3const sinon = require('sinon')
e843b904 4const {
70a4f5ea 5 DynamicClusterPool,
9e619829 6 DynamicThreadPool,
aee46736 7 FixedClusterPool,
e843b904 8 FixedThreadPool,
aee46736 9 PoolEvents,
184855e6 10 PoolTypes,
3d6dd312 11 WorkerChoiceStrategies,
184855e6 12 WorkerTypes
bfc75cca
JB
13} = require('../../lib')
14const { CircularArray } = require('../../lib/circular-array')
15const { Deque } = require('../../lib/deque')
16const { DEFAULT_TASK_NAME } = require('../../lib/utils')
17const { version } = require('../../package.json')
18const { waitPoolEvents } = require('../test-utils')
19const { WorkerNode } = require('../../lib/pools/worker-node')
e1ffb94f
JB
20
21describe('Abstract pool test suite', () => {
fc027381 22 const numberOfWorkers = 2
a8884ffd 23 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
24 isMain () {
25 return false
26 }
3ec964d6 27 }
3ec964d6 28
dc021bcc
JB
29 afterEach(() => {
30 sinon.restore()
31 })
32
3ec964d6 33 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
34 expect(
35 () =>
a8884ffd 36 new StubPoolWithIsMain(
7c0ba920 37 numberOfWorkers,
8d3782fa
JB
38 './tests/worker-files/thread/testWorker.js',
39 {
041dc05b 40 errorHandler: e => console.error(e)
8d3782fa
JB
41 }
42 )
04f45163 43 ).toThrowError(
e695d66f
JB
44 new Error(
45 'Cannot start a pool from a worker with the same type as the pool'
46 )
04f45163 47 )
3ec964d6 48 })
c510fea7 49
bc61cfe6
JB
50 it('Verify that pool statuses properties are set', async () => {
51 const pool = new FixedThreadPool(
52 numberOfWorkers,
53 './tests/worker-files/thread/testWorker.js'
54 )
55 expect(pool.starting).toBe(false)
56 expect(pool.started).toBe(true)
57 await pool.destroy()
bc61cfe6
JB
58 })
59
c510fea7 60 it('Verify that filePath is checked', () => {
292ad316
JB
61 const expectedError = new Error(
62 'Please specify a file with a worker implementation'
63 )
7c0ba920 64 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 65 expectedError
8d3782fa 66 )
7c0ba920 67 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 68 expectedError
8d3782fa 69 )
3d6dd312
JB
70 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
71 expectedError
72 )
73 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
74 expectedError
75 )
76 expect(
77 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
79 })
80
81 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
82 expect(
83 () =>
84 new FixedThreadPool(
85 undefined,
86 './tests/worker-files/thread/testWorker.js'
87 )
88 ).toThrowError(
e695d66f
JB
89 new Error(
90 'Cannot instantiate a pool without specifying the number of workers'
91 )
8d3782fa
JB
92 )
93 })
94
95 it('Verify that a negative number of workers is checked', () => {
96 expect(
97 () =>
98 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
99 ).toThrowError(
473c717a
JB
100 new RangeError(
101 'Cannot instantiate a pool with a negative number of workers'
102 )
8d3782fa
JB
103 )
104 })
105
106 it('Verify that a non integer number of workers is checked', () => {
107 expect(
108 () =>
109 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
110 ).toThrowError(
473c717a 111 new TypeError(
0d80593b 112 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
113 )
114 )
c510fea7 115 })
7c0ba920 116
216541b6 117 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
118 expect(
119 () =>
120 new DynamicClusterPool(
121 1,
122 undefined,
123 './tests/worker-files/cluster/testWorker.js'
124 )
125 ).toThrowError(
126 new TypeError(
127 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 )
129 )
2761efb4
JB
130 expect(
131 () =>
132 new DynamicThreadPool(
133 0.5,
134 1,
135 './tests/worker-files/thread/testWorker.js'
136 )
137 ).toThrowError(
138 new TypeError(
139 'Cannot instantiate a pool with a non safe integer number of workers'
140 )
141 )
142 expect(
143 () =>
144 new DynamicClusterPool(
145 0,
146 0.5,
a5ed75b7 147 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
148 )
149 ).toThrowError(
150 new TypeError(
151 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 )
153 )
2431bdb4
JB
154 expect(
155 () =>
156 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
157 ).toThrowError(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
162 expect(
163 () =>
213cbac6 164 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
2431bdb4
JB
165 ).toThrowError(
166 new RangeError(
213cbac6 167 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
168 )
169 )
21f710aa
JB
170 expect(
171 () =>
213cbac6
JB
172 new DynamicClusterPool(
173 1,
174 1,
175 './tests/worker-files/cluster/testWorker.js'
176 )
21f710aa
JB
177 ).toThrowError(
178 new RangeError(
213cbac6 179 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
180 )
181 )
2431bdb4
JB
182 })
183
fd7ebd49 184 it('Verify that pool options are checked', async () => {
7c0ba920
JB
185 let pool = new FixedThreadPool(
186 numberOfWorkers,
187 './tests/worker-files/thread/testWorker.js'
188 )
b5604034 189 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
190 expect(pool.opts).toStrictEqual({
191 startWorkers: true,
192 enableEvents: true,
193 restartWorkerOnError: true,
194 enableTasksQueue: false,
195 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
196 workerChoiceStrategyOptions: {
197 retries: 6,
198 runTime: { median: false },
199 waitTime: { median: false },
200 elu: { median: false }
201 }
8990357d
JB
202 })
203 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 204 retries: 6,
932fc8be 205 runTime: { median: false },
5df69fab
JB
206 waitTime: { median: false },
207 elu: { median: false }
da309861 208 })
999ef664
JB
209 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
210 .workerChoiceStrategies) {
211 expect(workerChoiceStrategy.opts).toStrictEqual({
212 retries: 6,
213 runTime: { median: false },
214 waitTime: { median: false },
215 elu: { median: false }
216 })
217 }
fd7ebd49 218 await pool.destroy()
73bfd59d 219 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
220 pool = new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.js',
223 {
e4543b14 224 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 225 workerChoiceStrategyOptions: {
932fc8be 226 runTime: { median: true },
fc027381 227 weights: { 0: 300, 1: 200 }
49be33fe 228 },
35cf1c03 229 enableEvents: false,
1f68cede 230 restartWorkerOnError: false,
ff733df7 231 enableTasksQueue: true,
d4aeae5a 232 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
233 messageHandler: testHandler,
234 errorHandler: testHandler,
235 onlineHandler: testHandler,
236 exitHandler: testHandler
7c0ba920
JB
237 }
238 )
7c0ba920 239 expect(pool.emitter).toBeUndefined()
47352846
JB
240 expect(pool.opts).toStrictEqual({
241 startWorkers: true,
242 enableEvents: false,
243 restartWorkerOnError: false,
244 enableTasksQueue: true,
245 tasksQueueOptions: {
246 concurrency: 2,
2324f8c9 247 size: Math.pow(numberOfWorkers, 2),
dbd73092 248 taskStealing: true,
47352846
JB
249 tasksStealingOnBackPressure: true
250 },
251 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
252 workerChoiceStrategyOptions: {
253 retries: 6,
254 runTime: { median: true },
255 waitTime: { median: false },
256 elu: { median: false },
257 weights: { 0: 300, 1: 200 }
258 },
259 onlineHandler: testHandler,
260 messageHandler: testHandler,
261 errorHandler: testHandler,
262 exitHandler: testHandler
8990357d
JB
263 })
264 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 265 retries: 6,
8990357d
JB
266 runTime: { median: true },
267 waitTime: { median: false },
268 elu: { median: false },
fc027381 269 weights: { 0: 300, 1: 200 }
da309861 270 })
999ef664
JB
271 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
272 .workerChoiceStrategies) {
273 expect(workerChoiceStrategy.opts).toStrictEqual({
274 retries: 6,
275 runTime: { median: true },
276 waitTime: { median: false },
277 elu: { median: false },
278 weights: { 0: 300, 1: 200 }
279 })
280 }
fd7ebd49 281 await pool.destroy()
7c0ba920
JB
282 })
283
a20f0ba5 284 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
285 expect(
286 () =>
287 new FixedThreadPool(
288 numberOfWorkers,
289 './tests/worker-files/thread/testWorker.js',
290 {
f0d7f803 291 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
292 }
293 )
8735b4e5
JB
294 ).toThrowError(
295 new Error("Invalid worker choice strategy 'invalidStrategy'")
296 )
7e653ee0
JB
297 expect(
298 () =>
299 new FixedThreadPool(
300 numberOfWorkers,
301 './tests/worker-files/thread/testWorker.js',
302 {
303 workerChoiceStrategyOptions: {
8c0b113f 304 retries: 'invalidChoiceRetries'
7e653ee0
JB
305 }
306 }
307 )
308 ).toThrowError(
309 new TypeError(
8c0b113f 310 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
311 )
312 )
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.js',
318 {
319 workerChoiceStrategyOptions: {
8c0b113f 320 retries: -1
7e653ee0
JB
321 }
322 }
323 )
324 ).toThrowError(
325 new RangeError(
8c0b113f 326 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
327 )
328 )
49be33fe
JB
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.js',
334 {
335 workerChoiceStrategyOptions: { weights: {} }
336 }
337 )
338 ).toThrowError(
8735b4e5
JB
339 new Error(
340 'Invalid worker choice strategy options: must have a weight for each worker node'
341 )
49be33fe 342 )
f0d7f803
JB
343 expect(
344 () =>
345 new FixedThreadPool(
346 numberOfWorkers,
347 './tests/worker-files/thread/testWorker.js',
348 {
349 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
350 }
351 )
352 ).toThrowError(
8735b4e5
JB
353 new Error(
354 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 )
f0d7f803 356 )
5c4c2dee
JB
357 expect(
358 () =>
359 new FixedThreadPool(
360 numberOfWorkers,
361 './tests/worker-files/thread/testWorker.js',
362 {
363 enableTasksQueue: true,
364 tasksQueueOptions: 'invalidTasksQueueOptions'
365 }
366 )
367 ).toThrowError(
368 new TypeError('Invalid tasks queue options: must be a plain object')
369 )
f0d7f803
JB
370 expect(
371 () =>
372 new FixedThreadPool(
373 numberOfWorkers,
374 './tests/worker-files/thread/testWorker.js',
375 {
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: 0 }
378 }
379 )
8735b4e5 380 ).toThrowError(
e695d66f 381 new RangeError(
20c6f652 382 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
383 )
384 )
f0d7f803
JB
385 expect(
386 () =>
387 new FixedThreadPool(
388 numberOfWorkers,
389 './tests/worker-files/thread/testWorker.js',
390 {
391 enableTasksQueue: true,
5c4c2dee 392 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
393 }
394 )
8735b4e5 395 ).toThrowError(
5c4c2dee
JB
396 new RangeError(
397 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 )
8735b4e5 399 )
f0d7f803
JB
400 expect(
401 () =>
402 new FixedThreadPool(
403 numberOfWorkers,
404 './tests/worker-files/thread/testWorker.js',
405 {
406 enableTasksQueue: true,
407 tasksQueueOptions: { concurrency: 0.2 }
408 }
409 )
8735b4e5 410 ).toThrowError(
20c6f652 411 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 412 )
5c4c2dee
JB
413 expect(
414 () =>
415 new FixedThreadPool(
416 numberOfWorkers,
417 './tests/worker-files/thread/testWorker.js',
418 {
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: 0 }
421 }
422 )
423 ).toThrowError(
424 new RangeError(
425 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 )
427 )
428 expect(
429 () =>
430 new FixedThreadPool(
431 numberOfWorkers,
432 './tests/worker-files/thread/testWorker.js',
433 {
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: -1 }
436 }
437 )
438 ).toThrowError(
439 new RangeError(
440 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 )
442 )
443 expect(
444 () =>
445 new FixedThreadPool(
446 numberOfWorkers,
447 './tests/worker-files/thread/testWorker.js',
448 {
449 enableTasksQueue: true,
450 tasksQueueOptions: { size: 0.2 }
451 }
452 )
453 ).toThrowError(
454 new TypeError('Invalid worker node tasks queue size: must be an integer')
455 )
d4aeae5a
JB
456 })
457
2431bdb4 458 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
459 const pool = new FixedThreadPool(
460 numberOfWorkers,
461 './tests/worker-files/thread/testWorker.js',
462 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
463 )
464 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 465 retries: 6,
8990357d
JB
466 runTime: { median: false },
467 waitTime: { median: false },
468 elu: { median: false }
469 })
470 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 471 retries: 6,
932fc8be 472 runTime: { median: false },
5df69fab
JB
473 waitTime: { median: false },
474 elu: { median: false }
a20f0ba5
JB
475 })
476 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
477 .workerChoiceStrategies) {
86bf340d 478 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 479 retries: 6,
932fc8be 480 runTime: { median: false },
5df69fab
JB
481 waitTime: { median: false },
482 elu: { median: false }
86bf340d 483 })
a20f0ba5 484 }
87de9ff5
JB
485 expect(
486 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 ).toStrictEqual({
932fc8be
JB
488 runTime: {
489 aggregate: true,
490 average: true,
491 median: false
492 },
493 waitTime: {
494 aggregate: false,
495 average: false,
496 median: false
497 },
5df69fab 498 elu: {
9adcefab
JB
499 aggregate: true,
500 average: true,
5df69fab
JB
501 median: false
502 }
86bf340d 503 })
9adcefab
JB
504 pool.setWorkerChoiceStrategyOptions({
505 runTime: { median: true },
506 elu: { median: true }
507 })
a20f0ba5 508 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 509 retries: 6,
9adcefab 510 runTime: { median: true },
8990357d
JB
511 waitTime: { median: false },
512 elu: { median: true }
513 })
514 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 515 retries: 6,
8990357d
JB
516 runTime: { median: true },
517 waitTime: { median: false },
9adcefab 518 elu: { median: true }
a20f0ba5
JB
519 })
520 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
521 .workerChoiceStrategies) {
932fc8be 522 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 523 retries: 6,
9adcefab 524 runTime: { median: true },
8990357d 525 waitTime: { median: false },
9adcefab 526 elu: { median: true }
932fc8be 527 })
a20f0ba5 528 }
87de9ff5
JB
529 expect(
530 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
531 ).toStrictEqual({
932fc8be
JB
532 runTime: {
533 aggregate: true,
534 average: false,
535 median: true
536 },
537 waitTime: {
538 aggregate: false,
539 average: false,
540 median: false
541 },
5df69fab 542 elu: {
9adcefab 543 aggregate: true,
5df69fab 544 average: false,
9adcefab 545 median: true
5df69fab 546 }
86bf340d 547 })
9adcefab
JB
548 pool.setWorkerChoiceStrategyOptions({
549 runTime: { median: false },
550 elu: { median: false }
551 })
a20f0ba5 552 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 553 retries: 6,
8990357d
JB
554 runTime: { median: false },
555 waitTime: { median: false },
556 elu: { median: false }
557 })
558 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 559 retries: 6,
9adcefab 560 runTime: { median: false },
8990357d 561 waitTime: { median: false },
9adcefab 562 elu: { median: false }
a20f0ba5
JB
563 })
564 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
565 .workerChoiceStrategies) {
932fc8be 566 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 567 retries: 6,
9adcefab 568 runTime: { median: false },
8990357d 569 waitTime: { median: false },
9adcefab 570 elu: { median: false }
932fc8be 571 })
a20f0ba5 572 }
87de9ff5
JB
573 expect(
574 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
575 ).toStrictEqual({
932fc8be
JB
576 runTime: {
577 aggregate: true,
578 average: true,
579 median: false
580 },
581 waitTime: {
582 aggregate: false,
583 average: false,
584 median: false
585 },
5df69fab 586 elu: {
9adcefab
JB
587 aggregate: true,
588 average: true,
5df69fab
JB
589 median: false
590 }
86bf340d 591 })
1f95d544
JB
592 expect(() =>
593 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
594 ).toThrowError(
8735b4e5
JB
595 new TypeError(
596 'Invalid worker choice strategy options: must be a plain object'
597 )
1f95d544 598 )
7e653ee0
JB
599 expect(() =>
600 pool.setWorkerChoiceStrategyOptions({
8c0b113f 601 retries: 'invalidChoiceRetries'
7e653ee0
JB
602 })
603 ).toThrowError(
604 new TypeError(
8c0b113f 605 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
606 )
607 )
608 expect(() =>
8c0b113f 609 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
7e653ee0
JB
610 ).toThrowError(
611 new RangeError(
8c0b113f 612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
613 )
614 )
1f95d544
JB
615 expect(() =>
616 pool.setWorkerChoiceStrategyOptions({ weights: {} })
617 ).toThrowError(
8735b4e5
JB
618 new Error(
619 'Invalid worker choice strategy options: must have a weight for each worker node'
620 )
1f95d544
JB
621 )
622 expect(() =>
623 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
624 ).toThrowError(
8735b4e5
JB
625 new Error(
626 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
627 )
1f95d544 628 )
a20f0ba5
JB
629 await pool.destroy()
630 })
631
2431bdb4 632 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
633 const pool = new FixedThreadPool(
634 numberOfWorkers,
635 './tests/worker-files/thread/testWorker.js'
636 )
637 expect(pool.opts.enableTasksQueue).toBe(false)
638 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
639 for (const workerNode of pool.workerNodes) {
640 expect(workerNode.onEmptyQueue).toBeUndefined()
641 expect(workerNode.onBackPressure).toBeUndefined()
642 }
a20f0ba5
JB
643 pool.enableTasksQueue(true)
644 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
645 expect(pool.opts.tasksQueueOptions).toStrictEqual({
646 concurrency: 1,
2324f8c9 647 size: Math.pow(numberOfWorkers, 2),
dbd73092 648 taskStealing: true,
47352846 649 tasksStealingOnBackPressure: true
20c6f652 650 })
d6ca1416
JB
651 for (const workerNode of pool.workerNodes) {
652 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
653 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
654 }
a20f0ba5
JB
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 concurrency: 2,
2324f8c9 659 size: Math.pow(numberOfWorkers, 2),
dbd73092 660 taskStealing: true,
47352846 661 tasksStealingOnBackPressure: true
20c6f652 662 })
d6ca1416
JB
663 for (const workerNode of pool.workerNodes) {
664 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
665 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
666 }
a20f0ba5
JB
667 pool.enableTasksQueue(false)
668 expect(pool.opts.enableTasksQueue).toBe(false)
669 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
670 for (const workerNode of pool.workerNodes) {
671 expect(workerNode.onEmptyQueue).toBeUndefined()
672 expect(workerNode.onBackPressure).toBeUndefined()
673 }
a20f0ba5
JB
674 await pool.destroy()
675 })
676
2431bdb4 677 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
678 const pool = new FixedThreadPool(
679 numberOfWorkers,
680 './tests/worker-files/thread/testWorker.js',
681 { enableTasksQueue: true }
682 )
20c6f652
JB
683 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 concurrency: 1,
2324f8c9 685 size: Math.pow(numberOfWorkers, 2),
dbd73092 686 taskStealing: true,
47352846 687 tasksStealingOnBackPressure: true
20c6f652 688 })
d6ca1416 689 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
692 )
d6ca1416
JB
693 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
694 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
695 }
696 pool.setTasksQueueOptions({
697 concurrency: 2,
2324f8c9 698 size: 2,
d6ca1416
JB
699 taskStealing: false,
700 tasksStealingOnBackPressure: false
701 })
20c6f652
JB
702 expect(pool.opts.tasksQueueOptions).toStrictEqual({
703 concurrency: 2,
2324f8c9 704 size: 2,
d6ca1416
JB
705 taskStealing: false,
706 tasksStealingOnBackPressure: false
707 })
708 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
711 )
d6ca1416
JB
712 expect(workerNode.onEmptyQueue).toBeUndefined()
713 expect(workerNode.onBackPressure).toBeUndefined()
714 }
715 pool.setTasksQueueOptions({
716 concurrency: 1,
717 taskStealing: true,
718 tasksStealingOnBackPressure: true
719 })
720 expect(pool.opts.tasksQueueOptions).toStrictEqual({
721 concurrency: 1,
2324f8c9 722 size: Math.pow(numberOfWorkers, 2),
dbd73092 723 taskStealing: true,
47352846 724 tasksStealingOnBackPressure: true
20c6f652 725 })
d6ca1416 726 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
727 expect(workerNode.tasksQueueBackPressureSize).toBe(
728 pool.opts.tasksQueueOptions.size
729 )
d6ca1416
JB
730 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
731 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
732 }
f0d7f803
JB
733 expect(() =>
734 pool.setTasksQueueOptions('invalidTasksQueueOptions')
8735b4e5
JB
735 ).toThrowError(
736 new TypeError('Invalid tasks queue options: must be a plain object')
737 )
a20f0ba5 738 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
e695d66f 739 new RangeError(
20c6f652 740 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
741 )
742 )
743 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
e695d66f 744 new RangeError(
20c6f652 745 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 746 )
a20f0ba5 747 )
f0d7f803 748 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
20c6f652
JB
749 new TypeError('Invalid worker node tasks concurrency: must be an integer')
750 )
ff3f866a 751 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
20c6f652 752 new RangeError(
68dbcdc0 753 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
754 )
755 )
ff3f866a 756 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
20c6f652 757 new RangeError(
68dbcdc0 758 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
759 )
760 )
ff3f866a 761 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
68dbcdc0 762 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 763 )
a20f0ba5
JB
764 await pool.destroy()
765 })
766
6b27d407
JB
767 it('Verify that pool info is set', async () => {
768 let pool = new FixedThreadPool(
769 numberOfWorkers,
770 './tests/worker-files/thread/testWorker.js'
771 )
2dca6cab
JB
772 expect(pool.info).toStrictEqual({
773 version,
774 type: PoolTypes.fixed,
775 worker: WorkerTypes.thread,
47352846 776 started: true,
2dca6cab
JB
777 ready: true,
778 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
779 minSize: numberOfWorkers,
780 maxSize: numberOfWorkers,
781 workerNodes: numberOfWorkers,
782 idleWorkerNodes: numberOfWorkers,
783 busyWorkerNodes: 0,
784 executedTasks: 0,
785 executingTasks: 0,
2dca6cab
JB
786 failedTasks: 0
787 })
6b27d407
JB
788 await pool.destroy()
789 pool = new DynamicClusterPool(
2431bdb4 790 Math.floor(numberOfWorkers / 2),
6b27d407 791 numberOfWorkers,
ecdfbdc0 792 './tests/worker-files/cluster/testWorker.js'
6b27d407 793 )
2dca6cab
JB
794 expect(pool.info).toStrictEqual({
795 version,
796 type: PoolTypes.dynamic,
797 worker: WorkerTypes.cluster,
47352846 798 started: true,
2dca6cab
JB
799 ready: true,
800 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
801 minSize: Math.floor(numberOfWorkers / 2),
802 maxSize: numberOfWorkers,
803 workerNodes: Math.floor(numberOfWorkers / 2),
804 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
805 busyWorkerNodes: 0,
806 executedTasks: 0,
807 executingTasks: 0,
2dca6cab
JB
808 failedTasks: 0
809 })
6b27d407
JB
810 await pool.destroy()
811 })
812
2431bdb4 813 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
814 const pool = new FixedClusterPool(
815 numberOfWorkers,
816 './tests/worker-files/cluster/testWorker.js'
817 )
f06e48d8 818 for (const workerNode of pool.workerNodes) {
47352846 819 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 820 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
821 tasks: {
822 executed: 0,
823 executing: 0,
824 queued: 0,
df593701 825 maxQueued: 0,
68cbdc84 826 stolen: 0,
a4e07f72
JB
827 failed: 0
828 },
829 runTime: {
4ba4c7f9 830 history: new CircularArray()
a4e07f72
JB
831 },
832 waitTime: {
4ba4c7f9 833 history: new CircularArray()
a4e07f72 834 },
5df69fab
JB
835 elu: {
836 idle: {
4ba4c7f9 837 history: new CircularArray()
5df69fab
JB
838 },
839 active: {
4ba4c7f9 840 history: new CircularArray()
f7510105 841 }
5df69fab 842 }
86bf340d 843 })
f06e48d8
JB
844 }
845 await pool.destroy()
846 })
847
2431bdb4
JB
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
f06e48d8
JB
850 numberOfWorkers,
851 './tests/worker-files/cluster/testWorker.js'
852 )
853 for (const workerNode of pool.workerNodes) {
47352846 854 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 856 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 857 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 858 }
fd7ebd49 859 await pool.destroy()
2431bdb4
JB
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
862 numberOfWorkers,
863 './tests/worker-files/thread/testWorker.js'
864 )
865 for (const workerNode of pool.workerNodes) {
47352846 866 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 }
213cbac6 871 await pool.destroy()
2431bdb4
JB
872 })
873
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
876 numberOfWorkers,
877 './tests/worker-files/cluster/testWorker.js'
878 )
2dca6cab 879 for (const workerNode of pool.workerNodes) {
47352846 880 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
884 dynamic: false,
885 ready: true
886 })
887 }
2431bdb4
JB
888 await pool.destroy()
889 pool = new DynamicThreadPool(
890 Math.floor(numberOfWorkers / 2),
891 numberOfWorkers,
892 './tests/worker-files/thread/testWorker.js'
893 )
2dca6cab 894 for (const workerNode of pool.workerNodes) {
47352846 895 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
896 expect(workerNode.info).toStrictEqual({
897 id: expect.any(Number),
898 type: WorkerTypes.thread,
899 dynamic: false,
7884d183 900 ready: true
2dca6cab
JB
901 })
902 }
213cbac6 903 await pool.destroy()
bf9549ae
JB
904 })
905
47352846
JB
906 it('Verify that pool can be started after initialization', async () => {
907 const pool = new FixedClusterPool(
908 numberOfWorkers,
909 './tests/worker-files/cluster/testWorker.js',
910 {
911 startWorkers: false
912 }
913 )
914 expect(pool.info.started).toBe(false)
915 expect(pool.info.ready).toBe(false)
916 expect(pool.workerNodes).toStrictEqual([])
917 await expect(pool.execute()).rejects.toThrowError(
918 new Error('Cannot execute a task on not started pool')
919 )
920 pool.start()
921 expect(pool.info.started).toBe(true)
922 expect(pool.info.ready).toBe(true)
923 expect(pool.workerNodes.length).toBe(numberOfWorkers)
924 for (const workerNode of pool.workerNodes) {
925 expect(workerNode).toBeInstanceOf(WorkerNode)
926 }
927 await pool.destroy()
928 })
929
9d2d0da1
JB
930 it('Verify that pool execute() arguments are checked', async () => {
931 const pool = new FixedClusterPool(
932 numberOfWorkers,
933 './tests/worker-files/cluster/testWorker.js'
934 )
935 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
936 new TypeError('name argument must be a string')
937 )
938 await expect(pool.execute(undefined, '')).rejects.toThrowError(
939 new TypeError('name argument must not be an empty string')
940 )
941 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
942 new TypeError('transferList argument must be an array')
943 )
944 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
945 "Task function 'unknown' not found"
946 )
947 await pool.destroy()
47352846
JB
948 await expect(pool.execute()).rejects.toThrowError(
949 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
950 )
951 })
952
it('Verify that pool worker tasks usage are computed', async () => {
  const tasksPerWorker = 2
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage object asserted on each worker node; only the
  // executed/executing counters differ between the two checkpoints below.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Submit numberOfWorkers * tasksPerWorker tasks without awaiting them yet.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * tasksPerWorker },
    () => pool.execute()
  )
  // Checkpoint 1: tasks are submitted but none is awaited.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, tasksPerWorker))
  }
  await Promise.all(pendingTasks)
  // Checkpoint 2: every submitted task has settled.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(tasksPerWorker, 0))
  }
  await pool.destroy()
})
1018
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const tasksMultiplier = 2
  const submittedTasks = numberOfWorkers * tasksMultiplier
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Expected usage shape; `executed` is the only field that varies.
  const usageWithExecuted = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of the given usage is empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // Before the strategy change: each worker node has executed some tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node.usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed counter is back to zero.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1098
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'ready' event.
  const readyPayloads = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyPayloads.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyPayloads.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyPayloads[readyPayloads.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1133
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'busy' event.
  const busyPayloads = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyPayloads.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyPayloads.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyPayloads[busyPayloads.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1173
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'full' event.
  const fullPayloads = []
  pool.emitter.on(PoolEvents.full, info => {
    fullPayloads.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullPayloads.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullPayloads[fullPayloads.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1212
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure regardless of its real queue state.
  sinon.stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the 'backPressure' event.
  const backPressurePayloads = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressurePayloads.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressurePayloads.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(
    backPressurePayloads[backPressurePayloads.length - 1]
  ).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1259
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the value hasTaskFunction() must return.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every expectation, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1289
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs and the TypeError message each must produce.
  const invalidArguments = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidArguments) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, fn)
    ).rejects.toThrowError(new TypeError(message))
  }
  // The rejected calls must not have registered anything.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => data
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // Run the newly added task function and check it returns its input.
  const payload = { test: 'test' }
  expect(await dynamicThreadPool.execute(payload, 'echo')).toStrictEqual(
    payload
  )
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1360
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Names listed before any task function is added from the pool side.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  // 'test' is listed but was not added through the pool, so removal is rejected.
  await expect(
    dynamicThreadPool.removeTaskFunction('test')
  ).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  // After removal the pool is back to its initial task function set.
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    initialNames
  )
  await dynamicThreadPool.destroy()
})
1401
it('Verify that listTaskFunctionNames() is working', async () => {
  // Waits for the pool to be ready, checks the listed names, then destroys it.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
    )
  )
  // The cluster pool is only created once the thread pool has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1429
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Checks that setDefaultTaskFunction() rejects invalid names and, on
  // success, moves the chosen task function right after DEFAULT_TASK_NAME in
  // the list returned by listTaskFunctionNames().
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The worker id embedded in the error messages is assigned at runtime and
  // depends on how many workers were spawned earlier in the test process, so
  // it is read from the pool instead of being hard-coded.
  // NOTE(review): assumes the first worker node is the one reporting the
  // failure (the pool starts with Math.floor(numberOfWorkers / 2) workers).
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    )
  )
  // The rejected calls must leave the task function order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the worker threads, as the other tests in this suite do; without
  // this the pool is left running after the test ends.
  await dynamicThreadPool.destroy()
})
1483
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // One run without a name, then one run of each named task function.
  expect(await pool.execute(input)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(input, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(input, 'factorial')).toBe(3628800)
  expect(await pool.execute(input, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = expect.any(CircularArray)
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: { history: anyHistory },
        waitTime: { history: anyHistory },
        elu: {
          idle: { history: anyHistory },
          active: { history: anyHistory }
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // Usage under the default name must equal the usage of the task function
    // listed right after it.
    const firstNamedTaskFunction = node.info.taskFunctionNames[1]
    expect(
      node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(node.getTaskFunctionWorkerUsage(firstNamedTaskFunction))
  }
  await pool.destroy()
})
52a23942
JB
1547
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node; the returned promise must resolve to undefined.
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1560
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  // The task function is sent as its source text.
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyOperation
    )
  ).resolves.toBe(true)
  // The targeted worker node must now list the added task function.
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'empty'
  ])
  await pool.destroy()
})
1580
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The task function is sent as its source text.
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyOperation)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1603})