Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
24 const version = JSON.parse(
25 readFileSync(
a5e5599c 26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
27 'utf8'
28 )
29 ).version
fc027381 30 const numberOfWorkers = 2
a8884ffd 31 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
32 isMain () {
33 return false
34 }
3ec964d6 35 }
3ec964d6 36
dc021bcc 37 afterEach(() => {
a074ffee 38 restore()
dc021bcc
JB
39 })
40
3ec964d6 41 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
42 expect(
43 () =>
a8884ffd 44 new StubPoolWithIsMain(
7c0ba920 45 numberOfWorkers,
b2fd3f4a 46 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 47 {
041dc05b 48 errorHandler: e => console.error(e)
8d3782fa
JB
49 }
50 )
04f45163 51 ).toThrowError(
e695d66f
JB
52 new Error(
53 'Cannot start a pool from a worker with the same type as the pool'
54 )
04f45163 55 )
3ec964d6 56 })
c510fea7 57
bc61cfe6
JB
58 it('Verify that pool statuses properties are set', async () => {
59 const pool = new FixedThreadPool(
60 numberOfWorkers,
b2fd3f4a 61 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6
JB
62 )
63 expect(pool.starting).toBe(false)
64 expect(pool.started).toBe(true)
65 await pool.destroy()
bc61cfe6
JB
66 })
67
c510fea7 68 it('Verify that filePath is checked', () => {
7c0ba920 69 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
fa015370 70 new Error("Cannot find the worker file 'undefined'")
3d6dd312
JB
71 )
72 expect(
73 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
74 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
75 })
76
77 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
78 expect(
79 () =>
80 new FixedThreadPool(
81 undefined,
b2fd3f4a 82 './tests/worker-files/thread/testWorker.mjs'
8003c026
JB
83 )
84 ).toThrowError(
e695d66f
JB
85 new Error(
86 'Cannot instantiate a pool without specifying the number of workers'
87 )
8d3782fa
JB
88 )
89 })
90
91 it('Verify that a negative number of workers is checked', () => {
92 expect(
93 () =>
94 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 ).toThrowError(
473c717a
JB
96 new RangeError(
97 'Cannot instantiate a pool with a negative number of workers'
98 )
8d3782fa
JB
99 )
100 })
101
102 it('Verify that a non integer number of workers is checked', () => {
103 expect(
104 () =>
b2fd3f4a 105 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
8d3782fa 106 ).toThrowError(
473c717a 107 new TypeError(
0d80593b 108 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
109 )
110 )
c510fea7 111 })
7c0ba920 112
216541b6 113 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
114 expect(
115 () =>
116 new DynamicClusterPool(
117 1,
118 undefined,
119 './tests/worker-files/cluster/testWorker.js'
120 )
121 ).toThrowError(
122 new TypeError(
123 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 )
125 )
2761efb4
JB
126 expect(
127 () =>
128 new DynamicThreadPool(
129 0.5,
130 1,
b2fd3f4a 131 './tests/worker-files/thread/testWorker.mjs'
2761efb4
JB
132 )
133 ).toThrowError(
134 new TypeError(
135 'Cannot instantiate a pool with a non safe integer number of workers'
136 )
137 )
138 expect(
139 () =>
140 new DynamicClusterPool(
141 0,
142 0.5,
a5ed75b7 143 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
144 )
145 ).toThrowError(
146 new TypeError(
147 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 )
149 )
2431bdb4
JB
150 expect(
151 () =>
b2fd3f4a
JB
152 new DynamicThreadPool(
153 2,
154 1,
155 './tests/worker-files/thread/testWorker.mjs'
156 )
2431bdb4
JB
157 ).toThrowError(
158 new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
160 )
161 )
162 expect(
163 () =>
b2fd3f4a
JB
164 new DynamicThreadPool(
165 0,
166 0,
167 './tests/worker-files/thread/testWorker.mjs'
168 )
2431bdb4
JB
169 ).toThrowError(
170 new RangeError(
213cbac6 171 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
172 )
173 )
21f710aa
JB
174 expect(
175 () =>
213cbac6
JB
176 new DynamicClusterPool(
177 1,
178 1,
179 './tests/worker-files/cluster/testWorker.js'
180 )
21f710aa
JB
181 ).toThrowError(
182 new RangeError(
213cbac6 183 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
184 )
185 )
2431bdb4
JB
186 })
187
fd7ebd49 188 it('Verify that pool options are checked', async () => {
7c0ba920
JB
189 let pool = new FixedThreadPool(
190 numberOfWorkers,
b2fd3f4a 191 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 192 )
b5604034 193 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
194 expect(pool.opts).toStrictEqual({
195 startWorkers: true,
196 enableEvents: true,
197 restartWorkerOnError: true,
198 enableTasksQueue: false,
199 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
200 workerChoiceStrategyOptions: {
201 retries: 6,
202 runTime: { median: false },
203 waitTime: { median: false },
204 elu: { median: false }
205 }
8990357d
JB
206 })
207 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 208 retries: 6,
932fc8be 209 runTime: { median: false },
5df69fab
JB
210 waitTime: { median: false },
211 elu: { median: false }
da309861 212 })
999ef664
JB
213 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
214 .workerChoiceStrategies) {
215 expect(workerChoiceStrategy.opts).toStrictEqual({
216 retries: 6,
217 runTime: { median: false },
218 waitTime: { median: false },
219 elu: { median: false }
220 })
221 }
fd7ebd49 222 await pool.destroy()
73bfd59d 223 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
224 pool = new FixedThreadPool(
225 numberOfWorkers,
b2fd3f4a 226 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 227 {
e4543b14 228 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 229 workerChoiceStrategyOptions: {
932fc8be 230 runTime: { median: true },
fc027381 231 weights: { 0: 300, 1: 200 }
49be33fe 232 },
35cf1c03 233 enableEvents: false,
1f68cede 234 restartWorkerOnError: false,
ff733df7 235 enableTasksQueue: true,
d4aeae5a 236 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
237 messageHandler: testHandler,
238 errorHandler: testHandler,
239 onlineHandler: testHandler,
240 exitHandler: testHandler
7c0ba920
JB
241 }
242 )
7c0ba920 243 expect(pool.emitter).toBeUndefined()
47352846
JB
244 expect(pool.opts).toStrictEqual({
245 startWorkers: true,
246 enableEvents: false,
247 restartWorkerOnError: false,
248 enableTasksQueue: true,
249 tasksQueueOptions: {
250 concurrency: 2,
2324f8c9 251 size: Math.pow(numberOfWorkers, 2),
dbd73092 252 taskStealing: true,
47352846
JB
253 tasksStealingOnBackPressure: true
254 },
255 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
256 workerChoiceStrategyOptions: {
257 retries: 6,
258 runTime: { median: true },
259 waitTime: { median: false },
260 elu: { median: false },
261 weights: { 0: 300, 1: 200 }
262 },
263 onlineHandler: testHandler,
264 messageHandler: testHandler,
265 errorHandler: testHandler,
266 exitHandler: testHandler
8990357d
JB
267 })
268 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 269 retries: 6,
8990357d
JB
270 runTime: { median: true },
271 waitTime: { median: false },
272 elu: { median: false },
fc027381 273 weights: { 0: 300, 1: 200 }
da309861 274 })
999ef664
JB
275 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
276 .workerChoiceStrategies) {
277 expect(workerChoiceStrategy.opts).toStrictEqual({
278 retries: 6,
279 runTime: { median: true },
280 waitTime: { median: false },
281 elu: { median: false },
282 weights: { 0: 300, 1: 200 }
283 })
284 }
fd7ebd49 285 await pool.destroy()
7c0ba920
JB
286 })
287
fe291b64 288 it('Verify that pool options are validated', () => {
d4aeae5a
JB
289 expect(
290 () =>
291 new FixedThreadPool(
292 numberOfWorkers,
b2fd3f4a 293 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 294 {
f0d7f803 295 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
296 }
297 )
8735b4e5
JB
298 ).toThrowError(
299 new Error("Invalid worker choice strategy 'invalidStrategy'")
300 )
7e653ee0
JB
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
b2fd3f4a 305 './tests/worker-files/thread/testWorker.mjs',
7e653ee0
JB
306 {
307 workerChoiceStrategyOptions: {
8c0b113f 308 retries: 'invalidChoiceRetries'
7e653ee0
JB
309 }
310 }
311 )
312 ).toThrowError(
313 new TypeError(
8c0b113f 314 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
315 )
316 )
317 expect(
318 () =>
319 new FixedThreadPool(
320 numberOfWorkers,
b2fd3f4a 321 './tests/worker-files/thread/testWorker.mjs',
7e653ee0
JB
322 {
323 workerChoiceStrategyOptions: {
8c0b113f 324 retries: -1
7e653ee0
JB
325 }
326 }
327 )
328 ).toThrowError(
329 new RangeError(
8c0b113f 330 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
331 )
332 )
49be33fe
JB
333 expect(
334 () =>
335 new FixedThreadPool(
336 numberOfWorkers,
b2fd3f4a 337 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
338 {
339 workerChoiceStrategyOptions: { weights: {} }
340 }
341 )
342 ).toThrowError(
8735b4e5
JB
343 new Error(
344 'Invalid worker choice strategy options: must have a weight for each worker node'
345 )
49be33fe 346 )
f0d7f803
JB
347 expect(
348 () =>
349 new FixedThreadPool(
350 numberOfWorkers,
b2fd3f4a 351 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
352 {
353 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
354 }
355 )
356 ).toThrowError(
8735b4e5
JB
357 new Error(
358 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
359 )
f0d7f803 360 )
5c4c2dee
JB
361 expect(
362 () =>
363 new FixedThreadPool(
364 numberOfWorkers,
b2fd3f4a 365 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
366 {
367 enableTasksQueue: true,
368 tasksQueueOptions: 'invalidTasksQueueOptions'
369 }
370 )
371 ).toThrowError(
372 new TypeError('Invalid tasks queue options: must be a plain object')
373 )
f0d7f803
JB
374 expect(
375 () =>
376 new FixedThreadPool(
377 numberOfWorkers,
b2fd3f4a 378 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
379 {
380 enableTasksQueue: true,
381 tasksQueueOptions: { concurrency: 0 }
382 }
383 )
8735b4e5 384 ).toThrowError(
e695d66f 385 new RangeError(
20c6f652 386 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
387 )
388 )
f0d7f803
JB
389 expect(
390 () =>
391 new FixedThreadPool(
392 numberOfWorkers,
b2fd3f4a 393 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
394 {
395 enableTasksQueue: true,
5c4c2dee 396 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
397 }
398 )
8735b4e5 399 ).toThrowError(
5c4c2dee
JB
400 new RangeError(
401 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
402 )
8735b4e5 403 )
f0d7f803
JB
404 expect(
405 () =>
406 new FixedThreadPool(
407 numberOfWorkers,
b2fd3f4a 408 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
409 {
410 enableTasksQueue: true,
411 tasksQueueOptions: { concurrency: 0.2 }
412 }
413 )
8735b4e5 414 ).toThrowError(
20c6f652 415 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 416 )
5c4c2dee
JB
417 expect(
418 () =>
419 new FixedThreadPool(
420 numberOfWorkers,
b2fd3f4a 421 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
422 {
423 enableTasksQueue: true,
424 tasksQueueOptions: { size: 0 }
425 }
426 )
427 ).toThrowError(
428 new RangeError(
429 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
430 )
431 )
432 expect(
433 () =>
434 new FixedThreadPool(
435 numberOfWorkers,
b2fd3f4a 436 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
437 {
438 enableTasksQueue: true,
439 tasksQueueOptions: { size: -1 }
440 }
441 )
442 ).toThrowError(
443 new RangeError(
444 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
445 )
446 )
447 expect(
448 () =>
449 new FixedThreadPool(
450 numberOfWorkers,
b2fd3f4a 451 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
452 {
453 enableTasksQueue: true,
454 tasksQueueOptions: { size: 0.2 }
455 }
456 )
457 ).toThrowError(
458 new TypeError('Invalid worker node tasks queue size: must be an integer')
459 )
d4aeae5a
JB
460 })
461
2431bdb4 462 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
463 const pool = new FixedThreadPool(
464 numberOfWorkers,
b2fd3f4a 465 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
466 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
467 )
468 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 469 retries: 6,
8990357d
JB
470 runTime: { median: false },
471 waitTime: { median: false },
472 elu: { median: false }
473 })
474 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 475 retries: 6,
932fc8be 476 runTime: { median: false },
5df69fab
JB
477 waitTime: { median: false },
478 elu: { median: false }
a20f0ba5
JB
479 })
480 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
481 .workerChoiceStrategies) {
86bf340d 482 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 483 retries: 6,
932fc8be 484 runTime: { median: false },
5df69fab
JB
485 waitTime: { median: false },
486 elu: { median: false }
86bf340d 487 })
a20f0ba5 488 }
87de9ff5
JB
489 expect(
490 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 ).toStrictEqual({
932fc8be
JB
492 runTime: {
493 aggregate: true,
494 average: true,
495 median: false
496 },
497 waitTime: {
498 aggregate: false,
499 average: false,
500 median: false
501 },
5df69fab 502 elu: {
9adcefab
JB
503 aggregate: true,
504 average: true,
5df69fab
JB
505 median: false
506 }
86bf340d 507 })
9adcefab
JB
508 pool.setWorkerChoiceStrategyOptions({
509 runTime: { median: true },
510 elu: { median: true }
511 })
a20f0ba5 512 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 513 retries: 6,
9adcefab 514 runTime: { median: true },
8990357d
JB
515 waitTime: { median: false },
516 elu: { median: true }
517 })
518 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 519 retries: 6,
8990357d
JB
520 runTime: { median: true },
521 waitTime: { median: false },
9adcefab 522 elu: { median: true }
a20f0ba5
JB
523 })
524 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
525 .workerChoiceStrategies) {
932fc8be 526 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 527 retries: 6,
9adcefab 528 runTime: { median: true },
8990357d 529 waitTime: { median: false },
9adcefab 530 elu: { median: true }
932fc8be 531 })
a20f0ba5 532 }
87de9ff5
JB
533 expect(
534 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
535 ).toStrictEqual({
932fc8be
JB
536 runTime: {
537 aggregate: true,
538 average: false,
539 median: true
540 },
541 waitTime: {
542 aggregate: false,
543 average: false,
544 median: false
545 },
5df69fab 546 elu: {
9adcefab 547 aggregate: true,
5df69fab 548 average: false,
9adcefab 549 median: true
5df69fab 550 }
86bf340d 551 })
9adcefab
JB
552 pool.setWorkerChoiceStrategyOptions({
553 runTime: { median: false },
554 elu: { median: false }
555 })
a20f0ba5 556 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 557 retries: 6,
8990357d
JB
558 runTime: { median: false },
559 waitTime: { median: false },
560 elu: { median: false }
561 })
562 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 563 retries: 6,
9adcefab 564 runTime: { median: false },
8990357d 565 waitTime: { median: false },
9adcefab 566 elu: { median: false }
a20f0ba5
JB
567 })
568 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
569 .workerChoiceStrategies) {
932fc8be 570 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 571 retries: 6,
9adcefab 572 runTime: { median: false },
8990357d 573 waitTime: { median: false },
9adcefab 574 elu: { median: false }
932fc8be 575 })
a20f0ba5 576 }
87de9ff5
JB
577 expect(
578 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
579 ).toStrictEqual({
932fc8be
JB
580 runTime: {
581 aggregate: true,
582 average: true,
583 median: false
584 },
585 waitTime: {
586 aggregate: false,
587 average: false,
588 median: false
589 },
5df69fab 590 elu: {
9adcefab
JB
591 aggregate: true,
592 average: true,
5df69fab
JB
593 median: false
594 }
86bf340d 595 })
1f95d544
JB
596 expect(() =>
597 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
598 ).toThrowError(
8735b4e5
JB
599 new TypeError(
600 'Invalid worker choice strategy options: must be a plain object'
601 )
1f95d544 602 )
7e653ee0
JB
603 expect(() =>
604 pool.setWorkerChoiceStrategyOptions({
8c0b113f 605 retries: 'invalidChoiceRetries'
7e653ee0
JB
606 })
607 ).toThrowError(
608 new TypeError(
8c0b113f 609 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
610 )
611 )
612 expect(() =>
8c0b113f 613 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
7e653ee0
JB
614 ).toThrowError(
615 new RangeError(
8c0b113f 616 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
617 )
618 )
1f95d544
JB
619 expect(() =>
620 pool.setWorkerChoiceStrategyOptions({ weights: {} })
621 ).toThrowError(
8735b4e5
JB
622 new Error(
623 'Invalid worker choice strategy options: must have a weight for each worker node'
624 )
1f95d544
JB
625 )
626 expect(() =>
627 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
628 ).toThrowError(
8735b4e5
JB
629 new Error(
630 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
631 )
1f95d544 632 )
a20f0ba5
JB
633 await pool.destroy()
634 })
635
2431bdb4 636 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
637 const pool = new FixedThreadPool(
638 numberOfWorkers,
b2fd3f4a 639 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
640 )
641 expect(pool.opts.enableTasksQueue).toBe(false)
642 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
643 for (const workerNode of pool.workerNodes) {
644 expect(workerNode.onEmptyQueue).toBeUndefined()
645 expect(workerNode.onBackPressure).toBeUndefined()
646 }
a20f0ba5
JB
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
650 concurrency: 1,
2324f8c9 651 size: Math.pow(numberOfWorkers, 2),
dbd73092 652 taskStealing: true,
47352846 653 tasksStealingOnBackPressure: true
20c6f652 654 })
d6ca1416
JB
655 for (const workerNode of pool.workerNodes) {
656 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
657 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
658 }
a20f0ba5
JB
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 concurrency: 2,
2324f8c9 663 size: Math.pow(numberOfWorkers, 2),
dbd73092 664 taskStealing: true,
47352846 665 tasksStealingOnBackPressure: true
20c6f652 666 })
d6ca1416
JB
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
669 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
670 }
a20f0ba5
JB
671 pool.enableTasksQueue(false)
672 expect(pool.opts.enableTasksQueue).toBe(false)
673 expect(pool.opts.tasksQueueOptions).toBeUndefined()
d6ca1416
JB
674 for (const workerNode of pool.workerNodes) {
675 expect(workerNode.onEmptyQueue).toBeUndefined()
676 expect(workerNode.onBackPressure).toBeUndefined()
677 }
a20f0ba5
JB
678 await pool.destroy()
679 })
680
2431bdb4 681 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
682 const pool = new FixedThreadPool(
683 numberOfWorkers,
b2fd3f4a 684 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
685 { enableTasksQueue: true }
686 )
20c6f652
JB
687 expect(pool.opts.tasksQueueOptions).toStrictEqual({
688 concurrency: 1,
2324f8c9 689 size: Math.pow(numberOfWorkers, 2),
dbd73092 690 taskStealing: true,
47352846 691 tasksStealingOnBackPressure: true
20c6f652 692 })
d6ca1416 693 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
696 )
d6ca1416
JB
697 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
698 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
699 }
700 pool.setTasksQueueOptions({
701 concurrency: 2,
2324f8c9 702 size: 2,
d6ca1416
JB
703 taskStealing: false,
704 tasksStealingOnBackPressure: false
705 })
20c6f652
JB
706 expect(pool.opts.tasksQueueOptions).toStrictEqual({
707 concurrency: 2,
2324f8c9 708 size: 2,
d6ca1416
JB
709 taskStealing: false,
710 tasksStealingOnBackPressure: false
711 })
712 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
715 )
d6ca1416
JB
716 expect(workerNode.onEmptyQueue).toBeUndefined()
717 expect(workerNode.onBackPressure).toBeUndefined()
718 }
719 pool.setTasksQueueOptions({
720 concurrency: 1,
721 taskStealing: true,
722 tasksStealingOnBackPressure: true
723 })
724 expect(pool.opts.tasksQueueOptions).toStrictEqual({
725 concurrency: 1,
2324f8c9 726 size: Math.pow(numberOfWorkers, 2),
dbd73092 727 taskStealing: true,
47352846 728 tasksStealingOnBackPressure: true
20c6f652 729 })
d6ca1416 730 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
731 expect(workerNode.tasksQueueBackPressureSize).toBe(
732 pool.opts.tasksQueueOptions.size
733 )
d6ca1416
JB
734 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
735 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
736 }
f0d7f803
JB
737 expect(() =>
738 pool.setTasksQueueOptions('invalidTasksQueueOptions')
8735b4e5
JB
739 ).toThrowError(
740 new TypeError('Invalid tasks queue options: must be a plain object')
741 )
a20f0ba5 742 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
e695d66f 743 new RangeError(
20c6f652 744 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
745 )
746 )
747 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
e695d66f 748 new RangeError(
20c6f652 749 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 750 )
a20f0ba5 751 )
f0d7f803 752 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
20c6f652
JB
753 new TypeError('Invalid worker node tasks concurrency: must be an integer')
754 )
ff3f866a 755 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
20c6f652 756 new RangeError(
68dbcdc0 757 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
758 )
759 )
ff3f866a 760 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
20c6f652 761 new RangeError(
68dbcdc0 762 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
763 )
764 )
ff3f866a 765 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
68dbcdc0 766 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 767 )
a20f0ba5
JB
768 await pool.destroy()
769 })
770
6b27d407
JB
771 it('Verify that pool info is set', async () => {
772 let pool = new FixedThreadPool(
773 numberOfWorkers,
b2fd3f4a 774 './tests/worker-files/thread/testWorker.mjs'
6b27d407 775 )
2dca6cab
JB
776 expect(pool.info).toStrictEqual({
777 version,
778 type: PoolTypes.fixed,
779 worker: WorkerTypes.thread,
47352846 780 started: true,
2dca6cab
JB
781 ready: true,
782 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
783 minSize: numberOfWorkers,
784 maxSize: numberOfWorkers,
785 workerNodes: numberOfWorkers,
786 idleWorkerNodes: numberOfWorkers,
787 busyWorkerNodes: 0,
788 executedTasks: 0,
789 executingTasks: 0,
2dca6cab
JB
790 failedTasks: 0
791 })
6b27d407
JB
792 await pool.destroy()
793 pool = new DynamicClusterPool(
2431bdb4 794 Math.floor(numberOfWorkers / 2),
6b27d407 795 numberOfWorkers,
ecdfbdc0 796 './tests/worker-files/cluster/testWorker.js'
6b27d407 797 )
2dca6cab
JB
798 expect(pool.info).toStrictEqual({
799 version,
800 type: PoolTypes.dynamic,
801 worker: WorkerTypes.cluster,
47352846 802 started: true,
2dca6cab
JB
803 ready: true,
804 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
805 minSize: Math.floor(numberOfWorkers / 2),
806 maxSize: numberOfWorkers,
807 workerNodes: Math.floor(numberOfWorkers / 2),
808 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
809 busyWorkerNodes: 0,
810 executedTasks: 0,
811 executingTasks: 0,
2dca6cab
JB
812 failedTasks: 0
813 })
6b27d407
JB
814 await pool.destroy()
815 })
816
2431bdb4 817 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
818 const pool = new FixedClusterPool(
819 numberOfWorkers,
820 './tests/worker-files/cluster/testWorker.js'
821 )
f06e48d8 822 for (const workerNode of pool.workerNodes) {
47352846 823 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 824 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
825 tasks: {
826 executed: 0,
827 executing: 0,
828 queued: 0,
df593701 829 maxQueued: 0,
68cbdc84 830 stolen: 0,
a4e07f72
JB
831 failed: 0
832 },
833 runTime: {
4ba4c7f9 834 history: new CircularArray()
a4e07f72
JB
835 },
836 waitTime: {
4ba4c7f9 837 history: new CircularArray()
a4e07f72 838 },
5df69fab
JB
839 elu: {
840 idle: {
4ba4c7f9 841 history: new CircularArray()
5df69fab
JB
842 },
843 active: {
4ba4c7f9 844 history: new CircularArray()
f7510105 845 }
5df69fab 846 }
86bf340d 847 })
f06e48d8
JB
848 }
849 await pool.destroy()
850 })
851
2431bdb4
JB
852 it('Verify that pool worker tasks queue are initialized', async () => {
853 let pool = new FixedClusterPool(
f06e48d8
JB
854 numberOfWorkers,
855 './tests/worker-files/cluster/testWorker.js'
856 )
857 for (const workerNode of pool.workerNodes) {
47352846 858 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 860 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 861 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 862 }
fd7ebd49 863 await pool.destroy()
2431bdb4
JB
864 pool = new DynamicThreadPool(
865 Math.floor(numberOfWorkers / 2),
866 numberOfWorkers,
b2fd3f4a 867 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
868 )
869 for (const workerNode of pool.workerNodes) {
47352846 870 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 871 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
872 expect(workerNode.tasksQueue.size).toBe(0)
873 expect(workerNode.tasksQueue.maxSize).toBe(0)
874 }
213cbac6 875 await pool.destroy()
2431bdb4
JB
876 })
877
878 it('Verify that pool worker info are initialized', async () => {
879 let pool = new FixedClusterPool(
880 numberOfWorkers,
881 './tests/worker-files/cluster/testWorker.js'
882 )
2dca6cab 883 for (const workerNode of pool.workerNodes) {
47352846 884 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
885 expect(workerNode.info).toStrictEqual({
886 id: expect.any(Number),
887 type: WorkerTypes.cluster,
888 dynamic: false,
889 ready: true
890 })
891 }
2431bdb4
JB
892 await pool.destroy()
893 pool = new DynamicThreadPool(
894 Math.floor(numberOfWorkers / 2),
895 numberOfWorkers,
b2fd3f4a 896 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 897 )
2dca6cab 898 for (const workerNode of pool.workerNodes) {
47352846 899 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
900 expect(workerNode.info).toStrictEqual({
901 id: expect.any(Number),
902 type: WorkerTypes.thread,
903 dynamic: false,
7884d183 904 ready: true
2dca6cab
JB
905 })
906 }
213cbac6 907 await pool.destroy()
bf9549ae
JB
908 })
909
47352846
JB
910 it('Verify that pool can be started after initialization', async () => {
911 const pool = new FixedClusterPool(
912 numberOfWorkers,
913 './tests/worker-files/cluster/testWorker.js',
914 {
915 startWorkers: false
916 }
917 )
918 expect(pool.info.started).toBe(false)
919 expect(pool.info.ready).toBe(false)
920 expect(pool.workerNodes).toStrictEqual([])
921 await expect(pool.execute()).rejects.toThrowError(
922 new Error('Cannot execute a task on not started pool')
923 )
924 pool.start()
925 expect(pool.info.started).toBe(true)
926 expect(pool.info.ready).toBe(true)
927 expect(pool.workerNodes.length).toBe(numberOfWorkers)
928 for (const workerNode of pool.workerNodes) {
929 expect(workerNode).toBeInstanceOf(WorkerNode)
930 }
931 await pool.destroy()
932 })
933
9d2d0da1
JB
934 it('Verify that pool execute() arguments are checked', async () => {
935 const pool = new FixedClusterPool(
936 numberOfWorkers,
937 './tests/worker-files/cluster/testWorker.js'
938 )
939 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
940 new TypeError('name argument must be a string')
941 )
942 await expect(pool.execute(undefined, '')).rejects.toThrowError(
943 new TypeError('name argument must not be an empty string')
944 )
945 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
946 new TypeError('transferList argument must be an array')
947 )
948 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
949 "Task function 'unknown' not found"
950 )
951 await pool.destroy()
47352846
JB
952 await expect(pool.execute()).rejects.toThrowError(
953 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
954 )
955 })
956
2431bdb4 957 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
958 const pool = new FixedClusterPool(
959 numberOfWorkers,
960 './tests/worker-files/cluster/testWorker.js'
961 )
09c2d0d3 962 const promises = new Set()
fc027381
JB
963 const maxMultiplier = 2
964 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 965 promises.add(pool.execute())
bf9549ae 966 }
f06e48d8 967 for (const workerNode of pool.workerNodes) {
465b2940 968 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
969 tasks: {
970 executed: 0,
971 executing: maxMultiplier,
972 queued: 0,
df593701 973 maxQueued: 0,
68cbdc84 974 stolen: 0,
a4e07f72
JB
975 failed: 0
976 },
977 runTime: {
a4e07f72
JB
978 history: expect.any(CircularArray)
979 },
980 waitTime: {
a4e07f72
JB
981 history: expect.any(CircularArray)
982 },
5df69fab
JB
983 elu: {
984 idle: {
5df69fab
JB
985 history: expect.any(CircularArray)
986 },
987 active: {
5df69fab 988 history: expect.any(CircularArray)
f7510105 989 }
5df69fab 990 }
86bf340d 991 })
bf9549ae
JB
992 }
993 await Promise.all(promises)
f06e48d8 994 for (const workerNode of pool.workerNodes) {
465b2940 995 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
996 tasks: {
997 executed: maxMultiplier,
998 executing: 0,
999 queued: 0,
df593701 1000 maxQueued: 0,
68cbdc84 1001 stolen: 0,
a4e07f72
JB
1002 failed: 0
1003 },
1004 runTime: {
a4e07f72
JB
1005 history: expect.any(CircularArray)
1006 },
1007 waitTime: {
a4e07f72
JB
1008 history: expect.any(CircularArray)
1009 },
5df69fab
JB
1010 elu: {
1011 idle: {
5df69fab
JB
1012 history: expect.any(CircularArray)
1013 },
1014 active: {
5df69fab 1015 history: expect.any(CircularArray)
f7510105 1016 }
5df69fab 1017 }
86bf340d 1018 })
bf9549ae 1019 }
fd7ebd49 1020 await pool.destroy()
bf9549ae
JB
1021 })
1022
2431bdb4 1023 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1024 const pool = new DynamicThreadPool(
2431bdb4 1025 Math.floor(numberOfWorkers / 2),
8f4878b7 1026 numberOfWorkers,
b2fd3f4a 1027 './tests/worker-files/thread/testWorker.mjs'
9e619829 1028 )
09c2d0d3 1029 const promises = new Set()
ee9f5295
JB
1030 const maxMultiplier = 2
1031 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1032 promises.add(pool.execute())
9e619829
JB
1033 }
1034 await Promise.all(promises)
f06e48d8 1035 for (const workerNode of pool.workerNodes) {
465b2940 1036 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1037 tasks: {
1038 executed: expect.any(Number),
1039 executing: 0,
1040 queued: 0,
df593701 1041 maxQueued: 0,
68cbdc84 1042 stolen: 0,
a4e07f72
JB
1043 failed: 0
1044 },
1045 runTime: {
a4e07f72
JB
1046 history: expect.any(CircularArray)
1047 },
1048 waitTime: {
a4e07f72
JB
1049 history: expect.any(CircularArray)
1050 },
5df69fab
JB
1051 elu: {
1052 idle: {
5df69fab
JB
1053 history: expect.any(CircularArray)
1054 },
1055 active: {
5df69fab 1056 history: expect.any(CircularArray)
f7510105 1057 }
5df69fab 1058 }
86bf340d 1059 })
465b2940 1060 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1061 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1062 numberOfWorkers * maxMultiplier
1063 )
b97d82d8
JB
1064 expect(workerNode.usage.runTime.history.length).toBe(0)
1065 expect(workerNode.usage.waitTime.history.length).toBe(0)
1066 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1067 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1068 }
1069 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1070 for (const workerNode of pool.workerNodes) {
465b2940 1071 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1072 tasks: {
1073 executed: 0,
1074 executing: 0,
1075 queued: 0,
df593701 1076 maxQueued: 0,
68cbdc84 1077 stolen: 0,
a4e07f72
JB
1078 failed: 0
1079 },
1080 runTime: {
a4e07f72
JB
1081 history: expect.any(CircularArray)
1082 },
1083 waitTime: {
a4e07f72
JB
1084 history: expect.any(CircularArray)
1085 },
5df69fab
JB
1086 elu: {
1087 idle: {
5df69fab
JB
1088 history: expect.any(CircularArray)
1089 },
1090 active: {
5df69fab 1091 history: expect.any(CircularArray)
f7510105 1092 }
5df69fab 1093 }
86bf340d 1094 })
465b2940
JB
1095 expect(workerNode.usage.runTime.history.length).toBe(0)
1096 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1097 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1098 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1099 }
fd7ebd49 1100 await pool.destroy()
ee11a4a2
JB
1101 })
1102
a1763c54
JB
1103 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1104 const pool = new DynamicClusterPool(
2431bdb4 1105 Math.floor(numberOfWorkers / 2),
164d950a 1106 numberOfWorkers,
a1763c54 1107 './tests/worker-files/cluster/testWorker.js'
164d950a 1108 )
c726f66c 1109 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1110 let poolInfo
a1763c54 1111 let poolReady = 0
041dc05b 1112 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1113 ++poolReady
d46660cd
JB
1114 poolInfo = info
1115 })
a1763c54 1116 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1117 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1118 expect(poolReady).toBe(1)
d46660cd 1119 expect(poolInfo).toStrictEqual({
23ccf9d7 1120 version,
d46660cd 1121 type: PoolTypes.dynamic,
a1763c54 1122 worker: WorkerTypes.cluster,
47352846 1123 started: true,
a1763c54 1124 ready: true,
2431bdb4
JB
1125 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1126 minSize: expect.any(Number),
1127 maxSize: expect.any(Number),
1128 workerNodes: expect.any(Number),
1129 idleWorkerNodes: expect.any(Number),
1130 busyWorkerNodes: expect.any(Number),
1131 executedTasks: expect.any(Number),
1132 executingTasks: expect.any(Number),
2431bdb4
JB
1133 failedTasks: expect.any(Number)
1134 })
1135 await pool.destroy()
1136 })
1137
a1763c54
JB
1138 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1139 const pool = new FixedThreadPool(
2431bdb4 1140 numberOfWorkers,
b2fd3f4a 1141 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1142 )
c726f66c 1143 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1144 const promises = new Set()
1145 let poolBusy = 0
2431bdb4 1146 let poolInfo
041dc05b 1147 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1148 ++poolBusy
2431bdb4
JB
1149 poolInfo = info
1150 })
c726f66c 1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
1153 promises.add(pool.execute())
1154 }
1155 await Promise.all(promises)
1156 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1157 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1158 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1159 expect(poolInfo).toStrictEqual({
1160 version,
a1763c54
JB
1161 type: PoolTypes.fixed,
1162 worker: WorkerTypes.thread,
47352846
JB
1163 started: true,
1164 ready: true,
2431bdb4 1165 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1166 minSize: expect.any(Number),
1167 maxSize: expect.any(Number),
1168 workerNodes: expect.any(Number),
1169 idleWorkerNodes: expect.any(Number),
1170 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1171 executedTasks: expect.any(Number),
1172 executingTasks: expect.any(Number),
a4e07f72 1173 failedTasks: expect.any(Number)
d46660cd 1174 })
164d950a
JB
1175 await pool.destroy()
1176 })
1177
a1763c54
JB
1178 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1179 const pool = new DynamicThreadPool(
1180 Math.floor(numberOfWorkers / 2),
7c0ba920 1181 numberOfWorkers,
b2fd3f4a 1182 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1183 )
c726f66c 1184 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1185 const promises = new Set()
a1763c54 1186 let poolFull = 0
d46660cd 1187 let poolInfo
041dc05b 1188 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1189 ++poolFull
d46660cd
JB
1190 poolInfo = info
1191 })
c726f66c 1192 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1193 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1194 promises.add(pool.execute())
7c0ba920 1195 }
cf597bc5 1196 await Promise.all(promises)
33e6bb4c 1197 expect(poolFull).toBe(1)
d46660cd 1198 expect(poolInfo).toStrictEqual({
23ccf9d7 1199 version,
a1763c54 1200 type: PoolTypes.dynamic,
d46660cd 1201 worker: WorkerTypes.thread,
47352846
JB
1202 started: true,
1203 ready: true,
2431bdb4 1204 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1205 minSize: expect.any(Number),
1206 maxSize: expect.any(Number),
1207 workerNodes: expect.any(Number),
1208 idleWorkerNodes: expect.any(Number),
1209 busyWorkerNodes: expect.any(Number),
1210 executedTasks: expect.any(Number),
1211 executingTasks: expect.any(Number),
1212 failedTasks: expect.any(Number)
1213 })
1214 await pool.destroy()
1215 })
1216
3e8611a8 1217 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1218 const pool = new FixedThreadPool(
8735b4e5 1219 numberOfWorkers,
b2fd3f4a 1220 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1221 {
1222 enableTasksQueue: true
1223 }
1224 )
a074ffee 1225 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1226 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1227 const promises = new Set()
1228 let poolBackPressure = 0
1229 let poolInfo
041dc05b 1230 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1231 ++poolBackPressure
1232 poolInfo = info
1233 })
c726f66c 1234 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1235 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1236 promises.add(pool.execute())
1237 }
1238 await Promise.all(promises)
033f1776 1239 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1240 expect(poolInfo).toStrictEqual({
1241 version,
3e8611a8 1242 type: PoolTypes.fixed,
8735b4e5 1243 worker: WorkerTypes.thread,
47352846
JB
1244 started: true,
1245 ready: true,
8735b4e5 1246 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1247 minSize: expect.any(Number),
1248 maxSize: expect.any(Number),
1249 workerNodes: expect.any(Number),
1250 idleWorkerNodes: expect.any(Number),
1251 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1252 executedTasks: expect.any(Number),
1253 executingTasks: expect.any(Number),
3e8611a8
JB
1254 maxQueuedTasks: expect.any(Number),
1255 queuedTasks: expect.any(Number),
1256 backPressure: true,
68cbdc84 1257 stolenTasks: expect.any(Number),
a4e07f72 1258 failedTasks: expect.any(Number)
d46660cd 1259 })
3e8611a8 1260 expect(pool.hasBackPressure.called).toBe(true)
fd7ebd49 1261 await pool.destroy()
7c0ba920 1262 })
70a4f5ea 1263
9eae3c69
JB
1264 it('Verify that hasTaskFunction() is working', async () => {
1265 const dynamicThreadPool = new DynamicThreadPool(
1266 Math.floor(numberOfWorkers / 2),
1267 numberOfWorkers,
b2fd3f4a 1268 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1269 )
1270 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1271 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1272 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1273 true
1274 )
1275 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1276 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1277 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1278 await dynamicThreadPool.destroy()
1279 const fixedClusterPool = new FixedClusterPool(
1280 numberOfWorkers,
1281 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1282 )
1283 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1284 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1285 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1286 true
1287 )
1288 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1289 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1290 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1291 await fixedClusterPool.destroy()
1292 })
1293
1294 it('Verify that addTaskFunction() is working', async () => {
1295 const dynamicThreadPool = new DynamicThreadPool(
1296 Math.floor(numberOfWorkers / 2),
1297 numberOfWorkers,
b2fd3f4a 1298 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1299 )
1300 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1301 await expect(
1302 dynamicThreadPool.addTaskFunction(0, () => {})
1303 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1304 await expect(
1305 dynamicThreadPool.addTaskFunction('', () => {})
1306 ).rejects.toThrowError(
1307 new TypeError('name argument must not be an empty string')
1308 )
1309 await expect(
1310 dynamicThreadPool.addTaskFunction('test', 0)
1311 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1312 await expect(
1313 dynamicThreadPool.addTaskFunction('test', '')
1314 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
9eae3c69
JB
1315 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1316 DEFAULT_TASK_NAME,
1317 'test'
1318 ])
1319 const echoTaskFunction = data => {
1320 return data
1321 }
1322 await expect(
1323 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1324 ).resolves.toBe(true)
1325 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1326 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1327 echoTaskFunction
1328 )
1329 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1330 DEFAULT_TASK_NAME,
1331 'test',
1332 'echo'
1333 ])
1334 const taskFunctionData = { test: 'test' }
1335 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1336 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1337 for (const workerNode of dynamicThreadPool.workerNodes) {
1338 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1339 tasks: {
1340 executed: expect.any(Number),
1341 executing: 0,
1342 queued: 0,
1343 stolen: 0,
1344 failed: 0
1345 },
1346 runTime: {
1347 history: new CircularArray()
1348 },
1349 waitTime: {
1350 history: new CircularArray()
1351 },
1352 elu: {
1353 idle: {
1354 history: new CircularArray()
1355 },
1356 active: {
1357 history: new CircularArray()
1358 }
1359 }
1360 })
1361 }
9eae3c69
JB
1362 await dynamicThreadPool.destroy()
1363 })
1364
1365 it('Verify that removeTaskFunction() is working', async () => {
1366 const dynamicThreadPool = new DynamicThreadPool(
1367 Math.floor(numberOfWorkers / 2),
1368 numberOfWorkers,
b2fd3f4a 1369 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1370 )
1371 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1372 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1373 DEFAULT_TASK_NAME,
1374 'test'
1375 ])
1376 await expect(
1377 dynamicThreadPool.removeTaskFunction('test')
1378 ).rejects.toThrowError(
16248b23 1379 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1380 )
1381 const echoTaskFunction = data => {
1382 return data
1383 }
1384 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1385 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1386 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1387 echoTaskFunction
1388 )
1389 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1390 DEFAULT_TASK_NAME,
1391 'test',
1392 'echo'
1393 ])
1394 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1395 true
1396 )
1397 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1398 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1399 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1400 DEFAULT_TASK_NAME,
1401 'test'
1402 ])
1403 await dynamicThreadPool.destroy()
1404 })
1405
30500265 1406 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1407 const dynamicThreadPool = new DynamicThreadPool(
1408 Math.floor(numberOfWorkers / 2),
1409 numberOfWorkers,
b2fd3f4a 1410 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1411 )
1412 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1413 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1414 DEFAULT_TASK_NAME,
90d7d101
JB
1415 'jsonIntegerSerialization',
1416 'factorial',
1417 'fibonacci'
1418 ])
9eae3c69 1419 await dynamicThreadPool.destroy()
90d7d101
JB
1420 const fixedClusterPool = new FixedClusterPool(
1421 numberOfWorkers,
1422 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1423 )
1424 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1425 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1426 DEFAULT_TASK_NAME,
90d7d101
JB
1427 'jsonIntegerSerialization',
1428 'factorial',
1429 'fibonacci'
1430 ])
0fe39c97 1431 await fixedClusterPool.destroy()
90d7d101
JB
1432 })
1433
9eae3c69 1434 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1435 const dynamicThreadPool = new DynamicThreadPool(
1436 Math.floor(numberOfWorkers / 2),
1437 numberOfWorkers,
b2fd3f4a 1438 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1439 )
1440 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
b0b55f57
JB
1441 await expect(
1442 dynamicThreadPool.setDefaultTaskFunction(0)
1443 ).rejects.toThrowError(
1444 new Error(
1445 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1446 )
1447 )
1448 await expect(
1449 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1450 ).rejects.toThrowError(
1451 new Error(
1452 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1453 )
1454 )
1455 await expect(
1456 dynamicThreadPool.setDefaultTaskFunction('unknown')
1457 ).rejects.toThrowError(
1458 new Error(
1459 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1460 )
1461 )
9eae3c69
JB
1462 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1463 DEFAULT_TASK_NAME,
1464 'jsonIntegerSerialization',
1465 'factorial',
1466 'fibonacci'
1467 ])
1468 await expect(
1469 dynamicThreadPool.setDefaultTaskFunction('factorial')
1470 ).resolves.toBe(true)
1471 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1472 DEFAULT_TASK_NAME,
1473 'factorial',
1474 'jsonIntegerSerialization',
1475 'fibonacci'
1476 ])
1477 await expect(
1478 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1479 ).resolves.toBe(true)
1480 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1481 DEFAULT_TASK_NAME,
1482 'fibonacci',
1483 'jsonIntegerSerialization',
1484 'factorial'
1485 ])
30500265
JB
1486 })
1487
90d7d101 1488 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1489 const pool = new DynamicClusterPool(
2431bdb4 1490 Math.floor(numberOfWorkers / 2),
70a4f5ea 1491 numberOfWorkers,
90d7d101 1492 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1493 )
1494 const data = { n: 10 }
82888165 1495 const result0 = await pool.execute(data)
30b963d4 1496 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1497 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1498 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1499 const result2 = await pool.execute(data, 'factorial')
1500 expect(result2).toBe(3628800)
1501 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1502 expect(result3).toBe(55)
5bb5be17
JB
1503 expect(pool.info.executingTasks).toBe(0)
1504 expect(pool.info.executedTasks).toBe(4)
b414b84c 1505 for (const workerNode of pool.workerNodes) {
66979634 1506 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1507 DEFAULT_TASK_NAME,
b414b84c
JB
1508 'jsonIntegerSerialization',
1509 'factorial',
1510 'fibonacci'
1511 ])
1512 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1513 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1514 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1515 tasks: {
1516 executed: expect.any(Number),
4ba4c7f9 1517 executing: 0,
5bb5be17 1518 failed: 0,
68cbdc84
JB
1519 queued: 0,
1520 stolen: 0
5bb5be17
JB
1521 },
1522 runTime: {
1523 history: expect.any(CircularArray)
1524 },
1525 waitTime: {
1526 history: expect.any(CircularArray)
1527 },
1528 elu: {
1529 idle: {
1530 history: expect.any(CircularArray)
1531 },
1532 active: {
1533 history: expect.any(CircularArray)
1534 }
1535 }
1536 })
1537 expect(
4ba4c7f9
JB
1538 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1539 ).toBeGreaterThan(0)
5bb5be17 1540 }
dfd7ec01
JB
1541 expect(
1542 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1543 ).toStrictEqual(
66979634
JB
1544 workerNode.getTaskFunctionWorkerUsage(
1545 workerNode.info.taskFunctionNames[1]
1546 )
dfd7ec01 1547 )
5bb5be17 1548 }
0fe39c97 1549 await pool.destroy()
70a4f5ea 1550 })
52a23942
JB
1551
1552 it('Verify sendKillMessageToWorker()', async () => {
1553 const pool = new DynamicClusterPool(
1554 Math.floor(numberOfWorkers / 2),
1555 numberOfWorkers,
1556 './tests/worker-files/cluster/testWorker.js'
1557 )
1558 const workerNodeKey = 0
1559 await expect(
adee6053 1560 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1561 ).resolves.toBeUndefined()
1562 await pool.destroy()
1563 })
adee6053
JB
1564
1565 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1566 const pool = new DynamicClusterPool(
1567 Math.floor(numberOfWorkers / 2),
1568 numberOfWorkers,
1569 './tests/worker-files/cluster/testWorker.js'
1570 )
1571 const workerNodeKey = 0
1572 await expect(
1573 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1574 taskFunctionOperation: 'add',
1575 taskFunctionName: 'empty',
1576 taskFunction: (() => {}).toString()
1577 })
1578 ).resolves.toBe(true)
1579 expect(
1580 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1581 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1582 await pool.destroy()
1583 })
1584
1585 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1586 const pool = new DynamicClusterPool(
1587 Math.floor(numberOfWorkers / 2),
1588 numberOfWorkers,
1589 './tests/worker-files/cluster/testWorker.js'
1590 )
adee6053
JB
1591 await expect(
1592 pool.sendTaskFunctionOperationToWorkers({
1593 taskFunctionOperation: 'add',
1594 taskFunctionName: 'empty',
1595 taskFunction: (() => {}).toString()
1596 })
1597 ).resolves.toBe(true)
1598 for (const workerNode of pool.workerNodes) {
1599 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1600 DEFAULT_TASK_NAME,
1601 'test',
1602 'empty'
1603 ])
1604 }
1605 await pool.destroy()
1606 })
3ec964d6 1607})