Merge pull request #1360 from poolifier/combined-prs-branch
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee
JB
1import { EventEmitterAsyncResource } from 'node:events'
2import { readFileSync } from 'node:fs'
3import { expect } from 'expect'
4import { restore, stub } from 'sinon'
5import {
70a4f5ea 6 DynamicClusterPool,
9e619829 7 DynamicThreadPool,
aee46736 8 FixedClusterPool,
e843b904 9 FixedThreadPool,
aee46736 10 PoolEvents,
184855e6 11 PoolTypes,
3d6dd312 12 WorkerChoiceStrategies,
184855e6 13 WorkerTypes
a074ffee
JB
14} from '../../lib/index.js'
15import { CircularArray } from '../../lib/circular-array.js'
16import { Deque } from '../../lib/deque.js'
17import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
18import { waitPoolEvents } from '../test-utils.js'
19import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
20
21describe('Abstract pool test suite', () => {
// Version field of ./package.json (resolved from the current working
// directory); the pool info assertions compare against it.
const { version } = JSON.parse(readFileSync('./package.json', 'utf8'))
// Number of workers requested for the pools built by the test cases.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so that
 * constructing it looks like a pool creation attempted outside the main
 * thread.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
3ec964d6 29
// Call sinon's `restore()` once each test case has finished.
afterEach(function restoreSinon () {
  restore()
})
33
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports that it is not running in the main thread, so the
  // constructor is expected to refuse to build the pool.
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createPool).toThrowError(expectedError)
})
c510fea7 50
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  // A freshly constructed pool must report that it is started and no longer
  // starting.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.started).toBe(true)
  } finally {
    // Release the workers even when an assertion above fails, otherwise
    // they outlive the test case.
    await pool.destroy()
  }
})
60
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Absent, empty and non string file paths are all rejected the same way.
  // Each entry is the list of arguments passed after the number of workers.
  const invalidFilePathArgs = [[], [''], [0], [true]]
  for (const filePathArgs of invalidFilePathArgs) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, ...filePathArgs)
    ).toThrowError(missingFileError)
  }
  // A well formed path pointing to a file that does not exist.
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
81
it('Verify that numberOfWorkers is checked', () => {
  // Building a pool without a number of workers must be refused.
  const createPool = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
95
it('Verify that a negative number of workers is checked', () => {
  // A negative size is expected to be reported as a RangeError.
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
106
it('Verify that a non integer number of workers is checked', () => {
  // A fractional size is expected to be reported as a TypeError.
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
7c0ba920 117
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: pool class, minimum size, maximum size, worker file and the
  // error the constructor is expected to throw for that sizing.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrowError(expectedError)
  }
})
192
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Asserts that the worker choice strategy context and every registered
  // strategy expose exactly the given options.
  const expectStrategyOptions = (pool, expectedOptions) => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool built without options: the defaults must be in place.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
    expect(defaultPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      workerChoiceStrategyOptions: {
        retries: 6,
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false }
      }
    })
    expectStrategyOptions(defaultPool, {
      retries: 6,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    })
  } finally {
    // Release the workers even when an assertion above fails.
    await defaultPool.destroy()
  }

  // Pool built with explicit options: they must be merged with the defaults.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: {
        retries: 6,
        runTime: { median: true },
        waitTime: { median: false },
        elu: { median: false },
        weights: { 0: 300, 1: 200 }
      },
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler
    })
    expectStrategyOptions(customPool, {
      retries: 6,
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    })
  } finally {
    // Release the workers even when an assertion above fails.
    await customPool.destroy()
  }
})
292
it('Verify that pool options are validated', async () => {
  // Each entry pairs invalid pool options with the error the constructor is
  // expected to throw for them.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs',
          poolOptions
        )
    ).toThrowError(expectedError)
  }
})
466
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the options seen by the pool, the strategy context and every
  // strategy, then the derived task statistics requirements. `median` is the
  // value expected for the runTime and elu median flags; the average flags
  // are expected to be its opposite.
  const expectStrategyState = median => {
    const expectedOptions = {
      retries: 6,
      runTime: { median },
      waitTime: { median: false },
      elu: { median }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }
  try {
    // Defaults, then median statistics switched on, then back off.
    expectStrategyState(false)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: true },
      elu: { median: true }
    })
    expectStrategyState(true)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: false },
      elu: { median: false }
    })
    expectStrategyState(false)

    // Invalid options paired with the error the setter must throw.
    const invalidOptions = [
      [
        'invalidWorkerChoiceStrategyOptions',
        new TypeError(
          'Invalid worker choice strategy options: must be a plain object'
        )
      ],
      [
        { retries: 'invalidChoiceRetries' },
        new TypeError(
          'Invalid worker choice strategy options: retries must be an integer'
        )
      ],
      [
        { retries: -1 },
        new RangeError(
          "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
        )
      ],
      [
        { weights: {} },
        new Error(
          'Invalid worker choice strategy options: must have a weight for each worker node'
        )
      ],
      [
        { measurement: 'invalidMeasurement' },
        new Error(
          "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
        )
      ]
    ]
    for (const [strategyOptions, expectedError] of invalidOptions) {
      expect(() =>
        pool.setWorkerChoiceStrategyOptions(strategyOptions)
      ).toThrowError(expectedError)
    }
  } finally {
    // Release the workers even when an assertion above fails.
    await pool.destroy()
  }
})
640
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Queue disabled: no queue options and no queue callbacks on the nodes.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Queue enabled: options completed with defaults around the given
  // concurrency and queue callbacks installed on the nodes.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }
  try {
    expectTasksQueueDisabled()
    pool.enableTasksQueue(true)
    expectTasksQueueEnabled(1)
    pool.enableTasksQueue(true, { concurrency: 2 })
    expectTasksQueueEnabled(2)
    pool.enableTasksQueue(false)
    expectTasksQueueDisabled()
  } finally {
    // Release the workers even when an assertion above fails.
    await pool.destroy()
  }
})
685
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool tasks queue options, then on every worker node the back
  // pressure size and whether the queue callbacks are installed.
  const expectTasksQueueState = (expectedOptions, callbacksInstalled) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksInstalled) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }
  try {
    // Defaults applied at construction.
    expectTasksQueueState(
      {
        concurrency: 1,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      true
    )
    // Task stealing switched off: the callbacks must be removed.
    pool.setTasksQueueOptions({
      concurrency: 2,
      size: 2,
      taskStealing: false,
      tasksStealingOnBackPressure: false
    })
    expectTasksQueueState(
      {
        concurrency: 2,
        size: 2,
        taskStealing: false,
        tasksStealingOnBackPressure: false
      },
      false
    )
    // Task stealing switched back on without a size: the default size is
    // expected again.
    pool.setTasksQueueOptions({
      concurrency: 1,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    expectTasksQueueState(
      {
        concurrency: 1,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      true
    )

    // Invalid options paired with the error the setter must throw.
    const invalidOptions = [
      [
        'invalidTasksQueueOptions',
        new TypeError('Invalid tasks queue options: must be a plain object')
      ],
      [
        { concurrency: 0 },
        new RangeError(
          'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
        )
      ],
      [
        { concurrency: -1 },
        new RangeError(
          'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
        )
      ],
      [
        { concurrency: 0.2 },
        new TypeError(
          'Invalid worker node tasks concurrency: must be an integer'
        )
      ],
      [
        { size: 0 },
        new RangeError(
          'Invalid worker node tasks queue size: 0 is a negative integer or zero'
        )
      ],
      [
        { size: -1 },
        new RangeError(
          'Invalid worker node tasks queue size: -1 is a negative integer or zero'
        )
      ],
      [
        { size: 0.2 },
        new TypeError('Invalid worker node tasks queue size: must be an integer')
      ]
    ]
    for (const [tasksQueueOptions, expectedError] of invalidOptions) {
      expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrowError(
        expectedError
      )
    }
  } finally {
    // Release the workers even when an assertion above fails.
    await pool.destroy()
  }
})
775
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: every worker node exists up front and is idle.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(fixedPool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Release the workers even when the assertion above fails.
    await fixedPool.destroy()
  }

  // Dynamic cluster pool: only the minimum number of worker nodes is
  // expected right after construction.
  const minSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(dynamicPool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize,
      maxSize: numberOfWorkers,
      workerNodes: minSize,
      idleWorkerNodes: minSize,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Release the workers even when the assertion above fails.
    await dynamicPool.destroy()
  }
})
821
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    // Before any task is submitted every counter is zero and every
    // measurement history equals a fresh CircularArray.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: { history: new CircularArray() },
        waitTime: { history: new CircularArray() },
        elu: {
          idle: { history: new CircularArray() },
          active: { history: new CircularArray() }
        }
      })
    }
  } finally {
    // Release the workers even when an assertion above fails.
    await pool.destroy()
  }
})
856
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque that has never held a task.
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectEmptyTasksQueues(fixedPool)
  } finally {
    // Release the workers even when an assertion above fails.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectEmptyTasksQueues(dynamicPool)
  } finally {
    // Release the workers even when an assertion above fails.
    await dynamicPool.destroy()
  }
})
882
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must be a ready, non dynamic node of the given type.
  const expectWorkerNodesInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectWorkerNodesInfo(fixedPool, WorkerTypes.cluster)
  } finally {
    // Release the workers even when an assertion above fails.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectWorkerNodesInfo(dynamicPool, WorkerTypes.thread)
  } finally {
    // Release the workers even when an assertion above fails.
    await dynamicPool.destroy()
  }
})
914
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const poolOptions = { startWorkers: false }
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )
  // With `startWorkers: false` the pool owns no worker node and rejects
  // task execution.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrowError(notStartedError)
  // An explicit start() must bring every worker node up.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const startedWorkerNode of pool.workerNodes) {
    expect(startedWorkerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
938
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Argument lists that execute() must reject with the paired TypeError.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArgs)).rejects.toThrowError(
      expectedError
    )
  }
  // A well formed but unknown task function name rejects with a plain
  // string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool refuses to execute tasks.
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrowError(notStartedError)
})
961
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Expected worker node usage for the given executed/executing counters;
  // the measurement histories only need to be CircularArray instances.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  try {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
      promises.add(pool.execute())
    }
    // While the tasks are in flight each worker node is expected to be
    // executing `maxMultiplier` of them.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
    }
    await Promise.all(promises)
    // Once they have settled the tasks are counted as executed.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
    }
  } finally {
    // Release the workers even when an assertion above fails.
    await pool.destroy()
  }
})
1027
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Builds the usage object expected on a worker node; only the executed
  // counter differs between the two checks below.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  // Asserts that none of the measurement histories holds a sample.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  await Promise.all(
    Array.from({ length: submittedTasks }, () => threadPool.execute())
  )
  // Before the strategy change every worker node has executed some tasks.
  for (const { usage } of threadPool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(usage)
  }
  threadPool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change the executed counter must be back to zero.
  for (const { usage } of threadPool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(usage)
  }
  await threadPool.destroy()
})
1107
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(clusterPool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the event so that both the number
  // of emissions and the last payload can be asserted.
  const readyInfos = []
  clusterPool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(clusterPool, PoolEvents.ready, 1)
  expect(clusterPool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await clusterPool.destroy()
})
1142
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the event so that both the number
  // of emissions and the last payload can be asserted.
  const busyInfos = []
  threadPool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => threadPool.execute())
  )
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await threadPool.destroy()
})
1182
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the event so that both the number
  // of emissions and the last payload can be asserted.
  const fullInfos = []
  threadPool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => threadPool.execute())
  )
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await threadPool.destroy()
})
1221
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to always report back pressure so the event is emitted
  // without having to actually saturate the tasks queues.
  stub(threadPool, 'hasBackPressure').returns(true)
  expect(threadPool.emitter.eventNames()).toStrictEqual([])
  // Collect every payload delivered with the event so that both the number
  // of emissions and the last payload can be asserted.
  const backPressureInfos = []
  threadPool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(threadPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.backPressure
  ])
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => threadPool.execute())
  )
  expect(backPressureInfos.length).toBe(1)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  expect(threadPool.hasBackPressure.called).toBe(true)
  await threadPool.destroy()
})
70a4f5ea 1268
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with whether the pool should know them.
  const lookups = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, runs every lookup, then destroys it.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, known] of lookups) {
      expect(pool.hasTaskFunction(name)).toBe(known)
    }
    await pool.destroy()
  }
  // The cluster pool is only created once the thread pool is destroyed.
  await checkPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await checkPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1298
it('Verify that addTaskFunction() is working', async () => {
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) pairs and the TypeError message each must reject with.
  const rejectedRegistrations = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of rejectedRegistrations) {
    await expect(threadPool.addTaskFunction(name, fn)).rejects.toThrowError(
      new TypeError(message)
    )
  }
  // The failed registrations must not have altered the known task functions.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    threadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(threadPool.taskFunctions.size).toBe(1)
  expect(threadPool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(threadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  // The freshly added task function must be executable and echo its input.
  const payload = { test: 'test' }
  expect(await threadPool.execute(payload, 'echo')).toStrictEqual(payload)
  for (const workerNode of threadPool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    })
  }
  await threadPool.destroy()
})
1369
it('Verify that removeTaskFunction() is working', async () => {
  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(threadPool, PoolEvents.ready, 1)
  // Names reported by the pool before any task function is added to it.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed but was not added through the pool, so removal is refused.
  await expect(threadPool.removeTaskFunction('test')).rejects.toThrowError(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await threadPool.addTaskFunction('echo', echoTaskFunction)
  expect(threadPool.taskFunctions.size).toBe(1)
  expect(threadPool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(threadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  // Removing the pool-side task function restores the initial state.
  await expect(threadPool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(threadPool.taskFunctions.size).toBe(0)
  expect(threadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(threadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await threadPool.destroy()
})
1410
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names both worker files are expected to report, in this exact order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listing, then destroys it.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  // The cluster pool is only created once the thread pool is destroyed.
  await checkPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await checkPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1438
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The rejection messages embed the id of the worker that refused the
  // operation. Read it from the pool instead of hard-coding a number, which
  // only matched when earlier tests had spawned exactly the right number of
  // threads.
  // NOTE(review): assumes the first worker node is the one reporting the
  // failure (the pool is created with a minimum size of
  // Math.floor(numberOfWorkers / 2)) and that `info.id` is the id used in the
  // message; confirm against the pool implementation.
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    )
  )
  // The rejected operations must leave the task function order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The task function set as default is listed right after DEFAULT_TASK_NAME.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the worker threads, as every other test in this suite does.
  await dynamicThreadPool.destroy()
})
1492
it('Verify that multiple task functions worker is working', async () => {
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  // Run the default task function, then each named one, strictly in sequence.
  expect(await clusterPool.execute(input)).toStrictEqual({ ok: 1 })
  expect(
    await clusterPool.execute(input, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await clusterPool.execute(input, 'factorial')).toBe(3628800)
  expect(await clusterPool.execute(input, 'fibonacci')).toBe(55)
  expect(clusterPool.info.executingTasks).toBe(0)
  expect(clusterPool.info.executedTasks).toBe(4)
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  for (const workerNode of clusterPool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of clusterPool.listTaskFunctionNames()) {
      const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(name)
      expect(taskFunctionUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage reported under DEFAULT_TASK_NAME must equal the usage of the
    // task function listed right after it.
    const [, firstNamedTaskFunction] = workerNode.info.taskFunctionNames
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await clusterPool.destroy()
})
52a23942
JB
1556
it('Verify sendKillMessageToWorker()', async () => {
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node; the kill request must settle with no value.
  const firstWorkerNodeKey = 0
  const killRequest = clusterPool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await clusterPool.destroy()
})
adee6053
JB
1569
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  // Ask a single worker to register a no-op task function named 'empty';
  // the function travels to the worker as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    clusterPool.sendTaskFunctionOperationToWorker(
      firstWorkerNodeKey,
      addEmptyTaskFunction
    )
  ).resolves.toBe(true)
  // The targeted worker node must now list the new task function last.
  const { taskFunctionNames } = clusterPool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await clusterPool.destroy()
})
1589
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const clusterPool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Ask all workers to register a no-op task function named 'empty';
  // the function travels to the workers as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    clusterPool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node must now list the new task function last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of clusterPool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await clusterPool.destroy()
})
3ec964d6 1612})