feat: change pool event emitter to event emitter async resource
[poolifier.git] / tests / pools / abstract-pool.test.js
CommitLineData
b5604034 1const { EventEmitterAsyncResource } = require('node:events')
a61a0724 2const { expect } = require('expect')
b1aae695 3const sinon = require('sinon')
e843b904 4const {
70a4f5ea 5 DynamicClusterPool,
9e619829 6 DynamicThreadPool,
aee46736 7 FixedClusterPool,
e843b904 8 FixedThreadPool,
aee46736 9 PoolEvents,
184855e6 10 PoolTypes,
3d6dd312 11 WorkerChoiceStrategies,
184855e6 12 WorkerTypes
bfc75cca
JB
13} = require('../../lib')
14const { CircularArray } = require('../../lib/circular-array')
15const { Deque } = require('../../lib/deque')
16const { DEFAULT_TASK_NAME } = require('../../lib/utils')
17const { version } = require('../../package.json')
18const { waitPoolEvents } = require('../test-utils')
19const { WorkerNode } = require('../../lib/pools/worker-node')
e1ffb94f
JB
20
21describe('Abstract pool test suite', () => {
fc027381 22 const numberOfWorkers = 2
a8884ffd 23 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
24 isMain () {
25 return false
26 }
3ec964d6 27 }
3ec964d6 28
dc021bcc
JB
29 afterEach(() => {
30 sinon.restore()
31 })
32
3ec964d6 33 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
34 expect(
35 () =>
a8884ffd 36 new StubPoolWithIsMain(
7c0ba920 37 numberOfWorkers,
8d3782fa
JB
38 './tests/worker-files/thread/testWorker.js',
39 {
041dc05b 40 errorHandler: e => console.error(e)
8d3782fa
JB
41 }
42 )
04f45163 43 ).toThrowError(
e695d66f
JB
44 new Error(
45 'Cannot start a pool from a worker with the same type as the pool'
46 )
04f45163 47 )
3ec964d6 48 })
c510fea7 49
bc61cfe6
JB
50 it('Verify that pool statuses properties are set', async () => {
51 const pool = new FixedThreadPool(
52 numberOfWorkers,
53 './tests/worker-files/thread/testWorker.js'
54 )
55 expect(pool.starting).toBe(false)
56 expect(pool.started).toBe(true)
57 await pool.destroy()
bc61cfe6
JB
58 })
59
c510fea7 60 it('Verify that filePath is checked', () => {
292ad316
JB
61 const expectedError = new Error(
62 'Please specify a file with a worker implementation'
63 )
7c0ba920 64 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 65 expectedError
8d3782fa 66 )
7c0ba920 67 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 68 expectedError
8d3782fa 69 )
3d6dd312
JB
70 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
71 expectedError
72 )
73 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
74 expectedError
75 )
76 expect(
77 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
79 })
80
81 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
82 expect(
83 () =>
84 new FixedThreadPool(
85 undefined,
86 './tests/worker-files/thread/testWorker.js'
87 )
88 ).toThrowError(
e695d66f
JB
89 new Error(
90 'Cannot instantiate a pool without specifying the number of workers'
91 )
8d3782fa
JB
92 )
93 })
94
95 it('Verify that a negative number of workers is checked', () => {
96 expect(
97 () =>
98 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
99 ).toThrowError(
473c717a
JB
100 new RangeError(
101 'Cannot instantiate a pool with a negative number of workers'
102 )
8d3782fa
JB
103 )
104 })
105
106 it('Verify that a non integer number of workers is checked', () => {
107 expect(
108 () =>
109 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
110 ).toThrowError(
473c717a 111 new TypeError(
0d80593b 112 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
113 )
114 )
c510fea7 115 })
7c0ba920 116
216541b6 117 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
118 expect(
119 () =>
120 new DynamicClusterPool(
121 1,
122 undefined,
123 './tests/worker-files/cluster/testWorker.js'
124 )
125 ).toThrowError(
126 new TypeError(
127 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
128 )
129 )
2761efb4
JB
130 expect(
131 () =>
132 new DynamicThreadPool(
133 0.5,
134 1,
135 './tests/worker-files/thread/testWorker.js'
136 )
137 ).toThrowError(
138 new TypeError(
139 'Cannot instantiate a pool with a non safe integer number of workers'
140 )
141 )
142 expect(
143 () =>
144 new DynamicClusterPool(
145 0,
146 0.5,
a5ed75b7 147 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
148 )
149 ).toThrowError(
150 new TypeError(
151 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
152 )
153 )
2431bdb4
JB
154 expect(
155 () =>
156 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
157 ).toThrowError(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
162 expect(
163 () =>
213cbac6 164 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
2431bdb4
JB
165 ).toThrowError(
166 new RangeError(
213cbac6 167 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
168 )
169 )
21f710aa
JB
170 expect(
171 () =>
213cbac6
JB
172 new DynamicClusterPool(
173 1,
174 1,
175 './tests/worker-files/cluster/testWorker.js'
176 )
21f710aa
JB
177 ).toThrowError(
178 new RangeError(
213cbac6 179 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
180 )
181 )
2431bdb4
JB
182 })
183
fd7ebd49 184 it('Verify that pool options are checked', async () => {
7c0ba920
JB
185 let pool = new FixedThreadPool(
186 numberOfWorkers,
187 './tests/worker-files/thread/testWorker.js'
188 )
b5604034 189 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
190 expect(pool.opts).toStrictEqual({
191 startWorkers: true,
192 enableEvents: true,
193 restartWorkerOnError: true,
194 enableTasksQueue: false,
195 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
196 workerChoiceStrategyOptions: {
197 retries: 6,
198 runTime: { median: false },
199 waitTime: { median: false },
200 elu: { median: false }
201 }
8990357d
JB
202 })
203 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 204 retries: 6,
932fc8be 205 runTime: { median: false },
5df69fab
JB
206 waitTime: { median: false },
207 elu: { median: false }
da309861 208 })
999ef664
JB
209 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
210 .workerChoiceStrategies) {
211 expect(workerChoiceStrategy.opts).toStrictEqual({
212 retries: 6,
213 runTime: { median: false },
214 waitTime: { median: false },
215 elu: { median: false }
216 })
217 }
fd7ebd49 218 await pool.destroy()
73bfd59d 219 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
220 pool = new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.js',
223 {
e4543b14 224 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 225 workerChoiceStrategyOptions: {
932fc8be 226 runTime: { median: true },
fc027381 227 weights: { 0: 300, 1: 200 }
49be33fe 228 },
35cf1c03 229 enableEvents: false,
1f68cede 230 restartWorkerOnError: false,
ff733df7 231 enableTasksQueue: true,
d4aeae5a 232 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
233 messageHandler: testHandler,
234 errorHandler: testHandler,
235 onlineHandler: testHandler,
236 exitHandler: testHandler
7c0ba920
JB
237 }
238 )
7c0ba920 239 expect(pool.emitter).toBeUndefined()
47352846
JB
240 expect(pool.opts).toStrictEqual({
241 startWorkers: true,
242 enableEvents: false,
243 restartWorkerOnError: false,
244 enableTasksQueue: true,
245 tasksQueueOptions: {
246 concurrency: 2,
2324f8c9 247 size: Math.pow(numberOfWorkers, 2),
dbd73092 248 taskStealing: true,
47352846
JB
249 tasksStealingOnBackPressure: true
250 },
251 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
252 workerChoiceStrategyOptions: {
253 retries: 6,
254 runTime: { median: true },
255 waitTime: { median: false },
256 elu: { median: false },
257 weights: { 0: 300, 1: 200 }
258 },
259 onlineHandler: testHandler,
260 messageHandler: testHandler,
261 errorHandler: testHandler,
262 exitHandler: testHandler
8990357d
JB
263 })
264 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 265 retries: 6,
8990357d
JB
266 runTime: { median: true },
267 waitTime: { median: false },
268 elu: { median: false },
fc027381 269 weights: { 0: 300, 1: 200 }
da309861 270 })
999ef664
JB
271 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
272 .workerChoiceStrategies) {
273 expect(workerChoiceStrategy.opts).toStrictEqual({
274 retries: 6,
275 runTime: { median: true },
276 waitTime: { median: false },
277 elu: { median: false },
278 weights: { 0: 300, 1: 200 }
279 })
280 }
fd7ebd49 281 await pool.destroy()
7c0ba920
JB
282 })
283
a20f0ba5 284 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
285 expect(
286 () =>
287 new FixedThreadPool(
288 numberOfWorkers,
289 './tests/worker-files/thread/testWorker.js',
290 {
f0d7f803 291 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
292 }
293 )
8735b4e5
JB
294 ).toThrowError(
295 new Error("Invalid worker choice strategy 'invalidStrategy'")
296 )
7e653ee0
JB
297 expect(
298 () =>
299 new FixedThreadPool(
300 numberOfWorkers,
301 './tests/worker-files/thread/testWorker.js',
302 {
303 workerChoiceStrategyOptions: {
8c0b113f 304 retries: 'invalidChoiceRetries'
7e653ee0
JB
305 }
306 }
307 )
308 ).toThrowError(
309 new TypeError(
8c0b113f 310 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
311 )
312 )
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.js',
318 {
319 workerChoiceStrategyOptions: {
8c0b113f 320 retries: -1
7e653ee0
JB
321 }
322 }
323 )
324 ).toThrowError(
325 new RangeError(
8c0b113f 326 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
327 )
328 )
49be33fe
JB
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.js',
334 {
335 workerChoiceStrategyOptions: { weights: {} }
336 }
337 )
338 ).toThrowError(
8735b4e5
JB
339 new Error(
340 'Invalid worker choice strategy options: must have a weight for each worker node'
341 )
49be33fe 342 )
f0d7f803
JB
343 expect(
344 () =>
345 new FixedThreadPool(
346 numberOfWorkers,
347 './tests/worker-files/thread/testWorker.js',
348 {
349 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
350 }
351 )
352 ).toThrowError(
8735b4e5
JB
353 new Error(
354 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 )
f0d7f803 356 )
5c4c2dee
JB
357 expect(
358 () =>
359 new FixedThreadPool(
360 numberOfWorkers,
361 './tests/worker-files/thread/testWorker.js',
362 {
363 enableTasksQueue: true,
364 tasksQueueOptions: 'invalidTasksQueueOptions'
365 }
366 )
367 ).toThrowError(
368 new TypeError('Invalid tasks queue options: must be a plain object')
369 )
f0d7f803
JB
370 expect(
371 () =>
372 new FixedThreadPool(
373 numberOfWorkers,
374 './tests/worker-files/thread/testWorker.js',
375 {
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: 0 }
378 }
379 )
8735b4e5 380 ).toThrowError(
e695d66f 381 new RangeError(
20c6f652 382 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
383 )
384 )
f0d7f803
JB
385 expect(
386 () =>
387 new FixedThreadPool(
388 numberOfWorkers,
389 './tests/worker-files/thread/testWorker.js',
390 {
391 enableTasksQueue: true,
5c4c2dee 392 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
393 }
394 )
8735b4e5 395 ).toThrowError(
5c4c2dee
JB
396 new RangeError(
397 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 )
8735b4e5 399 )
f0d7f803
JB
400 expect(
401 () =>
402 new FixedThreadPool(
403 numberOfWorkers,
404 './tests/worker-files/thread/testWorker.js',
405 {
406 enableTasksQueue: true,
407 tasksQueueOptions: { concurrency: 0.2 }
408 }
409 )
8735b4e5 410 ).toThrowError(
20c6f652 411 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 412 )
5c4c2dee
JB
413 expect(
414 () =>
415 new FixedThreadPool(
416 numberOfWorkers,
417 './tests/worker-files/thread/testWorker.js',
418 {
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: 0 }
421 }
422 )
423 ).toThrowError(
424 new RangeError(
425 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 )
427 )
428 expect(
429 () =>
430 new FixedThreadPool(
431 numberOfWorkers,
432 './tests/worker-files/thread/testWorker.js',
433 {
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: -1 }
436 }
437 )
438 ).toThrowError(
439 new RangeError(
440 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 )
442 )
443 expect(
444 () =>
445 new FixedThreadPool(
446 numberOfWorkers,
447 './tests/worker-files/thread/testWorker.js',
448 {
449 enableTasksQueue: true,
450 tasksQueueOptions: { size: 0.2 }
451 }
452 )
453 ).toThrowError(
454 new TypeError('Invalid worker node tasks queue size: must be an integer')
455 )
d4aeae5a
JB
456 })
457
2431bdb4 458 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
459 const pool = new FixedThreadPool(
460 numberOfWorkers,
461 './tests/worker-files/thread/testWorker.js',
462 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
463 )
464 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 465 retries: 6,
8990357d
JB
466 runTime: { median: false },
467 waitTime: { median: false },
468 elu: { median: false }
469 })
470 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 471 retries: 6,
932fc8be 472 runTime: { median: false },
5df69fab
JB
473 waitTime: { median: false },
474 elu: { median: false }
a20f0ba5
JB
475 })
476 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
477 .workerChoiceStrategies) {
86bf340d 478 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 479 retries: 6,
932fc8be 480 runTime: { median: false },
5df69fab
JB
481 waitTime: { median: false },
482 elu: { median: false }
86bf340d 483 })
a20f0ba5 484 }
87de9ff5
JB
485 expect(
486 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 ).toStrictEqual({
932fc8be
JB
488 runTime: {
489 aggregate: true,
490 average: true,
491 median: false
492 },
493 waitTime: {
494 aggregate: false,
495 average: false,
496 median: false
497 },
5df69fab 498 elu: {
9adcefab
JB
499 aggregate: true,
500 average: true,
5df69fab
JB
501 median: false
502 }
86bf340d 503 })
9adcefab
JB
504 pool.setWorkerChoiceStrategyOptions({
505 runTime: { median: true },
506 elu: { median: true }
507 })
a20f0ba5 508 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 509 retries: 6,
9adcefab 510 runTime: { median: true },
8990357d
JB
511 waitTime: { median: false },
512 elu: { median: true }
513 })
514 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 515 retries: 6,
8990357d
JB
516 runTime: { median: true },
517 waitTime: { median: false },
9adcefab 518 elu: { median: true }
a20f0ba5
JB
519 })
520 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
521 .workerChoiceStrategies) {
932fc8be 522 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 523 retries: 6,
9adcefab 524 runTime: { median: true },
8990357d 525 waitTime: { median: false },
9adcefab 526 elu: { median: true }
932fc8be 527 })
a20f0ba5 528 }
87de9ff5
JB
529 expect(
530 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
531 ).toStrictEqual({
932fc8be
JB
532 runTime: {
533 aggregate: true,
534 average: false,
535 median: true
536 },
537 waitTime: {
538 aggregate: false,
539 average: false,
540 median: false
541 },
5df69fab 542 elu: {
9adcefab 543 aggregate: true,
5df69fab 544 average: false,
9adcefab 545 median: true
5df69fab 546 }
86bf340d 547 })
9adcefab
JB
548 pool.setWorkerChoiceStrategyOptions({
549 runTime: { median: false },
550 elu: { median: false }
551 })
a20f0ba5 552 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 553 retries: 6,
8990357d
JB
554 runTime: { median: false },
555 waitTime: { median: false },
556 elu: { median: false }
557 })
558 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 559 retries: 6,
9adcefab 560 runTime: { median: false },
8990357d 561 waitTime: { median: false },
9adcefab 562 elu: { median: false }
a20f0ba5
JB
563 })
564 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
565 .workerChoiceStrategies) {
932fc8be 566 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 567 retries: 6,
9adcefab 568 runTime: { median: false },
8990357d 569 waitTime: { median: false },
9adcefab 570 elu: { median: false }
932fc8be 571 })
a20f0ba5 572 }
87de9ff5
JB
573 expect(
574 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
575 ).toStrictEqual({
932fc8be
JB
576 runTime: {
577 aggregate: true,
578 average: true,
579 median: false
580 },
581 waitTime: {
582 aggregate: false,
583 average: false,
584 median: false
585 },
5df69fab 586 elu: {
9adcefab
JB
587 aggregate: true,
588 average: true,
5df69fab
JB
589 median: false
590 }
86bf340d 591 })
1f95d544
JB
592 expect(() =>
593 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
594 ).toThrowError(
8735b4e5
JB
595 new TypeError(
596 'Invalid worker choice strategy options: must be a plain object'
597 )
1f95d544 598 )
7e653ee0
JB
599 expect(() =>
600 pool.setWorkerChoiceStrategyOptions({
8c0b113f 601 retries: 'invalidChoiceRetries'
7e653ee0
JB
602 })
603 ).toThrowError(
604 new TypeError(
8c0b113f 605 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
606 )
607 )
608 expect(() =>
8c0b113f 609 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
7e653ee0
JB
610 ).toThrowError(
611 new RangeError(
8c0b113f 612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
613 )
614 )
1f95d544
JB
615 expect(() =>
616 pool.setWorkerChoiceStrategyOptions({ weights: {} })
617 ).toThrowError(
8735b4e5
JB
618 new Error(
619 'Invalid worker choice strategy options: must have a weight for each worker node'
620 )
1f95d544
JB
621 )
622 expect(() =>
623 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
624 ).toThrowError(
8735b4e5
JB
625 new Error(
626 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
627 )
1f95d544 628 )
a20f0ba5
JB
629 await pool.destroy()
630 })
631
2431bdb4 632 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
633 const pool = new FixedThreadPool(
634 numberOfWorkers,
635 './tests/worker-files/thread/testWorker.js'
636 )
637 expect(pool.opts.enableTasksQueue).toBe(false)
638 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
639 for (const workerNode of pool.workerNodes) {
640 expect(workerNode.onEmptyQueue).toBeUndefined()
641 expect(workerNode.onBackPressure).toBeUndefined()
642 }
a20f0ba5
JB
643 pool.enableTasksQueue(true)
644 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
645 expect(pool.opts.tasksQueueOptions).toStrictEqual({
646 concurrency: 1,
2324f8c9 647 size: Math.pow(numberOfWorkers, 2),
dbd73092 648 taskStealing: true,
47352846 649 tasksStealingOnBackPressure: true
20c6f652 650 })
d6ca1416
JB
651 for (const workerNode of pool.workerNodes) {
652 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
653 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
654 }
a20f0ba5
JB
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 concurrency: 2,
2324f8c9 659 size: Math.pow(numberOfWorkers, 2),
dbd73092 660 taskStealing: true,
47352846 661 tasksStealingOnBackPressure: true
20c6f652 662 })
d6ca1416
JB
663 for (const workerNode of pool.workerNodes) {
664 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
665 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
666 }
a20f0ba5
JB
667 pool.enableTasksQueue(false)
668 expect(pool.opts.enableTasksQueue).toBe(false)
669 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
670 for (const workerNode of pool.workerNodes) {
671 expect(workerNode.onEmptyQueue).toBeUndefined()
672 expect(workerNode.onBackPressure).toBeUndefined()
673 }
a20f0ba5
JB
674 await pool.destroy()
675 })
676
2431bdb4 677 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
678 const pool = new FixedThreadPool(
679 numberOfWorkers,
680 './tests/worker-files/thread/testWorker.js',
681 { enableTasksQueue: true }
682 )
20c6f652
JB
683 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 concurrency: 1,
2324f8c9 685 size: Math.pow(numberOfWorkers, 2),
dbd73092 686 taskStealing: true,
47352846 687 tasksStealingOnBackPressure: true
20c6f652 688 })
d6ca1416 689 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
692 )
d6ca1416
JB
693 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
694 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
695 }
696 pool.setTasksQueueOptions({
697 concurrency: 2,
2324f8c9 698 size: 2,
d6ca1416
JB
699 taskStealing: false,
700 tasksStealingOnBackPressure: false
701 })
20c6f652
JB
702 expect(pool.opts.tasksQueueOptions).toStrictEqual({
703 concurrency: 2,
2324f8c9 704 size: 2,
d6ca1416
JB
705 taskStealing: false,
706 tasksStealingOnBackPressure: false
707 })
708 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
711 )
d6ca1416
JB
712 expect(workerNode.onEmptyQueue).toBeUndefined()
713 expect(workerNode.onBackPressure).toBeUndefined()
714 }
715 pool.setTasksQueueOptions({
716 concurrency: 1,
717 taskStealing: true,
718 tasksStealingOnBackPressure: true
719 })
720 expect(pool.opts.tasksQueueOptions).toStrictEqual({
721 concurrency: 1,
2324f8c9 722 size: Math.pow(numberOfWorkers, 2),
dbd73092 723 taskStealing: true,
47352846 724 tasksStealingOnBackPressure: true
20c6f652 725 })
d6ca1416 726 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
727 expect(workerNode.tasksQueueBackPressureSize).toBe(
728 pool.opts.tasksQueueOptions.size
729 )
d6ca1416
JB
730 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
731 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
732 }
f0d7f803
JB
733 expect(() =>
734 pool.setTasksQueueOptions('invalidTasksQueueOptions')
8735b4e5
JB
735 ).toThrowError(
736 new TypeError('Invalid tasks queue options: must be a plain object')
737 )
a20f0ba5 738 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
e695d66f 739 new RangeError(
20c6f652 740 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
741 )
742 )
743 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
e695d66f 744 new RangeError(
20c6f652 745 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 746 )
a20f0ba5 747 )
f0d7f803 748 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
20c6f652
JB
749 new TypeError('Invalid worker node tasks concurrency: must be an integer')
750 )
ff3f866a 751 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
20c6f652 752 new RangeError(
68dbcdc0 753 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
754 )
755 )
ff3f866a 756 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
20c6f652 757 new RangeError(
68dbcdc0 758 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
759 )
760 )
ff3f866a 761 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
68dbcdc0 762 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 763 )
a20f0ba5
JB
764 await pool.destroy()
765 })
766
6b27d407
JB
767 it('Verify that pool info is set', async () => {
768 let pool = new FixedThreadPool(
769 numberOfWorkers,
770 './tests/worker-files/thread/testWorker.js'
771 )
2dca6cab
JB
772 expect(pool.info).toStrictEqual({
773 version,
774 type: PoolTypes.fixed,
775 worker: WorkerTypes.thread,
47352846 776 started: true,
2dca6cab
JB
777 ready: true,
778 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
779 minSize: numberOfWorkers,
780 maxSize: numberOfWorkers,
781 workerNodes: numberOfWorkers,
782 idleWorkerNodes: numberOfWorkers,
783 busyWorkerNodes: 0,
784 executedTasks: 0,
785 executingTasks: 0,
2dca6cab
JB
786 failedTasks: 0
787 })
6b27d407
JB
788 await pool.destroy()
789 pool = new DynamicClusterPool(
2431bdb4 790 Math.floor(numberOfWorkers / 2),
6b27d407 791 numberOfWorkers,
ecdfbdc0 792 './tests/worker-files/cluster/testWorker.js'
6b27d407 793 )
2dca6cab
JB
794 expect(pool.info).toStrictEqual({
795 version,
796 type: PoolTypes.dynamic,
797 worker: WorkerTypes.cluster,
47352846 798 started: true,
2dca6cab
JB
799 ready: true,
800 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
801 minSize: Math.floor(numberOfWorkers / 2),
802 maxSize: numberOfWorkers,
803 workerNodes: Math.floor(numberOfWorkers / 2),
804 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
805 busyWorkerNodes: 0,
806 executedTasks: 0,
807 executingTasks: 0,
2dca6cab
JB
808 failedTasks: 0
809 })
6b27d407
JB
810 await pool.destroy()
811 })
812
2431bdb4 813 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
814 const pool = new FixedClusterPool(
815 numberOfWorkers,
816 './tests/worker-files/cluster/testWorker.js'
817 )
f06e48d8 818 for (const workerNode of pool.workerNodes) {
47352846 819 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 820 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
821 tasks: {
822 executed: 0,
823 executing: 0,
824 queued: 0,
df593701 825 maxQueued: 0,
68cbdc84 826 stolen: 0,
a4e07f72
JB
827 failed: 0
828 },
829 runTime: {
4ba4c7f9 830 history: new CircularArray()
a4e07f72
JB
831 },
832 waitTime: {
4ba4c7f9 833 history: new CircularArray()
a4e07f72 834 },
5df69fab
JB
835 elu: {
836 idle: {
4ba4c7f9 837 history: new CircularArray()
5df69fab
JB
838 },
839 active: {
4ba4c7f9 840 history: new CircularArray()
f7510105 841 }
5df69fab 842 }
86bf340d 843 })
f06e48d8
JB
844 }
845 await pool.destroy()
846 })
847
2431bdb4
JB
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
f06e48d8
JB
850 numberOfWorkers,
851 './tests/worker-files/cluster/testWorker.js'
852 )
853 for (const workerNode of pool.workerNodes) {
47352846 854 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 856 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 857 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 858 }
fd7ebd49 859 await pool.destroy()
2431bdb4
JB
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
862 numberOfWorkers,
863 './tests/worker-files/thread/testWorker.js'
864 )
865 for (const workerNode of pool.workerNodes) {
47352846 866 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 }
213cbac6 871 await pool.destroy()
2431bdb4
JB
872 })
873
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
876 numberOfWorkers,
877 './tests/worker-files/cluster/testWorker.js'
878 )
2dca6cab 879 for (const workerNode of pool.workerNodes) {
47352846 880 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
884 dynamic: false,
885 ready: true
886 })
887 }
2431bdb4
JB
888 await pool.destroy()
889 pool = new DynamicThreadPool(
890 Math.floor(numberOfWorkers / 2),
891 numberOfWorkers,
892 './tests/worker-files/thread/testWorker.js'
893 )
2dca6cab 894 for (const workerNode of pool.workerNodes) {
47352846 895 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
896 expect(workerNode.info).toStrictEqual({
897 id: expect.any(Number),
898 type: WorkerTypes.thread,
899 dynamic: false,
7884d183 900 ready: true
2dca6cab
JB
901 })
902 }
213cbac6 903 await pool.destroy()
bf9549ae
JB
904 })
905
47352846
JB
906 it('Verify that pool can be started after initialization', async () => {
907 const pool = new FixedClusterPool(
908 numberOfWorkers,
909 './tests/worker-files/cluster/testWorker.js',
910 {
911 startWorkers: false
912 }
913 )
914 expect(pool.info.started).toBe(false)
915 expect(pool.info.ready).toBe(false)
916 expect(pool.workerNodes).toStrictEqual([])
917 await expect(pool.execute()).rejects.toThrowError(
918 new Error('Cannot execute a task on not started pool')
919 )
920 pool.start()
921 expect(pool.info.started).toBe(true)
922 expect(pool.info.ready).toBe(true)
923 expect(pool.workerNodes.length).toBe(numberOfWorkers)
924 for (const workerNode of pool.workerNodes) {
925 expect(workerNode).toBeInstanceOf(WorkerNode)
926 }
927 await pool.destroy()
928 })
929
9d2d0da1
JB
930 it('Verify that pool execute() arguments are checked', async () => {
931 const pool = new FixedClusterPool(
932 numberOfWorkers,
933 './tests/worker-files/cluster/testWorker.js'
934 )
935 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
936 new TypeError('name argument must be a string')
937 )
938 await expect(pool.execute(undefined, '')).rejects.toThrowError(
939 new TypeError('name argument must not be an empty string')
940 )
941 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
942 new TypeError('transferList argument must be an array')
943 )
944 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
945 "Task function 'unknown' not found"
946 )
947 await pool.destroy()
47352846
JB
948 await expect(pool.execute()).rejects.toThrowError(
949 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
950 )
951 })
952
2431bdb4 953 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
954 const pool = new FixedClusterPool(
955 numberOfWorkers,
956 './tests/worker-files/cluster/testWorker.js'
957 )
09c2d0d3 958 const promises = new Set()
fc027381
JB
959 const maxMultiplier = 2
960 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 961 promises.add(pool.execute())
bf9549ae 962 }
f06e48d8 963 for (const workerNode of pool.workerNodes) {
465b2940 964 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
965 tasks: {
966 executed: 0,
967 executing: maxMultiplier,
968 queued: 0,
df593701 969 maxQueued: 0,
68cbdc84 970 stolen: 0,
a4e07f72
JB
971 failed: 0
972 },
973 runTime: {
a4e07f72
JB
974 history: expect.any(CircularArray)
975 },
976 waitTime: {
a4e07f72
JB
977 history: expect.any(CircularArray)
978 },
5df69fab
JB
979 elu: {
980 idle: {
5df69fab
JB
981 history: expect.any(CircularArray)
982 },
983 active: {
5df69fab 984 history: expect.any(CircularArray)
f7510105 985 }
5df69fab 986 }
86bf340d 987 })
bf9549ae
JB
988 }
989 await Promise.all(promises)
f06e48d8 990 for (const workerNode of pool.workerNodes) {
465b2940 991 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
992 tasks: {
993 executed: maxMultiplier,
994 executing: 0,
995 queued: 0,
df593701 996 maxQueued: 0,
68cbdc84 997 stolen: 0,
a4e07f72
JB
998 failed: 0
999 },
1000 runTime: {
a4e07f72
JB
1001 history: expect.any(CircularArray)
1002 },
1003 waitTime: {
a4e07f72
JB
1004 history: expect.any(CircularArray)
1005 },
5df69fab
JB
1006 elu: {
1007 idle: {
5df69fab
JB
1008 history: expect.any(CircularArray)
1009 },
1010 active: {
5df69fab 1011 history: expect.any(CircularArray)
f7510105 1012 }
5df69fab 1013 }
86bf340d 1014 })
bf9549ae 1015 }
fd7ebd49 1016 await pool.destroy()
bf9549ae
JB
1017 })
1018
2431bdb4 1019 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1020 const pool = new DynamicThreadPool(
2431bdb4 1021 Math.floor(numberOfWorkers / 2),
8f4878b7 1022 numberOfWorkers,
9e619829
JB
1023 './tests/worker-files/thread/testWorker.js'
1024 )
09c2d0d3 1025 const promises = new Set()
ee9f5295
JB
1026 const maxMultiplier = 2
1027 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1028 promises.add(pool.execute())
9e619829
JB
1029 }
1030 await Promise.all(promises)
f06e48d8 1031 for (const workerNode of pool.workerNodes) {
465b2940 1032 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1033 tasks: {
1034 executed: expect.any(Number),
1035 executing: 0,
1036 queued: 0,
df593701 1037 maxQueued: 0,
68cbdc84 1038 stolen: 0,
a4e07f72
JB
1039 failed: 0
1040 },
1041 runTime: {
a4e07f72
JB
1042 history: expect.any(CircularArray)
1043 },
1044 waitTime: {
a4e07f72
JB
1045 history: expect.any(CircularArray)
1046 },
5df69fab
JB
1047 elu: {
1048 idle: {
5df69fab
JB
1049 history: expect.any(CircularArray)
1050 },
1051 active: {
5df69fab 1052 history: expect.any(CircularArray)
f7510105 1053 }
5df69fab 1054 }
86bf340d 1055 })
465b2940 1056 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1057 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1058 numberOfWorkers * maxMultiplier
1059 )
b97d82d8
JB
1060 expect(workerNode.usage.runTime.history.length).toBe(0)
1061 expect(workerNode.usage.waitTime.history.length).toBe(0)
1062 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1063 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1064 }
1065 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1066 for (const workerNode of pool.workerNodes) {
465b2940 1067 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1068 tasks: {
1069 executed: 0,
1070 executing: 0,
1071 queued: 0,
df593701 1072 maxQueued: 0,
68cbdc84 1073 stolen: 0,
a4e07f72
JB
1074 failed: 0
1075 },
1076 runTime: {
a4e07f72
JB
1077 history: expect.any(CircularArray)
1078 },
1079 waitTime: {
a4e07f72
JB
1080 history: expect.any(CircularArray)
1081 },
5df69fab
JB
1082 elu: {
1083 idle: {
5df69fab
JB
1084 history: expect.any(CircularArray)
1085 },
1086 active: {
5df69fab 1087 history: expect.any(CircularArray)
f7510105 1088 }
5df69fab 1089 }
86bf340d 1090 })
465b2940
JB
1091 expect(workerNode.usage.runTime.history.length).toBe(0)
1092 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1093 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1094 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1095 }
fd7ebd49 1096 await pool.destroy()
ee11a4a2
JB
1097 })
1098
a1763c54
JB
1099 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1100 const pool = new DynamicClusterPool(
2431bdb4 1101 Math.floor(numberOfWorkers / 2),
164d950a 1102 numberOfWorkers,
a1763c54 1103 './tests/worker-files/cluster/testWorker.js'
164d950a 1104 )
d46660cd 1105 let poolInfo
a1763c54 1106 let poolReady = 0
041dc05b 1107 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1108 ++poolReady
d46660cd
JB
1109 poolInfo = info
1110 })
a1763c54
JB
1111 await waitPoolEvents(pool, PoolEvents.ready, 1)
1112 expect(poolReady).toBe(1)
d46660cd 1113 expect(poolInfo).toStrictEqual({
23ccf9d7 1114 version,
d46660cd 1115 type: PoolTypes.dynamic,
a1763c54 1116 worker: WorkerTypes.cluster,
47352846 1117 started: true,
a1763c54 1118 ready: true,
2431bdb4
JB
1119 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1120 minSize: expect.any(Number),
1121 maxSize: expect.any(Number),
1122 workerNodes: expect.any(Number),
1123 idleWorkerNodes: expect.any(Number),
1124 busyWorkerNodes: expect.any(Number),
1125 executedTasks: expect.any(Number),
1126 executingTasks: expect.any(Number),
2431bdb4
JB
1127 failedTasks: expect.any(Number)
1128 })
1129 await pool.destroy()
1130 })
1131
a1763c54
JB
1132 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1133 const pool = new FixedThreadPool(
2431bdb4 1134 numberOfWorkers,
a1763c54 1135 './tests/worker-files/thread/testWorker.js'
2431bdb4 1136 )
a1763c54
JB
1137 const promises = new Set()
1138 let poolBusy = 0
2431bdb4 1139 let poolInfo
041dc05b 1140 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1141 ++poolBusy
2431bdb4
JB
1142 poolInfo = info
1143 })
a1763c54
JB
1144 for (let i = 0; i < numberOfWorkers * 2; i++) {
1145 promises.add(pool.execute())
1146 }
1147 await Promise.all(promises)
1148 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1149 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1150 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1151 expect(poolInfo).toStrictEqual({
1152 version,
a1763c54
JB
1153 type: PoolTypes.fixed,
1154 worker: WorkerTypes.thread,
47352846
JB
1155 started: true,
1156 ready: true,
2431bdb4 1157 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1158 minSize: expect.any(Number),
1159 maxSize: expect.any(Number),
1160 workerNodes: expect.any(Number),
1161 idleWorkerNodes: expect.any(Number),
1162 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1163 executedTasks: expect.any(Number),
1164 executingTasks: expect.any(Number),
a4e07f72 1165 failedTasks: expect.any(Number)
d46660cd 1166 })
164d950a
JB
1167 await pool.destroy()
1168 })
1169
a1763c54
JB
1170 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1171 const pool = new DynamicThreadPool(
1172 Math.floor(numberOfWorkers / 2),
7c0ba920
JB
1173 numberOfWorkers,
1174 './tests/worker-files/thread/testWorker.js'
1175 )
09c2d0d3 1176 const promises = new Set()
a1763c54 1177 let poolFull = 0
d46660cd 1178 let poolInfo
041dc05b 1179 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1180 ++poolFull
d46660cd
JB
1181 poolInfo = info
1182 })
7c0ba920 1183 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1184 promises.add(pool.execute())
7c0ba920 1185 }
cf597bc5 1186 await Promise.all(promises)
33e6bb4c 1187 expect(poolFull).toBe(1)
d46660cd 1188 expect(poolInfo).toStrictEqual({
23ccf9d7 1189 version,
a1763c54 1190 type: PoolTypes.dynamic,
d46660cd 1191 worker: WorkerTypes.thread,
47352846
JB
1192 started: true,
1193 ready: true,
2431bdb4 1194 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1195 minSize: expect.any(Number),
1196 maxSize: expect.any(Number),
1197 workerNodes: expect.any(Number),
1198 idleWorkerNodes: expect.any(Number),
1199 busyWorkerNodes: expect.any(Number),
1200 executedTasks: expect.any(Number),
1201 executingTasks: expect.any(Number),
1202 failedTasks: expect.any(Number)
1203 })
1204 await pool.destroy()
1205 })
1206
3e8611a8 1207 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1208 const pool = new FixedThreadPool(
8735b4e5
JB
1209 numberOfWorkers,
1210 './tests/worker-files/thread/testWorker.js',
1211 {
1212 enableTasksQueue: true
1213 }
1214 )
3e8611a8 1215 sinon.stub(pool, 'hasBackPressure').returns(true)
8735b4e5
JB
1216 const promises = new Set()
1217 let poolBackPressure = 0
1218 let poolInfo
041dc05b 1219 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1220 ++poolBackPressure
1221 poolInfo = info
1222 })
033f1776 1223 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1224 promises.add(pool.execute())
1225 }
1226 await Promise.all(promises)
033f1776 1227 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1228 expect(poolInfo).toStrictEqual({
1229 version,
3e8611a8 1230 type: PoolTypes.fixed,
8735b4e5 1231 worker: WorkerTypes.thread,
47352846
JB
1232 started: true,
1233 ready: true,
8735b4e5 1234 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1235 minSize: expect.any(Number),
1236 maxSize: expect.any(Number),
1237 workerNodes: expect.any(Number),
1238 idleWorkerNodes: expect.any(Number),
1239 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1240 executedTasks: expect.any(Number),
1241 executingTasks: expect.any(Number),
3e8611a8
JB
1242 maxQueuedTasks: expect.any(Number),
1243 queuedTasks: expect.any(Number),
1244 backPressure: true,
68cbdc84 1245 stolenTasks: expect.any(Number),
a4e07f72 1246 failedTasks: expect.any(Number)
d46660cd 1247 })
3e8611a8 1248 expect(pool.hasBackPressure.called).toBe(true)
fd7ebd49 1249 await pool.destroy()
7c0ba920 1250 })
70a4f5ea 1251
9eae3c69
JB
1252 it('Verify that hasTaskFunction() is working', async () => {
1253 const dynamicThreadPool = new DynamicThreadPool(
1254 Math.floor(numberOfWorkers / 2),
1255 numberOfWorkers,
1256 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1257 )
1258 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1259 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1260 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1261 true
1262 )
1263 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1264 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1265 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1266 await dynamicThreadPool.destroy()
1267 const fixedClusterPool = new FixedClusterPool(
1268 numberOfWorkers,
1269 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1270 )
1271 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1272 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1273 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1274 true
1275 )
1276 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1277 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1278 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1279 await fixedClusterPool.destroy()
1280 })
1281
1282 it('Verify that addTaskFunction() is working', async () => {
1283 const dynamicThreadPool = new DynamicThreadPool(
1284 Math.floor(numberOfWorkers / 2),
1285 numberOfWorkers,
1286 './tests/worker-files/thread/testWorker.js'
1287 )
1288 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1289 await expect(
1290 dynamicThreadPool.addTaskFunction(0, () => {})
1291 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1292 await expect(
1293 dynamicThreadPool.addTaskFunction('', () => {})
1294 ).rejects.toThrowError(
1295 new TypeError('name argument must not be an empty string')
1296 )
1297 await expect(
1298 dynamicThreadPool.addTaskFunction('test', 0)
1299 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1300 await expect(
1301 dynamicThreadPool.addTaskFunction('test', '')
1302 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
9eae3c69
JB
1303 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1304 DEFAULT_TASK_NAME,
1305 'test'
1306 ])
1307 const echoTaskFunction = data => {
1308 return data
1309 }
1310 await expect(
1311 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1312 ).resolves.toBe(true)
1313 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1314 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1315 echoTaskFunction
1316 )
1317 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1318 DEFAULT_TASK_NAME,
1319 'test',
1320 'echo'
1321 ])
1322 const taskFunctionData = { test: 'test' }
1323 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1324 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1325 for (const workerNode of dynamicThreadPool.workerNodes) {
1326 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1327 tasks: {
1328 executed: expect.any(Number),
1329 executing: 0,
1330 queued: 0,
1331 stolen: 0,
1332 failed: 0
1333 },
1334 runTime: {
1335 history: new CircularArray()
1336 },
1337 waitTime: {
1338 history: new CircularArray()
1339 },
1340 elu: {
1341 idle: {
1342 history: new CircularArray()
1343 },
1344 active: {
1345 history: new CircularArray()
1346 }
1347 }
1348 })
1349 }
9eae3c69
JB
1350 await dynamicThreadPool.destroy()
1351 })
1352
1353 it('Verify that removeTaskFunction() is working', async () => {
1354 const dynamicThreadPool = new DynamicThreadPool(
1355 Math.floor(numberOfWorkers / 2),
1356 numberOfWorkers,
1357 './tests/worker-files/thread/testWorker.js'
1358 )
1359 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1360 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1361 DEFAULT_TASK_NAME,
1362 'test'
1363 ])
1364 await expect(
1365 dynamicThreadPool.removeTaskFunction('test')
1366 ).rejects.toThrowError(
16248b23 1367 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1368 )
1369 const echoTaskFunction = data => {
1370 return data
1371 }
1372 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1373 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1374 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1375 echoTaskFunction
1376 )
1377 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1378 DEFAULT_TASK_NAME,
1379 'test',
1380 'echo'
1381 ])
1382 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1383 true
1384 )
1385 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1386 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1387 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1388 DEFAULT_TASK_NAME,
1389 'test'
1390 ])
1391 await dynamicThreadPool.destroy()
1392 })
1393
30500265 1394 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1395 const dynamicThreadPool = new DynamicThreadPool(
1396 Math.floor(numberOfWorkers / 2),
1397 numberOfWorkers,
1398 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1399 )
1400 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1401 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1402 DEFAULT_TASK_NAME,
90d7d101
JB
1403 'jsonIntegerSerialization',
1404 'factorial',
1405 'fibonacci'
1406 ])
9eae3c69 1407 await dynamicThreadPool.destroy()
90d7d101
JB
1408 const fixedClusterPool = new FixedClusterPool(
1409 numberOfWorkers,
1410 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1411 )
1412 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1413 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1414 DEFAULT_TASK_NAME,
90d7d101
JB
1415 'jsonIntegerSerialization',
1416 'factorial',
1417 'fibonacci'
1418 ])
0fe39c97 1419 await fixedClusterPool.destroy()
90d7d101
JB
1420 })
1421
9eae3c69 1422 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1423 const dynamicThreadPool = new DynamicThreadPool(
1424 Math.floor(numberOfWorkers / 2),
1425 numberOfWorkers,
1426 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1427 )
1428 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
b0b55f57
JB
1429 await expect(
1430 dynamicThreadPool.setDefaultTaskFunction(0)
1431 ).rejects.toThrowError(
1432 new Error(
1433 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1434 )
1435 )
1436 await expect(
1437 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1438 ).rejects.toThrowError(
1439 new Error(
1440 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1441 )
1442 )
1443 await expect(
1444 dynamicThreadPool.setDefaultTaskFunction('unknown')
1445 ).rejects.toThrowError(
1446 new Error(
1447 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1448 )
1449 )
9eae3c69
JB
1450 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1451 DEFAULT_TASK_NAME,
1452 'jsonIntegerSerialization',
1453 'factorial',
1454 'fibonacci'
1455 ])
1456 await expect(
1457 dynamicThreadPool.setDefaultTaskFunction('factorial')
1458 ).resolves.toBe(true)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 DEFAULT_TASK_NAME,
1461 'factorial',
1462 'jsonIntegerSerialization',
1463 'fibonacci'
1464 ])
1465 await expect(
1466 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1467 ).resolves.toBe(true)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1469 DEFAULT_TASK_NAME,
1470 'fibonacci',
1471 'jsonIntegerSerialization',
1472 'factorial'
1473 ])
30500265
JB
1474 })
1475
90d7d101 1476 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1477 const pool = new DynamicClusterPool(
2431bdb4 1478 Math.floor(numberOfWorkers / 2),
70a4f5ea 1479 numberOfWorkers,
90d7d101 1480 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1481 )
1482 const data = { n: 10 }
82888165 1483 const result0 = await pool.execute(data)
30b963d4 1484 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1485 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1486 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1487 const result2 = await pool.execute(data, 'factorial')
1488 expect(result2).toBe(3628800)
1489 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1490 expect(result3).toBe(55)
5bb5be17
JB
1491 expect(pool.info.executingTasks).toBe(0)
1492 expect(pool.info.executedTasks).toBe(4)
b414b84c 1493 for (const workerNode of pool.workerNodes) {
66979634 1494 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1495 DEFAULT_TASK_NAME,
b414b84c
JB
1496 'jsonIntegerSerialization',
1497 'factorial',
1498 'fibonacci'
1499 ])
1500 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1501 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1502 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1503 tasks: {
1504 executed: expect.any(Number),
4ba4c7f9 1505 executing: 0,
5bb5be17 1506 failed: 0,
68cbdc84
JB
1507 queued: 0,
1508 stolen: 0
5bb5be17
JB
1509 },
1510 runTime: {
1511 history: expect.any(CircularArray)
1512 },
1513 waitTime: {
1514 history: expect.any(CircularArray)
1515 },
1516 elu: {
1517 idle: {
1518 history: expect.any(CircularArray)
1519 },
1520 active: {
1521 history: expect.any(CircularArray)
1522 }
1523 }
1524 })
1525 expect(
4ba4c7f9
JB
1526 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1527 ).toBeGreaterThan(0)
5bb5be17 1528 }
dfd7ec01
JB
1529 expect(
1530 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1531 ).toStrictEqual(
66979634
JB
1532 workerNode.getTaskFunctionWorkerUsage(
1533 workerNode.info.taskFunctionNames[1]
1534 )
dfd7ec01 1535 )
5bb5be17 1536 }
0fe39c97 1537 await pool.destroy()
70a4f5ea 1538 })
52a23942
JB
1539
1540 it('Verify sendKillMessageToWorker()', async () => {
1541 const pool = new DynamicClusterPool(
1542 Math.floor(numberOfWorkers / 2),
1543 numberOfWorkers,
1544 './tests/worker-files/cluster/testWorker.js'
1545 )
1546 const workerNodeKey = 0
1547 await expect(
adee6053 1548 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1549 ).resolves.toBeUndefined()
1550 await pool.destroy()
1551 })
adee6053
JB
1552
1553 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1554 const pool = new DynamicClusterPool(
1555 Math.floor(numberOfWorkers / 2),
1556 numberOfWorkers,
1557 './tests/worker-files/cluster/testWorker.js'
1558 )
1559 const workerNodeKey = 0
1560 await expect(
1561 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1562 taskFunctionOperation: 'add',
1563 taskFunctionName: 'empty',
1564 taskFunction: (() => {}).toString()
1565 })
1566 ).resolves.toBe(true)
1567 expect(
1568 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1569 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1570 await pool.destroy()
1571 })
1572
1573 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1574 const pool = new DynamicClusterPool(
1575 Math.floor(numberOfWorkers / 2),
1576 numberOfWorkers,
1577 './tests/worker-files/cluster/testWorker.js'
1578 )
adee6053
JB
1579 await expect(
1580 pool.sendTaskFunctionOperationToWorkers({
1581 taskFunctionOperation: 'add',
1582 taskFunctionName: 'empty',
1583 taskFunction: (() => {}).toString()
1584 })
1585 ).resolves.toBe(true)
1586 for (const workerNode of pool.workerNodes) {
1587 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1588 DEFAULT_TASK_NAME,
1589 'test',
1590 'empty'
1591 ])
1592 }
1593 await pool.destroy()
1594 })
3ec964d6 1595})