fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
a074ffee
JB
5import { expect } from 'expect'
6import { restore, stub } from 'sinon'
7import {
70a4f5ea 8 DynamicClusterPool,
9e619829 9 DynamicThreadPool,
aee46736 10 FixedClusterPool,
e843b904 11 FixedThreadPool,
aee46736 12 PoolEvents,
184855e6 13 PoolTypes,
3d6dd312 14 WorkerChoiceStrategies,
184855e6 15 WorkerTypes
a074ffee
JB
16} from '../../lib/index.js'
17import { CircularArray } from '../../lib/circular-array.js'
18import { Deque } from '../../lib/deque.js'
19import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
20import { waitPoolEvents } from '../test-utils.js'
21import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
22
23describe('Abstract pool test suite', () => {
2eb14889
JB
24 const version = JSON.parse(
25 readFileSync(
a5e5599c 26 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
27 'utf8'
28 )
29 ).version
fc027381 30 const numberOfWorkers = 2
a8884ffd 31 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
32 isMain () {
33 return false
34 }
3ec964d6 35 }
3ec964d6 36
dc021bcc 37 afterEach(() => {
a074ffee 38 restore()
dc021bcc
JB
39 })
40
bcf85f7f
JB
41 it('Verify that pool can be created and destroyed', async () => {
42 const pool = new FixedThreadPool(
43 numberOfWorkers,
44 './tests/worker-files/thread/testWorker.mjs'
45 )
46 expect(pool).toBeInstanceOf(FixedThreadPool)
47 await pool.destroy()
48 })
49
50 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
51 expect(
52 () =>
a8884ffd 53 new StubPoolWithIsMain(
7c0ba920 54 numberOfWorkers,
b2fd3f4a 55 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 56 {
041dc05b 57 errorHandler: e => console.error(e)
8d3782fa
JB
58 }
59 )
948faff7 60 ).toThrow(
e695d66f
JB
61 new Error(
62 'Cannot start a pool from a worker with the same type as the pool'
63 )
04f45163 64 )
3ec964d6 65 })
c510fea7 66
bc61cfe6
JB
67 it('Verify that pool statuses properties are set', async () => {
68 const pool = new FixedThreadPool(
69 numberOfWorkers,
b2fd3f4a 70 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 71 )
bc61cfe6 72 expect(pool.started).toBe(true)
711623b8
JB
73 expect(pool.starting).toBe(false)
74 expect(pool.destroying).toBe(false)
bc61cfe6 75 await pool.destroy()
bc61cfe6
JB
76 })
77
c510fea7 78 it('Verify that filePath is checked', () => {
948faff7 79 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
fa015370 80 new Error("Cannot find the worker file 'undefined'")
3d6dd312
JB
81 )
82 expect(
83 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 84 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
85 })
86
87 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
88 expect(
89 () =>
90 new FixedThreadPool(
91 undefined,
b2fd3f4a 92 './tests/worker-files/thread/testWorker.mjs'
8003c026 93 )
948faff7 94 ).toThrow(
e695d66f
JB
95 new Error(
96 'Cannot instantiate a pool without specifying the number of workers'
97 )
8d3782fa
JB
98 )
99 })
100
101 it('Verify that a negative number of workers is checked', () => {
102 expect(
103 () =>
104 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 105 ).toThrow(
473c717a
JB
106 new RangeError(
107 'Cannot instantiate a pool with a negative number of workers'
108 )
8d3782fa
JB
109 )
110 })
111
112 it('Verify that a non integer number of workers is checked', () => {
113 expect(
114 () =>
b2fd3f4a 115 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 116 ).toThrow(
473c717a 117 new TypeError(
0d80593b 118 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
119 )
120 )
c510fea7 121 })
7c0ba920 122
216541b6 123 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
124 expect(
125 () =>
126 new DynamicClusterPool(
127 1,
128 undefined,
129 './tests/worker-files/cluster/testWorker.js'
130 )
948faff7 131 ).toThrow(
a5ed75b7
JB
132 new TypeError(
133 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
134 )
135 )
2761efb4
JB
136 expect(
137 () =>
138 new DynamicThreadPool(
139 0.5,
140 1,
b2fd3f4a 141 './tests/worker-files/thread/testWorker.mjs'
2761efb4 142 )
948faff7 143 ).toThrow(
2761efb4
JB
144 new TypeError(
145 'Cannot instantiate a pool with a non safe integer number of workers'
146 )
147 )
148 expect(
149 () =>
150 new DynamicClusterPool(
151 0,
152 0.5,
a5ed75b7 153 './tests/worker-files/cluster/testWorker.js'
2761efb4 154 )
948faff7 155 ).toThrow(
2761efb4
JB
156 new TypeError(
157 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
158 )
159 )
2431bdb4
JB
160 expect(
161 () =>
b2fd3f4a
JB
162 new DynamicThreadPool(
163 2,
164 1,
165 './tests/worker-files/thread/testWorker.mjs'
166 )
948faff7 167 ).toThrow(
2431bdb4
JB
168 new RangeError(
169 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
170 )
171 )
172 expect(
173 () =>
b2fd3f4a
JB
174 new DynamicThreadPool(
175 0,
176 0,
177 './tests/worker-files/thread/testWorker.mjs'
178 )
948faff7 179 ).toThrow(
2431bdb4 180 new RangeError(
213cbac6 181 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
182 )
183 )
21f710aa
JB
184 expect(
185 () =>
213cbac6
JB
186 new DynamicClusterPool(
187 1,
188 1,
189 './tests/worker-files/cluster/testWorker.js'
190 )
948faff7 191 ).toThrow(
21f710aa 192 new RangeError(
213cbac6 193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
194 )
195 )
2431bdb4
JB
196 })
197
fd7ebd49 198 it('Verify that pool options are checked', async () => {
7c0ba920
JB
199 let pool = new FixedThreadPool(
200 numberOfWorkers,
b2fd3f4a 201 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 202 )
b5604034 203 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
204 expect(pool.opts).toStrictEqual({
205 startWorkers: true,
206 enableEvents: true,
207 restartWorkerOnError: true,
208 enableTasksQueue: false,
209 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
210 workerChoiceStrategyOptions: {
211 retries: 6,
212 runTime: { median: false },
213 waitTime: { median: false },
214 elu: { median: false }
215 }
8990357d
JB
216 })
217 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 218 retries: 6,
932fc8be 219 runTime: { median: false },
5df69fab
JB
220 waitTime: { median: false },
221 elu: { median: false }
da309861 222 })
999ef664
JB
223 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
224 .workerChoiceStrategies) {
225 expect(workerChoiceStrategy.opts).toStrictEqual({
226 retries: 6,
227 runTime: { median: false },
228 waitTime: { median: false },
229 elu: { median: false }
230 })
231 }
fd7ebd49 232 await pool.destroy()
73bfd59d 233 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
234 pool = new FixedThreadPool(
235 numberOfWorkers,
b2fd3f4a 236 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 237 {
e4543b14 238 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 239 workerChoiceStrategyOptions: {
932fc8be 240 runTime: { median: true },
fc027381 241 weights: { 0: 300, 1: 200 }
49be33fe 242 },
35cf1c03 243 enableEvents: false,
1f68cede 244 restartWorkerOnError: false,
ff733df7 245 enableTasksQueue: true,
d4aeae5a 246 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
247 messageHandler: testHandler,
248 errorHandler: testHandler,
249 onlineHandler: testHandler,
250 exitHandler: testHandler
7c0ba920
JB
251 }
252 )
7c0ba920 253 expect(pool.emitter).toBeUndefined()
47352846
JB
254 expect(pool.opts).toStrictEqual({
255 startWorkers: true,
256 enableEvents: false,
257 restartWorkerOnError: false,
258 enableTasksQueue: true,
259 tasksQueueOptions: {
260 concurrency: 2,
2324f8c9 261 size: Math.pow(numberOfWorkers, 2),
dbd73092 262 taskStealing: true,
47352846
JB
263 tasksStealingOnBackPressure: true
264 },
265 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
266 workerChoiceStrategyOptions: {
267 retries: 6,
268 runTime: { median: true },
269 waitTime: { median: false },
270 elu: { median: false },
271 weights: { 0: 300, 1: 200 }
272 },
273 onlineHandler: testHandler,
274 messageHandler: testHandler,
275 errorHandler: testHandler,
276 exitHandler: testHandler
8990357d
JB
277 })
278 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 279 retries: 6,
8990357d
JB
280 runTime: { median: true },
281 waitTime: { median: false },
282 elu: { median: false },
fc027381 283 weights: { 0: 300, 1: 200 }
da309861 284 })
999ef664
JB
285 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
286 .workerChoiceStrategies) {
287 expect(workerChoiceStrategy.opts).toStrictEqual({
288 retries: 6,
289 runTime: { median: true },
290 waitTime: { median: false },
291 elu: { median: false },
292 weights: { 0: 300, 1: 200 }
293 })
294 }
fd7ebd49 295 await pool.destroy()
7c0ba920
JB
296 })
297
fe291b64 298 it('Verify that pool options are validated', () => {
d4aeae5a
JB
299 expect(
300 () =>
301 new FixedThreadPool(
302 numberOfWorkers,
b2fd3f4a 303 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 304 {
f0d7f803 305 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
306 }
307 )
948faff7 308 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
7e653ee0
JB
309 expect(
310 () =>
311 new FixedThreadPool(
312 numberOfWorkers,
b2fd3f4a 313 './tests/worker-files/thread/testWorker.mjs',
7e653ee0
JB
314 {
315 workerChoiceStrategyOptions: {
8c0b113f 316 retries: 'invalidChoiceRetries'
7e653ee0
JB
317 }
318 }
319 )
948faff7 320 ).toThrow(
7e653ee0 321 new TypeError(
8c0b113f 322 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
323 )
324 )
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
b2fd3f4a 329 './tests/worker-files/thread/testWorker.mjs',
7e653ee0
JB
330 {
331 workerChoiceStrategyOptions: {
8c0b113f 332 retries: -1
7e653ee0
JB
333 }
334 }
335 )
948faff7 336 ).toThrow(
7e653ee0 337 new RangeError(
8c0b113f 338 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
339 )
340 )
49be33fe
JB
341 expect(
342 () =>
343 new FixedThreadPool(
344 numberOfWorkers,
b2fd3f4a 345 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
346 {
347 workerChoiceStrategyOptions: { weights: {} }
348 }
349 )
948faff7 350 ).toThrow(
8735b4e5
JB
351 new Error(
352 'Invalid worker choice strategy options: must have a weight for each worker node'
353 )
49be33fe 354 )
f0d7f803
JB
355 expect(
356 () =>
357 new FixedThreadPool(
358 numberOfWorkers,
b2fd3f4a 359 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
360 {
361 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
362 }
363 )
948faff7 364 ).toThrow(
8735b4e5
JB
365 new Error(
366 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
367 )
f0d7f803 368 )
5c4c2dee
JB
369 expect(
370 () =>
371 new FixedThreadPool(
372 numberOfWorkers,
b2fd3f4a 373 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
374 {
375 enableTasksQueue: true,
376 tasksQueueOptions: 'invalidTasksQueueOptions'
377 }
378 )
948faff7 379 ).toThrow(
5c4c2dee
JB
380 new TypeError('Invalid tasks queue options: must be a plain object')
381 )
f0d7f803
JB
382 expect(
383 () =>
384 new FixedThreadPool(
385 numberOfWorkers,
b2fd3f4a 386 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
387 {
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0 }
390 }
391 )
948faff7 392 ).toThrow(
e695d66f 393 new RangeError(
20c6f652 394 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
395 )
396 )
f0d7f803
JB
397 expect(
398 () =>
399 new FixedThreadPool(
400 numberOfWorkers,
b2fd3f4a 401 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
402 {
403 enableTasksQueue: true,
5c4c2dee 404 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
405 }
406 )
948faff7 407 ).toThrow(
5c4c2dee
JB
408 new RangeError(
409 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
410 )
8735b4e5 411 )
f0d7f803
JB
412 expect(
413 () =>
414 new FixedThreadPool(
415 numberOfWorkers,
b2fd3f4a 416 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
417 {
418 enableTasksQueue: true,
419 tasksQueueOptions: { concurrency: 0.2 }
420 }
421 )
948faff7 422 ).toThrow(
20c6f652 423 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 424 )
5c4c2dee
JB
425 expect(
426 () =>
427 new FixedThreadPool(
428 numberOfWorkers,
b2fd3f4a 429 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
430 {
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0 }
433 }
434 )
948faff7 435 ).toThrow(
5c4c2dee
JB
436 new RangeError(
437 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
438 )
439 )
440 expect(
441 () =>
442 new FixedThreadPool(
443 numberOfWorkers,
b2fd3f4a 444 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
445 {
446 enableTasksQueue: true,
447 tasksQueueOptions: { size: -1 }
448 }
449 )
948faff7 450 ).toThrow(
5c4c2dee
JB
451 new RangeError(
452 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
453 )
454 )
455 expect(
456 () =>
457 new FixedThreadPool(
458 numberOfWorkers,
b2fd3f4a 459 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
460 {
461 enableTasksQueue: true,
462 tasksQueueOptions: { size: 0.2 }
463 }
464 )
948faff7 465 ).toThrow(
5c4c2dee
JB
466 new TypeError('Invalid worker node tasks queue size: must be an integer')
467 )
d4aeae5a
JB
468 })
469
2431bdb4 470 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
471 const pool = new FixedThreadPool(
472 numberOfWorkers,
b2fd3f4a 473 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
474 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
475 )
476 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 477 retries: 6,
8990357d
JB
478 runTime: { median: false },
479 waitTime: { median: false },
480 elu: { median: false }
481 })
482 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 483 retries: 6,
932fc8be 484 runTime: { median: false },
5df69fab
JB
485 waitTime: { median: false },
486 elu: { median: false }
a20f0ba5
JB
487 })
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
86bf340d 490 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 491 retries: 6,
932fc8be 492 runTime: { median: false },
5df69fab
JB
493 waitTime: { median: false },
494 elu: { median: false }
86bf340d 495 })
a20f0ba5 496 }
87de9ff5
JB
497 expect(
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
499 ).toStrictEqual({
932fc8be
JB
500 runTime: {
501 aggregate: true,
502 average: true,
503 median: false
504 },
505 waitTime: {
506 aggregate: false,
507 average: false,
508 median: false
509 },
5df69fab 510 elu: {
9adcefab
JB
511 aggregate: true,
512 average: true,
5df69fab
JB
513 median: false
514 }
86bf340d 515 })
9adcefab
JB
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: true },
518 elu: { median: true }
519 })
a20f0ba5 520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 521 retries: 6,
9adcefab 522 runTime: { median: true },
8990357d
JB
523 waitTime: { median: false },
524 elu: { median: true }
525 })
526 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 527 retries: 6,
8990357d
JB
528 runTime: { median: true },
529 waitTime: { median: false },
9adcefab 530 elu: { median: true }
a20f0ba5
JB
531 })
532 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
533 .workerChoiceStrategies) {
932fc8be 534 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 535 retries: 6,
9adcefab 536 runTime: { median: true },
8990357d 537 waitTime: { median: false },
9adcefab 538 elu: { median: true }
932fc8be 539 })
a20f0ba5 540 }
87de9ff5
JB
541 expect(
542 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
543 ).toStrictEqual({
932fc8be
JB
544 runTime: {
545 aggregate: true,
546 average: false,
547 median: true
548 },
549 waitTime: {
550 aggregate: false,
551 average: false,
552 median: false
553 },
5df69fab 554 elu: {
9adcefab 555 aggregate: true,
5df69fab 556 average: false,
9adcefab 557 median: true
5df69fab 558 }
86bf340d 559 })
9adcefab
JB
560 pool.setWorkerChoiceStrategyOptions({
561 runTime: { median: false },
562 elu: { median: false }
563 })
a20f0ba5 564 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8c0b113f 565 retries: 6,
8990357d
JB
566 runTime: { median: false },
567 waitTime: { median: false },
568 elu: { median: false }
569 })
570 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
8c0b113f 571 retries: 6,
9adcefab 572 runTime: { median: false },
8990357d 573 waitTime: { median: false },
9adcefab 574 elu: { median: false }
a20f0ba5
JB
575 })
576 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
577 .workerChoiceStrategies) {
932fc8be 578 expect(workerChoiceStrategy.opts).toStrictEqual({
8c0b113f 579 retries: 6,
9adcefab 580 runTime: { median: false },
8990357d 581 waitTime: { median: false },
9adcefab 582 elu: { median: false }
932fc8be 583 })
a20f0ba5 584 }
87de9ff5
JB
585 expect(
586 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
587 ).toStrictEqual({
932fc8be
JB
588 runTime: {
589 aggregate: true,
590 average: true,
591 median: false
592 },
593 waitTime: {
594 aggregate: false,
595 average: false,
596 median: false
597 },
5df69fab 598 elu: {
9adcefab
JB
599 aggregate: true,
600 average: true,
5df69fab
JB
601 median: false
602 }
86bf340d 603 })
1f95d544
JB
604 expect(() =>
605 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 606 ).toThrow(
8735b4e5
JB
607 new TypeError(
608 'Invalid worker choice strategy options: must be a plain object'
609 )
1f95d544 610 )
7e653ee0
JB
611 expect(() =>
612 pool.setWorkerChoiceStrategyOptions({
8c0b113f 613 retries: 'invalidChoiceRetries'
7e653ee0 614 })
948faff7 615 ).toThrow(
7e653ee0 616 new TypeError(
8c0b113f 617 'Invalid worker choice strategy options: retries must be an integer'
7e653ee0
JB
618 )
619 )
948faff7 620 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
7e653ee0 621 new RangeError(
8c0b113f 622 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
7e653ee0
JB
623 )
624 )
948faff7 625 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
626 new Error(
627 'Invalid worker choice strategy options: must have a weight for each worker node'
628 )
1f95d544
JB
629 )
630 expect(() =>
631 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 632 ).toThrow(
8735b4e5
JB
633 new Error(
634 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
635 )
1f95d544 636 )
a20f0ba5
JB
637 await pool.destroy()
638 })
639
2431bdb4 640 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
641 const pool = new FixedThreadPool(
642 numberOfWorkers,
b2fd3f4a 643 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
644 )
645 expect(pool.opts.enableTasksQueue).toBe(false)
646 expect(pool.opts.tasksQueueOptions).toBeUndefined()
647 pool.enableTasksQueue(true)
648 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
650 concurrency: 1,
2324f8c9 651 size: Math.pow(numberOfWorkers, 2),
dbd73092 652 taskStealing: true,
47352846 653 tasksStealingOnBackPressure: true
20c6f652 654 })
a20f0ba5
JB
655 pool.enableTasksQueue(true, { concurrency: 2 })
656 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
657 expect(pool.opts.tasksQueueOptions).toStrictEqual({
658 concurrency: 2,
2324f8c9 659 size: Math.pow(numberOfWorkers, 2),
dbd73092 660 taskStealing: true,
47352846 661 tasksStealingOnBackPressure: true
20c6f652 662 })
a20f0ba5
JB
663 pool.enableTasksQueue(false)
664 expect(pool.opts.enableTasksQueue).toBe(false)
665 expect(pool.opts.tasksQueueOptions).toBeUndefined()
666 await pool.destroy()
667 })
668
2431bdb4 669 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
670 const pool = new FixedThreadPool(
671 numberOfWorkers,
b2fd3f4a 672 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
673 { enableTasksQueue: true }
674 )
20c6f652
JB
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
676 concurrency: 1,
2324f8c9 677 size: Math.pow(numberOfWorkers, 2),
dbd73092 678 taskStealing: true,
47352846 679 tasksStealingOnBackPressure: true
20c6f652 680 })
d6ca1416 681 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
682 expect(workerNode.tasksQueueBackPressureSize).toBe(
683 pool.opts.tasksQueueOptions.size
684 )
d6ca1416
JB
685 }
686 pool.setTasksQueueOptions({
687 concurrency: 2,
2324f8c9 688 size: 2,
d6ca1416
JB
689 taskStealing: false,
690 tasksStealingOnBackPressure: false
691 })
20c6f652
JB
692 expect(pool.opts.tasksQueueOptions).toStrictEqual({
693 concurrency: 2,
2324f8c9 694 size: 2,
d6ca1416
JB
695 taskStealing: false,
696 tasksStealingOnBackPressure: false
697 })
698 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
699 expect(workerNode.tasksQueueBackPressureSize).toBe(
700 pool.opts.tasksQueueOptions.size
701 )
d6ca1416
JB
702 }
703 pool.setTasksQueueOptions({
704 concurrency: 1,
705 taskStealing: true,
706 tasksStealingOnBackPressure: true
707 })
708 expect(pool.opts.tasksQueueOptions).toStrictEqual({
709 concurrency: 1,
2324f8c9 710 size: Math.pow(numberOfWorkers, 2),
dbd73092 711 taskStealing: true,
47352846 712 tasksStealingOnBackPressure: true
20c6f652 713 })
d6ca1416 714 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
715 expect(workerNode.tasksQueueBackPressureSize).toBe(
716 pool.opts.tasksQueueOptions.size
717 )
d6ca1416 718 }
948faff7 719 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
720 new TypeError('Invalid tasks queue options: must be a plain object')
721 )
948faff7 722 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 723 new RangeError(
20c6f652 724 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
725 )
726 )
948faff7 727 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 728 new RangeError(
20c6f652 729 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 730 )
a20f0ba5 731 )
948faff7 732 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
733 new TypeError('Invalid worker node tasks concurrency: must be an integer')
734 )
948faff7 735 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 736 new RangeError(
68dbcdc0 737 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
738 )
739 )
948faff7 740 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 741 new RangeError(
68dbcdc0 742 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
743 )
744 )
948faff7 745 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 746 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 747 )
a20f0ba5
JB
748 await pool.destroy()
749 })
750
6b27d407
JB
751 it('Verify that pool info is set', async () => {
752 let pool = new FixedThreadPool(
753 numberOfWorkers,
b2fd3f4a 754 './tests/worker-files/thread/testWorker.mjs'
6b27d407 755 )
2dca6cab
JB
756 expect(pool.info).toStrictEqual({
757 version,
758 type: PoolTypes.fixed,
759 worker: WorkerTypes.thread,
47352846 760 started: true,
2dca6cab
JB
761 ready: true,
762 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
763 minSize: numberOfWorkers,
764 maxSize: numberOfWorkers,
765 workerNodes: numberOfWorkers,
766 idleWorkerNodes: numberOfWorkers,
767 busyWorkerNodes: 0,
768 executedTasks: 0,
769 executingTasks: 0,
2dca6cab
JB
770 failedTasks: 0
771 })
6b27d407
JB
772 await pool.destroy()
773 pool = new DynamicClusterPool(
2431bdb4 774 Math.floor(numberOfWorkers / 2),
6b27d407 775 numberOfWorkers,
ecdfbdc0 776 './tests/worker-files/cluster/testWorker.js'
6b27d407 777 )
2dca6cab
JB
778 expect(pool.info).toStrictEqual({
779 version,
780 type: PoolTypes.dynamic,
781 worker: WorkerTypes.cluster,
47352846 782 started: true,
2dca6cab
JB
783 ready: true,
784 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
785 minSize: Math.floor(numberOfWorkers / 2),
786 maxSize: numberOfWorkers,
787 workerNodes: Math.floor(numberOfWorkers / 2),
788 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
789 busyWorkerNodes: 0,
790 executedTasks: 0,
791 executingTasks: 0,
2dca6cab
JB
792 failedTasks: 0
793 })
6b27d407
JB
794 await pool.destroy()
795 })
796
2431bdb4 797 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
798 const pool = new FixedClusterPool(
799 numberOfWorkers,
800 './tests/worker-files/cluster/testWorker.js'
801 )
f06e48d8 802 for (const workerNode of pool.workerNodes) {
47352846 803 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 804 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
805 tasks: {
806 executed: 0,
807 executing: 0,
808 queued: 0,
df593701 809 maxQueued: 0,
463226a4 810 sequentiallyStolen: 0,
68cbdc84 811 stolen: 0,
a4e07f72
JB
812 failed: 0
813 },
814 runTime: {
4ba4c7f9 815 history: new CircularArray()
a4e07f72
JB
816 },
817 waitTime: {
4ba4c7f9 818 history: new CircularArray()
a4e07f72 819 },
5df69fab
JB
820 elu: {
821 idle: {
4ba4c7f9 822 history: new CircularArray()
5df69fab
JB
823 },
824 active: {
4ba4c7f9 825 history: new CircularArray()
f7510105 826 }
5df69fab 827 }
86bf340d 828 })
f06e48d8
JB
829 }
830 await pool.destroy()
831 })
832
2431bdb4
JB
833 it('Verify that pool worker tasks queue are initialized', async () => {
834 let pool = new FixedClusterPool(
f06e48d8
JB
835 numberOfWorkers,
836 './tests/worker-files/cluster/testWorker.js'
837 )
838 for (const workerNode of pool.workerNodes) {
47352846 839 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 840 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 841 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 842 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 843 }
fd7ebd49 844 await pool.destroy()
2431bdb4
JB
845 pool = new DynamicThreadPool(
846 Math.floor(numberOfWorkers / 2),
847 numberOfWorkers,
b2fd3f4a 848 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
849 )
850 for (const workerNode of pool.workerNodes) {
47352846 851 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 852 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
853 expect(workerNode.tasksQueue.size).toBe(0)
854 expect(workerNode.tasksQueue.maxSize).toBe(0)
855 }
213cbac6 856 await pool.destroy()
2431bdb4
JB
857 })
858
859 it('Verify that pool worker info are initialized', async () => {
860 let pool = new FixedClusterPool(
861 numberOfWorkers,
862 './tests/worker-files/cluster/testWorker.js'
863 )
2dca6cab 864 for (const workerNode of pool.workerNodes) {
47352846 865 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
866 expect(workerNode.info).toStrictEqual({
867 id: expect.any(Number),
868 type: WorkerTypes.cluster,
869 dynamic: false,
870 ready: true
871 })
872 }
2431bdb4
JB
873 await pool.destroy()
874 pool = new DynamicThreadPool(
875 Math.floor(numberOfWorkers / 2),
876 numberOfWorkers,
b2fd3f4a 877 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 878 )
2dca6cab 879 for (const workerNode of pool.workerNodes) {
47352846 880 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.thread,
884 dynamic: false,
7884d183 885 ready: true
2dca6cab
JB
886 })
887 }
213cbac6 888 await pool.destroy()
bf9549ae
JB
889 })
890
711623b8
JB
891 it('Verify that pool statuses are checked at start or destroy', async () => {
892 const pool = new FixedThreadPool(
893 numberOfWorkers,
894 './tests/worker-files/thread/testWorker.mjs'
895 )
896 expect(pool.info.started).toBe(true)
897 expect(pool.info.ready).toBe(true)
898 expect(() => pool.start()).toThrow(
899 new Error('Cannot start an already started pool')
900 )
901 await pool.destroy()
902 expect(pool.info.started).toBe(false)
903 expect(pool.info.ready).toBe(false)
904 await expect(pool.destroy()).rejects.toThrow(
905 new Error('Cannot destroy an already destroyed pool')
906 )
907 })
908
47352846
JB
909 it('Verify that pool can be started after initialization', async () => {
910 const pool = new FixedClusterPool(
911 numberOfWorkers,
912 './tests/worker-files/cluster/testWorker.js',
913 {
914 startWorkers: false
915 }
916 )
917 expect(pool.info.started).toBe(false)
918 expect(pool.info.ready).toBe(false)
55082af9 919 expect(pool.readyEventEmitted).toBe(false)
47352846 920 expect(pool.workerNodes).toStrictEqual([])
948faff7 921 await expect(pool.execute()).rejects.toThrow(
47352846
JB
922 new Error('Cannot execute a task on not started pool')
923 )
924 pool.start()
925 expect(pool.info.started).toBe(true)
926 expect(pool.info.ready).toBe(true)
55082af9
JB
927 await waitPoolEvents(pool, PoolEvents.ready, 1)
928 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
929 expect(pool.workerNodes.length).toBe(numberOfWorkers)
930 for (const workerNode of pool.workerNodes) {
931 expect(workerNode).toBeInstanceOf(WorkerNode)
932 }
933 await pool.destroy()
934 })
935
9d2d0da1
JB
936 it('Verify that pool execute() arguments are checked', async () => {
937 const pool = new FixedClusterPool(
938 numberOfWorkers,
939 './tests/worker-files/cluster/testWorker.js'
940 )
948faff7 941 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
942 new TypeError('name argument must be a string')
943 )
948faff7 944 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
945 new TypeError('name argument must not be an empty string')
946 )
948faff7 947 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
948 new TypeError('transferList argument must be an array')
949 )
950 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
951 "Task function 'unknown' not found"
952 )
953 await pool.destroy()
948faff7 954 await expect(pool.execute()).rejects.toThrow(
47352846 955 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
956 )
957 })
958
2431bdb4 959 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
960 const pool = new FixedClusterPool(
961 numberOfWorkers,
962 './tests/worker-files/cluster/testWorker.js'
963 )
09c2d0d3 964 const promises = new Set()
fc027381
JB
965 const maxMultiplier = 2
966 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 967 promises.add(pool.execute())
bf9549ae 968 }
f06e48d8 969 for (const workerNode of pool.workerNodes) {
465b2940 970 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
971 tasks: {
972 executed: 0,
973 executing: maxMultiplier,
974 queued: 0,
df593701 975 maxQueued: 0,
463226a4 976 sequentiallyStolen: 0,
68cbdc84 977 stolen: 0,
a4e07f72
JB
978 failed: 0
979 },
980 runTime: {
a4e07f72
JB
981 history: expect.any(CircularArray)
982 },
983 waitTime: {
a4e07f72
JB
984 history: expect.any(CircularArray)
985 },
5df69fab
JB
986 elu: {
987 idle: {
5df69fab
JB
988 history: expect.any(CircularArray)
989 },
990 active: {
5df69fab 991 history: expect.any(CircularArray)
f7510105 992 }
5df69fab 993 }
86bf340d 994 })
bf9549ae
JB
995 }
996 await Promise.all(promises)
f06e48d8 997 for (const workerNode of pool.workerNodes) {
465b2940 998 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
999 tasks: {
1000 executed: maxMultiplier,
1001 executing: 0,
1002 queued: 0,
df593701 1003 maxQueued: 0,
463226a4 1004 sequentiallyStolen: 0,
68cbdc84 1005 stolen: 0,
a4e07f72
JB
1006 failed: 0
1007 },
1008 runTime: {
a4e07f72
JB
1009 history: expect.any(CircularArray)
1010 },
1011 waitTime: {
a4e07f72
JB
1012 history: expect.any(CircularArray)
1013 },
5df69fab
JB
1014 elu: {
1015 idle: {
5df69fab
JB
1016 history: expect.any(CircularArray)
1017 },
1018 active: {
5df69fab 1019 history: expect.any(CircularArray)
f7510105 1020 }
5df69fab 1021 }
86bf340d 1022 })
bf9549ae 1023 }
fd7ebd49 1024 await pool.destroy()
bf9549ae
JB
1025 })
1026
2431bdb4 1027 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1028 const pool = new DynamicThreadPool(
2431bdb4 1029 Math.floor(numberOfWorkers / 2),
8f4878b7 1030 numberOfWorkers,
b2fd3f4a 1031 './tests/worker-files/thread/testWorker.mjs'
9e619829 1032 )
09c2d0d3 1033 const promises = new Set()
ee9f5295
JB
1034 const maxMultiplier = 2
1035 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1036 promises.add(pool.execute())
9e619829
JB
1037 }
1038 await Promise.all(promises)
f06e48d8 1039 for (const workerNode of pool.workerNodes) {
465b2940 1040 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1041 tasks: {
1042 executed: expect.any(Number),
1043 executing: 0,
1044 queued: 0,
df593701 1045 maxQueued: 0,
463226a4 1046 sequentiallyStolen: 0,
68cbdc84 1047 stolen: 0,
a4e07f72
JB
1048 failed: 0
1049 },
1050 runTime: {
a4e07f72
JB
1051 history: expect.any(CircularArray)
1052 },
1053 waitTime: {
a4e07f72
JB
1054 history: expect.any(CircularArray)
1055 },
5df69fab
JB
1056 elu: {
1057 idle: {
5df69fab
JB
1058 history: expect.any(CircularArray)
1059 },
1060 active: {
5df69fab 1061 history: expect.any(CircularArray)
f7510105 1062 }
5df69fab 1063 }
86bf340d 1064 })
465b2940 1065 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1066 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1067 numberOfWorkers * maxMultiplier
1068 )
b97d82d8
JB
1069 expect(workerNode.usage.runTime.history.length).toBe(0)
1070 expect(workerNode.usage.waitTime.history.length).toBe(0)
1071 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1072 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1073 }
1074 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1075 for (const workerNode of pool.workerNodes) {
465b2940 1076 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1077 tasks: {
1078 executed: 0,
1079 executing: 0,
1080 queued: 0,
df593701 1081 maxQueued: 0,
463226a4 1082 sequentiallyStolen: 0,
68cbdc84 1083 stolen: 0,
a4e07f72
JB
1084 failed: 0
1085 },
1086 runTime: {
a4e07f72
JB
1087 history: expect.any(CircularArray)
1088 },
1089 waitTime: {
a4e07f72
JB
1090 history: expect.any(CircularArray)
1091 },
5df69fab
JB
1092 elu: {
1093 idle: {
5df69fab
JB
1094 history: expect.any(CircularArray)
1095 },
1096 active: {
5df69fab 1097 history: expect.any(CircularArray)
f7510105 1098 }
5df69fab 1099 }
86bf340d 1100 })
465b2940
JB
1101 expect(workerNode.usage.runTime.history.length).toBe(0)
1102 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1103 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1104 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1105 }
fd7ebd49 1106 await pool.destroy()
ee11a4a2
JB
1107 })
1108
a1763c54
JB
1109 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1110 const pool = new DynamicClusterPool(
2431bdb4 1111 Math.floor(numberOfWorkers / 2),
164d950a 1112 numberOfWorkers,
a1763c54 1113 './tests/worker-files/cluster/testWorker.js'
164d950a 1114 )
c726f66c 1115 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1116 let poolInfo
a1763c54 1117 let poolReady = 0
041dc05b 1118 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1119 ++poolReady
d46660cd
JB
1120 poolInfo = info
1121 })
a1763c54 1122 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1123 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1124 expect(poolReady).toBe(1)
d46660cd 1125 expect(poolInfo).toStrictEqual({
23ccf9d7 1126 version,
d46660cd 1127 type: PoolTypes.dynamic,
a1763c54 1128 worker: WorkerTypes.cluster,
47352846 1129 started: true,
a1763c54 1130 ready: true,
2431bdb4
JB
1131 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1132 minSize: expect.any(Number),
1133 maxSize: expect.any(Number),
1134 workerNodes: expect.any(Number),
1135 idleWorkerNodes: expect.any(Number),
1136 busyWorkerNodes: expect.any(Number),
1137 executedTasks: expect.any(Number),
1138 executingTasks: expect.any(Number),
2431bdb4
JB
1139 failedTasks: expect.any(Number)
1140 })
1141 await pool.destroy()
1142 })
1143
a1763c54
JB
1144 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1145 const pool = new FixedThreadPool(
2431bdb4 1146 numberOfWorkers,
b2fd3f4a 1147 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1148 )
c726f66c 1149 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1150 const promises = new Set()
1151 let poolBusy = 0
2431bdb4 1152 let poolInfo
041dc05b 1153 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1154 ++poolBusy
2431bdb4
JB
1155 poolInfo = info
1156 })
c726f66c 1157 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1158 for (let i = 0; i < numberOfWorkers * 2; i++) {
1159 promises.add(pool.execute())
1160 }
1161 await Promise.all(promises)
1162 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1163 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1164 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1165 expect(poolInfo).toStrictEqual({
1166 version,
a1763c54
JB
1167 type: PoolTypes.fixed,
1168 worker: WorkerTypes.thread,
47352846
JB
1169 started: true,
1170 ready: true,
2431bdb4 1171 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1172 minSize: expect.any(Number),
1173 maxSize: expect.any(Number),
1174 workerNodes: expect.any(Number),
1175 idleWorkerNodes: expect.any(Number),
1176 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1177 executedTasks: expect.any(Number),
1178 executingTasks: expect.any(Number),
a4e07f72 1179 failedTasks: expect.any(Number)
d46660cd 1180 })
164d950a
JB
1181 await pool.destroy()
1182 })
1183
a1763c54
JB
1184 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1185 const pool = new DynamicThreadPool(
1186 Math.floor(numberOfWorkers / 2),
7c0ba920 1187 numberOfWorkers,
b2fd3f4a 1188 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1189 )
c726f66c 1190 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1191 const promises = new Set()
a1763c54 1192 let poolFull = 0
d46660cd 1193 let poolInfo
041dc05b 1194 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1195 ++poolFull
d46660cd
JB
1196 poolInfo = info
1197 })
c726f66c 1198 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1199 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1200 promises.add(pool.execute())
7c0ba920 1201 }
cf597bc5 1202 await Promise.all(promises)
33e6bb4c 1203 expect(poolFull).toBe(1)
d46660cd 1204 expect(poolInfo).toStrictEqual({
23ccf9d7 1205 version,
a1763c54 1206 type: PoolTypes.dynamic,
d46660cd 1207 worker: WorkerTypes.thread,
47352846
JB
1208 started: true,
1209 ready: true,
2431bdb4 1210 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1211 minSize: expect.any(Number),
1212 maxSize: expect.any(Number),
1213 workerNodes: expect.any(Number),
1214 idleWorkerNodes: expect.any(Number),
1215 busyWorkerNodes: expect.any(Number),
1216 executedTasks: expect.any(Number),
1217 executingTasks: expect.any(Number),
1218 failedTasks: expect.any(Number)
1219 })
1220 await pool.destroy()
1221 })
1222
3e8611a8 1223 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1224 const pool = new FixedThreadPool(
8735b4e5 1225 numberOfWorkers,
b2fd3f4a 1226 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1227 {
1228 enableTasksQueue: true
1229 }
1230 )
a074ffee 1231 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1232 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1233 const promises = new Set()
1234 let poolBackPressure = 0
1235 let poolInfo
041dc05b 1236 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1237 ++poolBackPressure
1238 poolInfo = info
1239 })
c726f66c 1240 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1241 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1242 promises.add(pool.execute())
1243 }
1244 await Promise.all(promises)
033f1776 1245 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1246 expect(poolInfo).toStrictEqual({
1247 version,
3e8611a8 1248 type: PoolTypes.fixed,
8735b4e5 1249 worker: WorkerTypes.thread,
47352846
JB
1250 started: true,
1251 ready: true,
8735b4e5 1252 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1253 minSize: expect.any(Number),
1254 maxSize: expect.any(Number),
1255 workerNodes: expect.any(Number),
1256 idleWorkerNodes: expect.any(Number),
1257 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1258 executedTasks: expect.any(Number),
1259 executingTasks: expect.any(Number),
3e8611a8
JB
1260 maxQueuedTasks: expect.any(Number),
1261 queuedTasks: expect.any(Number),
1262 backPressure: true,
68cbdc84 1263 stolenTasks: expect.any(Number),
a4e07f72 1264 failedTasks: expect.any(Number)
d46660cd 1265 })
3e8611a8 1266 expect(pool.hasBackPressure.called).toBe(true)
fd7ebd49 1267 await pool.destroy()
7c0ba920 1268 })
70a4f5ea 1269
9eae3c69
JB
1270 it('Verify that hasTaskFunction() is working', async () => {
1271 const dynamicThreadPool = new DynamicThreadPool(
1272 Math.floor(numberOfWorkers / 2),
1273 numberOfWorkers,
b2fd3f4a 1274 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1275 )
1276 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1277 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1278 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1279 true
1280 )
1281 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1282 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1283 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1284 await dynamicThreadPool.destroy()
1285 const fixedClusterPool = new FixedClusterPool(
1286 numberOfWorkers,
1287 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1288 )
1289 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1290 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1291 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1292 true
1293 )
1294 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1295 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1296 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1297 await fixedClusterPool.destroy()
1298 })
1299
1300 it('Verify that addTaskFunction() is working', async () => {
1301 const dynamicThreadPool = new DynamicThreadPool(
1302 Math.floor(numberOfWorkers / 2),
1303 numberOfWorkers,
b2fd3f4a 1304 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1305 )
1306 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1307 await expect(
1308 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1309 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1310 await expect(
1311 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1312 ).rejects.toThrow(
3feeab69
JB
1313 new TypeError('name argument must not be an empty string')
1314 )
948faff7
JB
1315 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1316 new TypeError('fn argument must be a function')
1317 )
1318 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1319 new TypeError('fn argument must be a function')
1320 )
9eae3c69
JB
1321 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1322 DEFAULT_TASK_NAME,
1323 'test'
1324 ])
1325 const echoTaskFunction = data => {
1326 return data
1327 }
1328 await expect(
1329 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1330 ).resolves.toBe(true)
1331 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1332 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1333 echoTaskFunction
1334 )
1335 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1336 DEFAULT_TASK_NAME,
1337 'test',
1338 'echo'
1339 ])
1340 const taskFunctionData = { test: 'test' }
1341 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1342 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1343 for (const workerNode of dynamicThreadPool.workerNodes) {
1344 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1345 tasks: {
1346 executed: expect.any(Number),
1347 executing: 0,
1348 queued: 0,
1349 stolen: 0,
463226a4 1350 sequentiallyStolen: 0,
adee6053
JB
1351 failed: 0
1352 },
1353 runTime: {
1354 history: new CircularArray()
1355 },
1356 waitTime: {
1357 history: new CircularArray()
1358 },
1359 elu: {
1360 idle: {
1361 history: new CircularArray()
1362 },
1363 active: {
1364 history: new CircularArray()
1365 }
1366 }
1367 })
1368 }
9eae3c69
JB
1369 await dynamicThreadPool.destroy()
1370 })
1371
1372 it('Verify that removeTaskFunction() is working', async () => {
1373 const dynamicThreadPool = new DynamicThreadPool(
1374 Math.floor(numberOfWorkers / 2),
1375 numberOfWorkers,
b2fd3f4a 1376 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1377 )
1378 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1379 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1380 DEFAULT_TASK_NAME,
1381 'test'
1382 ])
948faff7 1383 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1384 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1385 )
1386 const echoTaskFunction = data => {
1387 return data
1388 }
1389 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1390 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1391 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1392 echoTaskFunction
1393 )
1394 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1395 DEFAULT_TASK_NAME,
1396 'test',
1397 'echo'
1398 ])
1399 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1400 true
1401 )
1402 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1403 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1404 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1405 DEFAULT_TASK_NAME,
1406 'test'
1407 ])
1408 await dynamicThreadPool.destroy()
1409 })
1410
30500265 1411 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1412 const dynamicThreadPool = new DynamicThreadPool(
1413 Math.floor(numberOfWorkers / 2),
1414 numberOfWorkers,
b2fd3f4a 1415 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1416 )
1417 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1418 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1419 DEFAULT_TASK_NAME,
90d7d101
JB
1420 'jsonIntegerSerialization',
1421 'factorial',
1422 'fibonacci'
1423 ])
9eae3c69 1424 await dynamicThreadPool.destroy()
90d7d101
JB
1425 const fixedClusterPool = new FixedClusterPool(
1426 numberOfWorkers,
1427 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1428 )
1429 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1430 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1431 DEFAULT_TASK_NAME,
90d7d101
JB
1432 'jsonIntegerSerialization',
1433 'factorial',
1434 'fibonacci'
1435 ])
0fe39c97 1436 await fixedClusterPool.destroy()
90d7d101
JB
1437 })
1438
9eae3c69 1439 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1440 const dynamicThreadPool = new DynamicThreadPool(
1441 Math.floor(numberOfWorkers / 2),
1442 numberOfWorkers,
b2fd3f4a 1443 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1444 )
1445 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1446 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1447 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1448 new Error(
711623b8 1449 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1450 )
1451 )
1452 await expect(
1453 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1454 ).rejects.toThrow(
b0b55f57 1455 new Error(
711623b8 1456 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1457 )
1458 )
1459 await expect(
1460 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1461 ).rejects.toThrow(
b0b55f57 1462 new Error(
711623b8 1463 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1464 )
1465 )
9eae3c69
JB
1466 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1467 DEFAULT_TASK_NAME,
1468 'jsonIntegerSerialization',
1469 'factorial',
1470 'fibonacci'
1471 ])
1472 await expect(
1473 dynamicThreadPool.setDefaultTaskFunction('factorial')
1474 ).resolves.toBe(true)
1475 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1476 DEFAULT_TASK_NAME,
1477 'factorial',
1478 'jsonIntegerSerialization',
1479 'fibonacci'
1480 ])
1481 await expect(
1482 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1483 ).resolves.toBe(true)
1484 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1485 DEFAULT_TASK_NAME,
1486 'fibonacci',
1487 'jsonIntegerSerialization',
1488 'factorial'
1489 ])
cda9ba34 1490 await dynamicThreadPool.destroy()
30500265
JB
1491 })
1492
90d7d101 1493 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1494 const pool = new DynamicClusterPool(
2431bdb4 1495 Math.floor(numberOfWorkers / 2),
70a4f5ea 1496 numberOfWorkers,
90d7d101 1497 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1498 )
1499 const data = { n: 10 }
82888165 1500 const result0 = await pool.execute(data)
30b963d4 1501 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1502 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1503 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1504 const result2 = await pool.execute(data, 'factorial')
1505 expect(result2).toBe(3628800)
1506 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1507 expect(result3).toBe(55)
5bb5be17
JB
1508 expect(pool.info.executingTasks).toBe(0)
1509 expect(pool.info.executedTasks).toBe(4)
b414b84c 1510 for (const workerNode of pool.workerNodes) {
66979634 1511 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1512 DEFAULT_TASK_NAME,
b414b84c
JB
1513 'jsonIntegerSerialization',
1514 'factorial',
1515 'fibonacci'
1516 ])
1517 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1518 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1519 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1520 tasks: {
1521 executed: expect.any(Number),
4ba4c7f9 1522 executing: 0,
5bb5be17 1523 failed: 0,
68cbdc84 1524 queued: 0,
463226a4 1525 sequentiallyStolen: 0,
68cbdc84 1526 stolen: 0
5bb5be17
JB
1527 },
1528 runTime: {
1529 history: expect.any(CircularArray)
1530 },
1531 waitTime: {
1532 history: expect.any(CircularArray)
1533 },
1534 elu: {
1535 idle: {
1536 history: expect.any(CircularArray)
1537 },
1538 active: {
1539 history: expect.any(CircularArray)
1540 }
1541 }
1542 })
1543 expect(
4ba4c7f9
JB
1544 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1545 ).toBeGreaterThan(0)
5bb5be17 1546 }
dfd7ec01
JB
1547 expect(
1548 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1549 ).toStrictEqual(
66979634
JB
1550 workerNode.getTaskFunctionWorkerUsage(
1551 workerNode.info.taskFunctionNames[1]
1552 )
dfd7ec01 1553 )
5bb5be17 1554 }
0fe39c97 1555 await pool.destroy()
70a4f5ea 1556 })
52a23942
JB
1557
1558 it('Verify sendKillMessageToWorker()', async () => {
1559 const pool = new DynamicClusterPool(
1560 Math.floor(numberOfWorkers / 2),
1561 numberOfWorkers,
1562 './tests/worker-files/cluster/testWorker.js'
1563 )
1564 const workerNodeKey = 0
1565 await expect(
adee6053 1566 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1567 ).resolves.toBeUndefined()
1568 await pool.destroy()
1569 })
adee6053
JB
1570
1571 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1572 const pool = new DynamicClusterPool(
1573 Math.floor(numberOfWorkers / 2),
1574 numberOfWorkers,
1575 './tests/worker-files/cluster/testWorker.js'
1576 )
1577 const workerNodeKey = 0
1578 await expect(
1579 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1580 taskFunctionOperation: 'add',
1581 taskFunctionName: 'empty',
1582 taskFunction: (() => {}).toString()
1583 })
1584 ).resolves.toBe(true)
1585 expect(
1586 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1587 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1588 await pool.destroy()
1589 })
1590
1591 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1592 const pool = new DynamicClusterPool(
1593 Math.floor(numberOfWorkers / 2),
1594 numberOfWorkers,
1595 './tests/worker-files/cluster/testWorker.js'
1596 )
adee6053
JB
1597 await expect(
1598 pool.sendTaskFunctionOperationToWorkers({
1599 taskFunctionOperation: 'add',
1600 taskFunctionName: 'empty',
1601 taskFunction: (() => {}).toString()
1602 })
1603 ).resolves.toBe(true)
1604 for (const workerNode of pool.workerNodes) {
1605 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1606 DEFAULT_TASK_NAME,
1607 'test',
1608 'empty'
1609 ])
1610 }
1611 await pool.destroy()
1612 })
3ec964d6 1613})