test: toThrowError() -> toThrow()
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
// Version read from the package.json two directories above this test file;
// the pool info test compares it with the version reported by the pool.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Worker count used by the tests of this suite.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose isMain() always answers false, used to
 * simulate creating a pool from something other than the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 36
// Call sinon's restore() once every test has finished.
afterEach(() => {
  restore()
})
40
it('Simulate pool creation from a non main thread/process', () => {
  // The stub answers false from isMain(), so the constructor is expected to throw.
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createPool).toThrow(expectedError)
})
c510fea7 57
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    // Right after construction the pool must report started and not starting.
    expect(pool.starting).toBe(false)
    expect(pool.started).toBe(true)
  } finally {
    // Destroy the pool even when an assertion above throws, so its workers
    // are not left behind by a failing test.
    await pool.destroy()
  }
})
67
it('Verify that filePath is checked', () => {
  // No worker file at all.
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  expect(createWithoutFilePath).toThrow(
    new Error("Cannot find the worker file 'undefined'")
  )
  // A worker file path the constructor is expected to reject as not found.
  const createWithUnknownFilePath = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createWithUnknownFilePath).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
76
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const createWithoutNumberOfWorkers = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createWithoutNumberOfWorkers).toThrow(expectedError)
})
90
it('Verify that a negative number of workers is checked', () => {
  // -1 workers must be rejected by the constructor.
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createWithNegativeSize).toThrow(expectedError)
})
101
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers must be rejected by the constructor.
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createWithFractionalSize).toThrow(expectedError)
})
7c0ba920 112
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  // One row per invalid sizing: [pool class, min, max, worker file, expected error].
  // Rows are checked in order, each constructor call being expected to throw.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [Pool, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
187
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // 1. Pool built without options: default values are expected in the pool
  // options, in the strategy context and in every registered strategy.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
    expect(defaultPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      workerChoiceStrategyOptions: defaultStrategyOptions
    })
    expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
      defaultStrategyOptions
    )
    for (const [, strategy] of defaultPool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(defaultStrategyOptions)
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await defaultPool.destroy()
  }

  // 2. Pool built with explicit options: they must be kept, completed with
  // the defaults of the values that were not supplied.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: true
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: customStrategyOptions,
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler
    })
    expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
      customStrategyOptions
    )
    for (const [, strategy] of customPool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(customStrategyOptions)
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await customPool.destroy()
  }
})
287
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // One row per invalid option set: [pool options, expected error].
  // Rows are checked in order, each constructor call being expected to throw.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
459
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected strategy options when runTime and elu share the given median flag.
  const strategyOptions = median => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Checks the options held by the pool, by the strategy context and by each
  // registered strategy, then the statistics requirements derived from them.
  const expectStrategyState = median => {
    const expectedOptions = strategyOptions(median)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(expectedOptions)
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(expectedOptions)
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }
  try {
    // Defaults, then median switched on, then switched back off.
    expectStrategyState(false)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: true },
      elu: { median: true }
    })
    expectStrategyState(true)
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: false },
      elu: { median: false }
    })
    expectStrategyState(false)

    // Invalid values are expected to be rejected, checked in order:
    // [argument, expected error].
    const invalidOptions = [
      [
        'invalidWorkerChoiceStrategyOptions',
        new TypeError(
          'Invalid worker choice strategy options: must be a plain object'
        )
      ],
      [
        { retries: 'invalidChoiceRetries' },
        new TypeError(
          'Invalid worker choice strategy options: retries must be an integer'
        )
      ],
      [
        { retries: -1 },
        new RangeError(
          "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
        )
      ],
      [
        { weights: {} },
        new Error(
          'Invalid worker choice strategy options: must have a weight for each worker node'
        )
      ],
      [
        { measurement: 'invalidMeasurement' },
        new Error(
          "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
        )
      ]
    ]
    for (const [options, expectedError] of invalidOptions) {
      expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrow(
        expectedError
      )
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
629
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Queue disabled: no queue options and no queue callbacks on the worker nodes.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeUndefined()
      expect(workerNode.onBackPressure).toBeUndefined()
    }
  }
  // Queue enabled: options completed around the given concurrency and both
  // queue callbacks installed on every worker node.
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
    }
  }
  try {
    expectTasksQueueDisabled()
    pool.enableTasksQueue(true)
    expectTasksQueueEnabled(1)
    pool.enableTasksQueue(true, { concurrency: 2 })
    expectTasksQueueEnabled(2)
    pool.enableTasksQueue(false)
    expectTasksQueueDisabled()
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
674
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Checks the pool tasks queue options, then on every worker node the back
  // pressure size and whether the queue callbacks are installed.
  const expectTasksQueueState = (expectedOptions, callbacksInstalled) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
      if (callbacksInstalled) {
        expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
        expect(workerNode.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(workerNode.onEmptyQueue).toBeUndefined()
        expect(workerNode.onBackPressure).toBeUndefined()
      }
    }
  }
  // Options expected whenever size and task stealing are left at their defaults.
  const defaultTasksQueueOptions = {
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  try {
    expectTasksQueueState(defaultTasksQueueOptions, true)
    pool.setTasksQueueOptions({
      concurrency: 2,
      size: 2,
      taskStealing: false,
      tasksStealingOnBackPressure: false
    })
    // Fresh literal on purpose: the expectation must not share a reference
    // with the object handed to the pool.
    expectTasksQueueState(
      {
        concurrency: 2,
        size: 2,
        taskStealing: false,
        tasksStealingOnBackPressure: false
      },
      false
    )
    pool.setTasksQueueOptions({
      concurrency: 1,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    expectTasksQueueState(defaultTasksQueueOptions, true)

    // Invalid values are expected to be rejected, checked in order:
    // [argument, expected error].
    const invalidOptions = [
      [
        'invalidTasksQueueOptions',
        new TypeError('Invalid tasks queue options: must be a plain object')
      ],
      [
        { concurrency: 0 },
        new RangeError(
          'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
        )
      ],
      [
        { concurrency: -1 },
        new RangeError(
          'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
        )
      ],
      [
        { concurrency: 0.2 },
        new TypeError(
          'Invalid worker node tasks concurrency: must be an integer'
        )
      ],
      [
        { size: 0 },
        new RangeError(
          'Invalid worker node tasks queue size: 0 is a negative integer or zero'
        )
      ],
      [
        { size: -1 },
        new RangeError(
          'Invalid worker node tasks queue size: -1 is a negative integer or zero'
        )
      ],
      [
        { size: 0.2 },
        new TypeError('Invalid worker node tasks queue size: must be an integer')
      ]
    ]
    for (const [options, expectedError] of invalidOptions) {
      expect(() => pool.setTasksQueueOptions(options)).toThrow(expectedError)
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
762
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min, max and current sizes all equal numberOfWorkers.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(fixedPool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Destroy the pool even when the assertion above throws.
    await fixedPool.destroy()
  }

  // Dynamic cluster pool: the worker node counts are expected to match the
  // minimum size right after construction.
  const minSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(dynamicPool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      strategy: WorkerChoiceStrategies.ROUND_ROBIN,
      minSize,
      maxSize: numberOfWorkers,
      workerNodes: minSize,
      idleWorkerNodes: minSize,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0
    })
  } finally {
    // Destroy the pool even when the assertion above throws.
    await dynamicPool.destroy()
  }
})
808
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    // Before any task is submitted every counter is zero and every history
    // equals a freshly constructed CircularArray.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          stolen: 0,
          failed: 0
        },
        runTime: { history: new CircularArray() },
        waitTime: { history: new CircularArray() },
        elu: {
          idle: { history: new CircularArray() },
          active: { history: new CircularArray() }
        }
      })
    }
  } finally {
    // Destroy the pool even when an assertion above throws.
    await pool.destroy()
  }
})
843
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque whose maxSize is still zero.
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectEmptyTasksQueues(fixedPool)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectEmptyTasksQueues(dynamicPool)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await dynamicPool.destroy()
  }
})
869
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must report a numeric id, the given worker type, and
  // be flagged as ready and not dynamic.
  const expectWorkerInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expectWorkerInfo(fixedPool, WorkerTypes.cluster)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await fixedPool.destroy()
  }

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expectWorkerInfo(dynamicPool, WorkerTypes.thread)
  } finally {
    // Destroy the pool even when an assertion above throws.
    await dynamicPool.destroy()
  }
})
901
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Both flags of the pool info are expected to move together in this test.
  const expectStartedAndReady = state => {
    expect(pool.info.started).toBe(state)
    expect(pool.info.ready).toBe(state)
  }

  // Built with startWorkers: false: nothing runs yet and execute() rejects.
  expectStartedAndReady(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After an explicit start() the worker nodes exist.
  pool.start()
  expectStartedAndReady(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
925
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Invalid argument lists, checked in order: [execute() arguments, expected error].
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once the pool is destroyed, execute() must reject as for a not started pool.
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)
})
948
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage of a worker node for the given executed/executing counters;
  // the histories only need to be CircularArray instances.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  // Submit maxMultiplier tasks per worker without awaiting them yet.
  const pendingTasks = []
  const numberOfTasks = numberOfWorkers * maxMultiplier
  for (let submitted = 0; submitted < numberOfTasks; submitted++) {
    pendingTasks.push(pool.execute())
  }
  // While pending, each worker node is expected to be executing maxMultiplier tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same count must have moved to the executed counter.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1014
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  // Runs a batch of tasks, checks the resulting usage, then switches the worker
  // choice strategy and checks that the task counters are back to zero.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage of a worker node for the given `executed` counter (value or matcher).
  const usageWith = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of the given usage is empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const pendingTasks = []
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node.usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1094
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  // Registers a listener on the 'ready' event and checks both the number of
  // emissions and the pool information passed to the listener.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let readyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    lastInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1129
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  // Registers a listener on the 'busy' event, submits twice as many tasks as
  // there are workers, then checks the emission count and the last pool information.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So, in total, numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1169
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // Registers a listener on the 'full' event, submits twice as many tasks as the
  // pool maximum size, then checks the emission count and the last pool information.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1208
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // Forces hasBackPressure() to report true through a stub, then checks that the
  // 'backPressure' event reaches the registered listener with the pool information.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1255
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // The same lookups are checked on a dynamic thread pool and on a fixed cluster
  // pool, both running the multiple task functions test worker.
  const lookups = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, runs every lookup, then destroys the pool.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, expected] of lookups) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(expected)
    }
    await pool.destroy()
  }
  await checkPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await checkPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1285
it('Verify that addTaskFunction() is working', async () => {
  // Covers argument validation, registration of a new task function, its
  // execution through the pool and the resulting per-task-function worker usage.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // [name, fn, expected TypeError message]
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  const echoFn = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoFn)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(echoFn)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  const payload = { test: 'test' }
  expect(await dynamicThreadPool.execute(payload, 'echo')).toStrictEqual(
    payload
  )
  // A fresh, empty measurement history for each expected field.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1356
it('Verify that removeTaskFunction() is working', async () => {
  // Removing a task function that was not added through the pool is rejected;
  // one added through the pool can be removed again.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoFn = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoFn)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(echoFn)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1395
it('Verify that listTaskFunctionNames() is working', async () => {
  // The same listing is expected from a dynamic thread pool and from a fixed
  // cluster pool, both running the multiple task functions test worker.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listing, then destroys the pool.
  const checkPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await checkPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await checkPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1423
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Invalid names must be rejected with the error reported by a worker; valid
  // names must resolve to true and change the listing returned by
  // listTaskFunctionNames().
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The worker id embedded in the error message is not controlled by this test,
  // so it is matched with \d+ instead of being pinned to a literal value.
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'TypeError: name parameter is not a string'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function reserved name as the default task function'$/
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(
    /^Task function operation 'default' failed on worker \d+ with error: 'Error: Cannot set the default task function to a non-existing task function'$/
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the pool workers, as every other test of this suite does.
  await dynamicThreadPool.destroy()
})
1475
it('Verify that multiple task functions worker is working', async () => {
  // Executes each task function of the worker once, then checks the pool
  // counters and the per-task-function usage of every worker node.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = { history: expect.any(CircularArray) }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          stolen: 0
        },
        runTime: anyHistory,
        waitTime: anyHistory,
        elu: {
          idle: anyHistory,
          active: anyHistory
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage reported under the default task name must equal the usage of
    // the task function listed right after it.
    const defaultUsage = node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = node.getTaskFunctionWorkerUsage(
      node.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
52a23942
JB
1539
it('Verify sendKillMessageToWorker()', async () => {
  // Sending the kill message to the first worker node must resolve to undefined.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
adee6053
JB
1552
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Adds an 'empty' task function on the first worker node only and checks
  // that the task function names known for that worker node include it.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const firstWorkerNodeKey = 0
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(firstWorkerNodeKey, addOperation)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1572
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Adds an 'empty' task function on all workers and checks that the task
  // function names known for every worker node include it.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
3ec964d6 1595})