chore(deps-dev): apply updates
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
97231086 1import { expect } from 'expect'
5d5885ee 2// eslint-disable-next-line n/no-unsupported-features/node-builtins
ded253e2 3import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
4import { EventEmitterAsyncResource } from 'node:events'
5import { readFileSync } from 'node:fs'
ded253e2 6import { dirname, join } from 'node:path'
2eb14889 7import { fileURLToPath } from 'node:url'
ded253e2 8
f12182ad 9import { CircularBuffer } from '../../lib/circular-buffer.cjs'
a074ffee 10import {
70a4f5ea 11 DynamicClusterPool,
9e619829 12 DynamicThreadPool,
aee46736 13 FixedClusterPool,
e843b904 14 FixedThreadPool,
aee46736 15 PoolEvents,
184855e6 16 PoolTypes,
3d6dd312 17 WorkerChoiceStrategies,
3a502712 18 WorkerTypes,
d35e5717 19} from '../../lib/index.cjs'
ded253e2 20import { WorkerNode } from '../../lib/pools/worker-node.cjs'
c6dd1aeb
JB
21import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
22import { defaultBucketSize } from '../../lib/queues/queue-types.cjs'
d35e5717
JB
23import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24import { waitPoolEvents } from '../test-utils.cjs'
e1ffb94f
JB
25
26describe('Abstract pool test suite', () => {
2eb14889
JB
// Package version read from the package.json located two directories above
// this test file; it is compared against the `version` field of `pool.info`.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
)
// Pool size shared by the tests of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool whose `isMain()` override always answers `false`.
 * Used by the suite to exercise pool creation when the pool does not
 * consider itself to be running in the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 39
bcf85f7f
JB
it('Verify that pool can be created and destroyed', async () => {
  // Smoke test: construct a fixed thread pool, check its type, tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
48
it('Verify that pool cannot be created from a non main thread/process', () => {
  // StubPoolWithIsMain answers `false` from isMain(), so construction is
  // expected to be refused.
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createStubPool).toThrow(expectedError)
})
c510fea7 65
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // `starting` and `destroying` are expected to be false both before and
  // after destroy(); only `started` flips.
  const expectStatuses = started => {
    expect(pool.started).toBe(started)
    expect(pool.starting).toBe(false)
    expect(pool.destroying).toBe(false)
  }
  expectStatuses(true)
  await pool.destroy()
  expectStatuses(false)
})
79
it('Verify that filePath is checked', () => {
  // Each entry: constructor arguments -> error the constructor must throw.
  const invalidCalls = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified'),
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string'),
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'"),
    ],
  ]
  for (const [constructorArgs, expectedError] of invalidCalls) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(
      expectedError
    )
  }
})
91
it('Verify that numberOfWorkers is checked', () => {
  // The number of workers is passed explicitly as `undefined`.
  const createPoolWithoutSize = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
105
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrow(expectedError)
})
116
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrow(expectedError)
})
7c0ba920 127
26ce26ca
JB
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool is given a fourth argument (a maximum number of workers),
  // which must be rejected.
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
143
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'
  // Each entry: pool class, minimum size, maximum size, worker file and the
  // error the constructor must throw for that sizing.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      ),
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      ),
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      ),
    ],
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
218
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Part 1: no options given, the pool must expose its default options.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  const expectedDefaultOptions = {
    enableEvents: true,
    enableTasksQueue: false,
    restartWorkerOnError: true,
    startWorkers: true,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
  }
  expect(pool.opts).toStrictEqual(expectedDefaultOptions)
  for (const [, strategy] of pool.workerChoiceStrategiesContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      elu: { median: false },
      runTime: { median: false },
      waitTime: { median: false },
      // Only the first and the last weight keys are checked.
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number),
      }),
    })
  }
  await pool.destroy()

  // Part 2: explicit options; the expectation below is an independent
  // literal so it cannot be affected by what the pool does with its input.
  const testHandler = () => console.info('test handler executed')
  const userOptions = {
    enableEvents: false,
    enableTasksQueue: true,
    errorHandler: testHandler,
    exitHandler: testHandler,
    messageHandler: testHandler,
    onlineHandler: testHandler,
    restartWorkerOnError: false,
    tasksQueueOptions: { concurrency: 2 },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 },
    },
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, userOptions)
  expect(pool.emitter).toBeUndefined()
  const expectedCustomOptions = {
    enableEvents: false,
    enableTasksQueue: true,
    errorHandler: testHandler,
    exitHandler: testHandler,
    messageHandler: testHandler,
    onlineHandler: testHandler,
    restartWorkerOnError: false,
    startWorkers: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      tasksFinishedTimeout: 2000,
      tasksStealingOnBackPressure: true,
      tasksStealingRatio: 0.6,
      taskStealing: true,
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 },
    },
  }
  expect(pool.opts).toStrictEqual(expectedCustomOptions)
  for (const [, strategy] of pool.workerChoiceStrategiesContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      elu: { median: false },
      runTime: { median: true },
      waitTime: { median: false },
      weights: { 0: 300, 1: 200 },
    })
  }
  await pool.destroy()
})
301
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: invalid pool options -> error the constructor must throw.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions',
      },
      new TypeError('Invalid tasks queue options: must be a plain object'),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer'),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer'),
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { tasksStealingRatio: '' },
      },
      new TypeError(
        'Invalid worker node tasks stealing ratio: must be a number'
      ),
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { tasksStealingRatio: 1.1 },
      },
      new RangeError(
        'Invalid worker node tasks stealing ratio: must be between 0 and 1'
      ),
    ],
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
471
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Expected statistics requirement for one measurement: when the median is
  // requested the average is not, and conversely.
  const requirementFor = median => ({
    aggregate: true,
    average: !median,
    median,
  })

  // Checks every strategy's options, then the aggregated task statistics
  // requirements, for the given elu/runTime median flags. The waitTime median
  // is never enabled by this test.
  const expectStrategiesState = (eluMedian, runTimeMedian) => {
    for (const [, strategy] of pool.workerChoiceStrategiesContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({
        elu: { median: eluMedian },
        runTime: { median: runTimeMedian },
        waitTime: { median: false },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number),
        }),
      })
    }
    expect(
      pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      elu: requirementFor(eluMedian),
      runTime: requirementFor(runTimeMedian),
      waitTime: requirementFor(false),
    })
  }

  // Initial state: no strategy options stored on the pool.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesState(false, false)

  // Switch elu and runTime to median.
  pool.setWorkerChoiceStrategyOptions({
    elu: { median: true },
    runTime: { median: true },
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    elu: { median: true },
    runTime: { median: true },
  })
  expectStrategiesState(true, true)

  // Switch them back to average.
  pool.setWorkerChoiceStrategyOptions({
    elu: { median: false },
    runTime: { median: false },
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    elu: { median: false },
    runTime: { median: false },
  })
  expectStrategiesState(false, false)

  // Invalid inputs: each entry is the argument and the error it must raise.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      ),
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
  ]
  for (const [strategyOptions, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
609
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected once the queue is enabled; only the
  // concurrency differs between the two enabling calls below.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: numberOfWorkers ** 2,
    tasksFinishedTimeout: 2000,
    tasksStealingOnBackPressure: true,
    tasksStealingRatio: 0.6,
    taskStealing: true,
  })
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }

  expectQueueDisabled()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(1))

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(2))

  pool.enableTasksQueue(false)
  expectQueueDisabled()

  await pool.destroy()
})
642
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Every worker node's back pressure size must equal the queue size
  // currently stored in the pool options.
  const expectBackPressureSizeInSync = () => {
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  // Options in place right after construction.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    tasksFinishedTimeout: 2000,
    tasksStealingOnBackPressure: true,
    tasksStealingRatio: 0.6,
    taskStealing: true,
  })
  expectBackPressureSizeInSync()

  // Full override of every option.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    tasksFinishedTimeout: 3000,
    tasksStealingOnBackPressure: false,
    tasksStealingRatio: 0.5,
    taskStealing: false,
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    tasksFinishedTimeout: 3000,
    tasksStealingOnBackPressure: false,
    tasksStealingRatio: 0.5,
    taskStealing: false,
  })
  expectBackPressureSizeInSync()

  // Partial override: the options not mentioned keep their previous values.
  pool.setTasksQueueOptions({
    concurrency: 1,
    tasksStealingOnBackPressure: true,
    taskStealing: true,
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: 2,
    tasksFinishedTimeout: 3000,
    tasksStealingOnBackPressure: true,
    tasksStealingRatio: 0.5,
    taskStealing: true,
  })
  expectBackPressureSizeInSync()

  // Invalid inputs: each entry is the argument and the error it must raise.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object'),
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer'),
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      ),
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      ),
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer'),
    ],
    [
      { tasksStealingRatio: '' },
      new TypeError(
        'Invalid worker node tasks stealing ratio: must be a number'
      ),
    ],
    [
      { tasksStealingRatio: 1.1 },
      new RangeError(
        'Invalid worker node tasks stealing ratio: must be between 0 and 1'
      ),
    ],
  ]
  for (const [tasksQueueOptions, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
744
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both pools created below.
  const commonInfo = {
    busyWorkerNodes: 0,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0,
    maxSize: numberOfWorkers,
    ready: true,
    started: true,
    strategyRetries: 0,
    version,
  }

  // Fixed thread pool: every size-related field equals numberOfWorkers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...commonInfo,
    idleWorkerNodes: numberOfWorkers,
    minSize: numberOfWorkers,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    workerNodes: numberOfWorkers,
  })
  await pool.destroy()

  // Dynamic cluster pool: only the minimum number of workers is expected
  // right after construction, and `dynamicWorkerNodes` is reported.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.info).toStrictEqual({
    ...commonInfo,
    dynamicWorkerNodes: 0,
    idleWorkerNodes: minimumSize,
    minSize: minimumSize,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    workerNodes: minimumSize,
  })
  await pool.destroy()
})
793
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Measurement statistics are only expected to carry a history buffer.
  const historyOnly = () => ({ history: expect.any(CircularBuffer) })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual({
      elu: {
        active: historyOnly(),
        idle: historyOnly(),
      },
      runTime: historyOnly(),
      // No task has been submitted yet: every counter is zero.
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        maxQueued: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: historyOnly(),
    })
  }
  await pool.destroy()
})
829
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that each worker node of `targetPool` owns an empty priority
  // queue with the default bucket size and priority disabled.
  const expectPristineTasksQueues = targetPool => {
    for (const node of targetPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      const { tasksQueue } = node
      expect(tasksQueue).toBeInstanceOf(PriorityQueue)
      expect(tasksQueue.size).toBe(0)
      expect(tasksQueue.maxSize).toBe(0)
      expect(tasksQueue.bucketSize).toBe(defaultBucketSize)
      expect(tasksQueue.enablePriority).toBe(false)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectPristineTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectPristineTasksQueues(pool)
  await pool.destroy()
})
859
it('Verify that pool worker info are initialized', async () => {
  // Asserts the info of each worker node of `targetPool`; only the worker
  // type differs between the two pools checked below.
  const expectPristineWorkerInfo = (targetPool, workerType) => {
    for (const node of targetPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        backPressure: false,
        backPressureStealing: false,
        continuousStealing: false,
        dynamic: false,
        id: expect.any(Number),
        ready: true,
        stealing: false,
        stolen: false,
        type: workerType,
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectPristineWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectPristineWorkerInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
901
711623b8
JB
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Starting a pool that is already started must throw synchronously.
  expectStartedAndReady(true)
  const alreadyStarted = new Error('Cannot start an already started pool')
  expect(() => pool.start()).toThrow(alreadyStarted)

  // Destroying twice must reject on the second call.
  await pool.destroy()
  expectStartedAndReady(false)
  const alreadyDestroyed = new Error('Cannot destroy an already destroyed pool')
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyed)
})
919
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    { startWorkers: false }
  )
  // Only the ready flag is expected to change in this test; the busy and
  // back pressure flags must stay false throughout.
  const expectEventFlags = readyEmitted => {
    expect(pool.readyEventEmitted).toBe(readyEmitted)
    expect(pool.busyEventEmitted).toBe(false)
    expect(pool.backPressureEventEmitted).toBe(false)
  }

  // Constructed with startWorkers: false -> nothing is running yet.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  expectEventFlags(false)

  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  // The ready flag is only checked after one ready event has been awaited.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expectEventFlags(true)

  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
947
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Each entry: execute() arguments -> TypeError the call must reject with.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string'),
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array'),
    ],
  ]
  // Checked one after the other, in order.
  for (const [executeArgs, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool must refuse to execute tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
970
it('Verify that pool worker tasks usage are computed', async () => {
  const maxMultiplier = 2
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const historyOnly = () => ({ history: expect.any(CircularBuffer) })
  // Builds the worker usage expected for the given executed/executing counters.
  const usageWith = (executed, executing) => ({
    elu: {
      active: historyOnly(),
      idle: historyOnly(),
    },
    runTime: historyOnly(),
    tasks: {
      executed,
      executing,
      failed: 0,
      maxQueued: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    waitTime: historyOnly(),
  })
  // Asserts that every worker node reports exactly the given usage.
  const expectEveryWorkerUsage = expectedUsage => {
    pool.workerNodes.forEach(node => {
      expect(node.usage).toStrictEqual(expectedUsage)
    })
  }

  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // Tasks are submitted but not awaited yet: all of them count as executing.
  expectEveryWorkerUsage(usageWith(0, maxMultiplier))
  await Promise.all(pendingTasks)
  expectEveryWorkerUsage(usageWith(maxMultiplier, 0))
  await pool.destroy()
})
1038
it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
  const maxMultiplier = 2
  const tasksCount = numberOfWorkers * maxMultiplier
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const historyOnly = () => ({ history: expect.any(CircularBuffer) })
  // Asserts that every worker node still reports a non-zero executed tasks count.
  const expectUsagePreserved = () => {
    pool.workerNodes.forEach(node => {
      expect(node.usage).toStrictEqual({
        elu: {
          active: historyOnly(),
          idle: historyOnly(),
        },
        runTime: historyOnly(),
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          maxQueued: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
        },
        waitTime: historyOnly(),
      })
      expect(node.usage.tasks.executed).toBeGreaterThan(0)
      expect(node.usage.tasks.executed).toBeLessThanOrEqual(tasksCount)
    })
  }

  const pendingTasks = Array.from({ length: tasksCount }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expectUsagePreserved()
  // The same expectations must hold after switching the strategy.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  expectUsagePreserved()
  await pool.destroy()
})
1116
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const minSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Every pool info payload delivered with the 'ready' event is recorded.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])

  expect(readyInfos.length).toBe(1)
  expect(readyInfos[0]).toStrictEqual({
    busyWorkerNodes: 0,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    dynamicWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0,
    idleWorkerNodes: minSize,
    maxSize: numberOfWorkers,
    minSize,
    ready: true,
    started: true,
    strategyRetries: expect.any(Number),
    type: PoolTypes.dynamic,
    version,
    worker: WorkerTypes.cluster,
    workerNodes: minSize,
  })
  await pool.destroy()
})
1153
it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Pool info payloads recorded per event, in emission order.
  const busyInfos = []
  const busyEndInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  pool.emitter.on(PoolEvents.busyEnd, info => {
    busyEndInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.busyEnd,
  ])

  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // Pool info fields expected to be identical in both event payloads.
  const sharedInfo = {
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
    maxSize: numberOfWorkers,
    minSize: numberOfWorkers,
    ready: true,
    started: true,
    strategyRetries: expect.any(Number),
    type: PoolTypes.fixed,
    version,
    worker: WorkerTypes.thread,
    workerNodes: numberOfWorkers,
  }
  expect(busyInfos.length).toBe(1)
  expect(busyInfos[0]).toStrictEqual({
    ...sharedInfo,
    busyWorkerNodes: numberOfWorkers,
    idleWorkerNodes: 0,
  })
  expect(busyEndInfos.length).toBe(1)
  expect(busyEndInfos[0]).toStrictEqual({
    ...sharedInfo,
    busyWorkerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
  })
  expect(busyEndInfos[0].busyWorkerNodes).toBeLessThan(numberOfWorkers)
  await pool.destroy()
})
1220
it("Verify that pool event emitter 'full' and 'fullEnd' events can register a callback", async () => {
  const minSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Pool info payloads recorded per event, in emission order.
  const fullInfos = []
  const fullEndInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  pool.emitter.on(PoolEvents.fullEnd, info => {
    fullEndInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.full,
    PoolEvents.fullEnd,
  ])

  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // Pool info fields expected to be identical in both event payloads.
  const sharedInfo = {
    busyWorkerNodes: expect.any(Number),
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    maxSize: numberOfWorkers,
    minSize,
    ready: true,
    started: true,
    strategyRetries: expect.any(Number),
    type: PoolTypes.dynamic,
    version,
    worker: WorkerTypes.cluster,
  }
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[0]).toStrictEqual({
    ...sharedInfo,
    dynamicWorkerNodes: minSize,
    workerNodes: numberOfWorkers,
  })

  // The test waits for 'fullEnd' separately before checking its payload.
  await waitPoolEvents(pool, PoolEvents.fullEnd, 1)
  expect(fullEndInfos.length).toBe(1)
  expect(fullEndInfos[0]).toStrictEqual({
    ...sharedInfo,
    dynamicWorkerNodes: 0,
    workerNodes: minSize,
  })
  await pool.destroy()
})
1290
it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Pool info payloads recorded per event, in emission order.
  const backPressureInfos = []
  const backPressureEndInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  pool.emitter.on(PoolEvents.backPressureEnd, info => {
    backPressureEndInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.backPressure,
    PoolEvents.backPressureEnd,
  ])

  const pendingTasks = Array.from({ length: numberOfWorkers * 10 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // Pool info fields expected to be identical in both event payloads.
  const sharedInfo = {
    busyWorkerNodes: expect.any(Number),
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    maxSize: numberOfWorkers,
    minSize: numberOfWorkers,
    queuedTasks: expect.any(Number),
    ready: true,
    started: true,
    stealingWorkerNodes: expect.any(Number),
    stolenTasks: expect.any(Number),
    strategyRetries: expect.any(Number),
    type: PoolTypes.fixed,
    version,
    worker: WorkerTypes.thread,
    workerNodes: numberOfWorkers,
  }
  expect(backPressureInfos.length).toBe(1)
  expect(backPressureInfos[0]).toStrictEqual({
    ...sharedInfo,
    backPressure: true,
    backPressureWorkerNodes: numberOfWorkers,
  })
  expect(backPressureEndInfos.length).toBe(1)
  expect(backPressureEndInfos[0]).toStrictEqual({
    ...sharedInfo,
    backPressure: false,
    backPressureWorkerNodes: expect.any(Number),
  })
  expect(backPressureEndInfos[0].backPressureWorkerNodes).toBeLessThan(
    numberOfWorkers
  )
  await pool.destroy()
})
70a4f5ea 1374
it("Verify that pool event emitter 'empty' event can register a callback", async () => {
  // The pool is created with a minimum size of zero workers.
  const minSize = 0
  const pool = new DynamicClusterPool(
    minSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Every pool info payload delivered with the 'empty' event is recorded.
  const emptyInfos = []
  pool.emitter.on(PoolEvents.empty, info => {
    emptyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.empty])

  const pendingTasks = Array.from({ length: numberOfWorkers }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  await waitPoolEvents(pool, PoolEvents.empty, 1)

  expect(emptyInfos.length).toBe(1)
  expect(emptyInfos[0]).toStrictEqual({
    busyWorkerNodes: 0,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    dynamicWorkerNodes: 0,
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
    idleWorkerNodes: 0,
    maxSize: numberOfWorkers,
    minSize,
    ready: true,
    started: true,
    strategyRetries: expect.any(Number),
    type: PoolTypes.dynamic,
    version,
    worker: WorkerTypes.cluster,
    workerNodes: 0,
  })
  await pool.destroy()
})
1416
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout },
    }
  )

  // Count the 'taskFinished' notifications emitted by the worker nodes.
  let finishedCount = 0
  pool.workerNodes.forEach(node => {
    node.on('taskFinished', () => {
      finishedCount += 1
    })
  })

  // Submit the tasks without awaiting them.
  let submitted = 0
  while (submitted < submittedTasks) {
    pool.execute()
    submitted += 1
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)

  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt

  expect(finishedCount).toBeLessThanOrEqual(submittedTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1445
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout },
    }
  )

  // Count the 'taskFinished' notifications emitted by the worker nodes.
  let finishedCount = 0
  pool.workerNodes.forEach(node => {
    node.on('taskFinished', () => {
      finishedCount += 1
    })
  })

  // Submit the tasks without awaiting them.
  let submitted = 0
  while (submitted < submittedTasks) {
    pool.execute()
    submitted += 1
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)

  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt

  // No task is expected to have finished when destroy() returns.
  expect(finishedCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1473
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the 'poolifier:task' resource observed by the hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  // Counts the lifecycle callbacks fired for the task async resource only.
  const hook = createHook({
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    promiseResolve () {
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    },
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    hook.enable()
    try {
      await pool.execute()
    } finally {
      // The hook is process-wide: always disable it, even when the task
      // rejects, so it cannot leak into the tests that run afterwards.
      hook.disable()
    }
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Destroy the pool even when an assertion above fails.
    await pool.destroy()
  }
})
1510
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false],
  ]
  // Waits for the pool to be ready, checks every name, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskName, expected] of expectations) {
      expect(pool.hasTaskFunction(taskName)).toBe(expected)
    }
    await pool.destroy()
  }

  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The second pool is only created once the first one has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
    )
  )
})
1540
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Snapshot of the worker choice strategies currently registered in the pool.
  const registeredStrategies = () => [
    ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
  ]

  // Each entry is [name, task function argument, expected rejection].
  const invalidCalls = [
    [0, () => {}, new TypeError('name argument must be a string')],
    ['', () => {}, new TypeError('name argument must not be an empty string')],
    ['test', 0, new TypeError('taskFunction property must be a function')],
    ['test', '', new TypeError('taskFunction property must be a function')],
    [
      'test',
      { taskFunction: 0 },
      new TypeError('taskFunction property must be a function'),
    ],
    [
      'test',
      { taskFunction: '' },
      new TypeError('taskFunction property must be a function'),
    ],
    [
      'test',
      { priority: -21, taskFunction: () => {} },
      new RangeError("Property 'priority' must be between -20 and 19"),
    ],
    [
      'test',
      { priority: 20, taskFunction: () => {} },
      new RangeError("Property 'priority' must be between -20 and 19"),
    ],
    [
      'test',
      { strategy: 'invalidStrategy', taskFunction: () => {} },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
  ]
  for (const [name, fn, expectedError] of invalidCalls) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, fn)
    ).rejects.toThrow(expectedError)
  }

  // None of the rejected calls above changed the pool state.
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ])
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
  ])

  const echoTaskFunction = data => data
  const echoTaskFunctionObject = {
    strategy: WorkerChoiceStrategies.LEAST_ELU,
    taskFunction: echoTaskFunction,
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunctionObject)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    strategy: WorkerChoiceStrategies.LEAST_ELU,
    taskFunction: echoTaskFunction,
  })
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  const historyOnly = () => ({ history: expect.any(CircularBuffer) })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    // Looked up again on every access, as the assertions below each re-read it.
    const echoUsage = () => workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage()).toStrictEqual({
      elu: expect.objectContaining({
        active: expect.objectContaining(historyOnly()),
        idle: expect.objectContaining(historyOnly()),
      }),
      runTime: historyOnly(),
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        failed: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: historyOnly(),
    })
    expect(echoUsage().tasks.executed).toBeGreaterThan(0)

    // ELU statistics are optional: when absent they must be exactly undefined.
    if (echoUsage().elu.active.aggregate != null) {
      expect(echoUsage().elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(echoUsage().elu.active.aggregate).toBeUndefined()
    }
    if (echoUsage().elu.idle.aggregate != null) {
      expect(echoUsage().elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(echoUsage().elu.idle.aggregate).toBeUndefined()
    }
    if (echoUsage().elu.utilization != null) {
      expect(echoUsage().elu.utilization).toBeGreaterThanOrEqual(0)
      expect(echoUsage().elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(echoUsage().elu.utilization).toBeUndefined()
    }
  }
  await dynamicThreadPool.destroy()
})
1695
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Snapshot of the worker choice strategies currently registered in the pool.
  const registeredStrategies = () => [
    ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
  ]
  // Asserts the task functions listed by the pool right now.
  const expectListedTaskFunctions = expected => {
    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
      expected
    )
  }

  expectListedTaskFunctions([{ name: DEFAULT_TASK_NAME }, { name: 'test' }])
  // 'test' was not added through the pool, so the pool refuses to remove it.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', {
    strategy: WorkerChoiceStrategies.LEAST_ELU,
    taskFunction: echoTaskFunction,
  })
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    strategy: WorkerChoiceStrategies.LEAST_ELU,
    taskFunction: echoTaskFunction,
  })
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expectListedTaskFunctions([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  // Removing 'echo' must bring the pool back to its initial state.
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
  ])
  expectListedTaskFunctions([{ name: DEFAULT_TASK_NAME }, { name: 'test' }])
  await dynamicThreadPool.destroy()
})
1747
it('Verify that listTaskFunctionsProperties() is working', async () => {
  // Waits for the pool to be ready, checks its listing, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionsProperties()).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'factorial' },
      { name: 'fibonacci' },
      { name: 'jsonIntegerSerialization' },
    ])
    await pool.destroy()
  }

  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  // The second pool is only created once the first one has been destroyed.
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
    )
  )
})
1775
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The expected error messages below embed the id of the first worker node.
  const workerId = dynamicThreadPool.workerNodes[0].info.id

  // Each entry is [name argument, expected rejection].
  const invalidCalls = [
    [
      0,
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
      ),
    ],
    [
      DEFAULT_TASK_NAME,
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
      ),
    ],
    [
      'unknown',
      new Error(
        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
      ),
    ],
  ]
  for (const [name, expectedError] of invalidCalls) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(expectedError)
  }

  // Asserts the task function names listed after the default entry, in order.
  const expectListedAfterDefault = names => {
    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      ...names.map(name => ({ name })),
    ])
  }

  expectListedAfterDefault([
    'factorial',
    'fibonacci',
    'jsonIntegerSerialization',
  ])
  // 'factorial' is already listed first: the listing order is unchanged.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expectListedAfterDefault([
    'factorial',
    'fibonacci',
    'jsonIntegerSerialization',
  ])
  // Making 'fibonacci' the default moves it ahead of 'factorial'.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expectListedAfterDefault([
    'fibonacci',
    'factorial',
    'jsonIntegerSerialization',
  ])
  await dynamicThreadPool.destroy()
})
1829
it('Verify that multiple task functions worker is working', async () => {
  // Runs every task function exposed by the cluster worker file once and
  // checks the results plus the per worker node usage bookkeeping.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker processes running.
  try {
    const data = { n: 10 }
    // No task function name given: the default task function is executed.
    const result0 = await pool.execute(data)
    expect(result0).toStrictEqual(3628800)
    const result1 = await pool.execute(data, 'jsonIntegerSerialization')
    expect(result1).toStrictEqual({ ok: 1 })
    const result2 = await pool.execute(data, 'factorial')
    expect(result2).toBe(3628800)
    const result3 = await pool.execute(data, 'fibonacci')
    expect(result3).toBe(55)
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(4)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
        { name: DEFAULT_TASK_NAME },
        { name: 'factorial' },
        { name: 'fibonacci' },
        { name: 'jsonIntegerSerialization' },
      ])
      // Four names are listed but only three usage entries are expected;
      // presumably the default name shares the entry of the first listed
      // task function (see the equality assertion at the end of the loop).
      expect(workerNode.taskFunctionsUsage.size).toBe(3)
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.tasksQueue.enablePriority).toBe(false)
      for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
        expect(
          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
        ).toStrictEqual({
          elu: {
            active: {
              history: expect.any(CircularBuffer),
            },
            idle: {
              history: expect.any(CircularBuffer),
            },
          },
          runTime: {
            history: expect.any(CircularBuffer),
          },
          tasks: {
            executed: expect.any(Number),
            executing: 0,
            failed: 0,
            queued: 0,
            sequentiallyStolen: 0,
            stolen: 0,
          },
          waitTime: {
            history: expect.any(CircularBuffer),
          },
        })
        expect(
          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
            .tasks.executed
        ).toBeGreaterThan(0)
      }
      // The default task function usage must equal the usage of the task
      // function listed right after the default entry.
      expect(
        workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
      ).toStrictEqual(
        workerNode.getTaskFunctionWorkerUsage(
          workerNode.info.taskFunctionsProperties[1].name
        )
      )
    }
  } finally {
    await pool.destroy()
  }
})
1899
it('Verify that mapExecute() is working', async () => {
  // Checks mapExecute() argument validation, its results for array and Set
  // inputs, the pool task counters, and its rejection once the pool has been
  // destroyed.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker threads running.
  try {
    await expect(pool.mapExecute()).rejects.toThrow(
      new TypeError('data argument must be a defined iterable')
    )
    await expect(pool.mapExecute(0)).rejects.toThrow(
      new TypeError('data argument must be an iterable')
    )
    await expect(pool.mapExecute([undefined], 0)).rejects.toThrow(
      new TypeError('name argument must be a string')
    )
    await expect(pool.mapExecute([undefined], '')).rejects.toThrow(
      new TypeError('name argument must not be an empty string')
    )
    await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
      new TypeError('transferList argument must be an array')
    )
    // Unlike the cases above, the expected rejection value here is a plain
    // string, not an Error instance.
    await expect(pool.mapExecute([undefined], 'unknown')).rejects.toBe(
      "Task function 'unknown' not found"
    )
    let results = await pool.mapExecute(
      [{}, {}, {}, {}],
      'jsonIntegerSerialization'
    )
    expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }])
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(4)
    results = await pool.mapExecute(
      [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }],
      'factorial'
    )
    expect(results).toStrictEqual([
      3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
    ])
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(8)
    // Any iterable is accepted as input, not only arrays.
    results = await pool.mapExecute(
      new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
      'factorial'
    )
    expect(results).toStrictEqual([
      3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
    ])
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(12)
  } finally {
    await pool.destroy()
  }
  // Only reached when everything above passed: a destroyed pool must refuse
  // to execute tasks.
  await expect(pool.mapExecute()).rejects.toThrow(
    new Error('Cannot execute task(s) on not started pool')
  )
})
1954
8139fbbc
JB
it('Verify that task function objects worker is working', async () => {
  // Same scenario as the multiple task functions test, but against a thread
  // worker file whose name suggests task functions declared as objects
  // (one of them is listed with a priority).
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker threads running.
  try {
    const data = { n: 10 }
    // No task function name given: the default task function is executed.
    const result0 = await pool.execute(data)
    expect(result0).toStrictEqual(3628800)
    const result1 = await pool.execute(data, 'jsonIntegerSerialization')
    expect(result1).toStrictEqual({ ok: 1 })
    const result2 = await pool.execute(data, 'factorial')
    expect(result2).toBe(3628800)
    const result3 = await pool.execute(data, 'fibonacci')
    expect(result3).toBe(55)
    expect(pool.info.executingTasks).toBe(0)
    expect(pool.info.executedTasks).toBe(4)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
        { name: DEFAULT_TASK_NAME },
        { name: 'factorial' },
        { name: 'fibonacci', priority: -5 },
        { name: 'jsonIntegerSerialization' },
      ])
      // Four names are listed but only three usage entries are expected;
      // presumably the default name shares the entry of the first listed
      // task function (see the equality assertion at the end of the loop).
      expect(workerNode.taskFunctionsUsage.size).toBe(3)
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      // NOTE(review): presumably enabled because 'fibonacci' declares a
      // priority - confirm against the pool implementation.
      expect(workerNode.tasksQueue.enablePriority).toBe(true)
      for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
        expect(
          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
        ).toStrictEqual({
          elu: {
            active: {
              history: expect.any(CircularBuffer),
            },
            idle: {
              history: expect.any(CircularBuffer),
            },
          },
          runTime: {
            history: expect.any(CircularBuffer),
          },
          tasks: {
            executed: expect.any(Number),
            executing: 0,
            failed: 0,
            queued: 0,
            sequentiallyStolen: 0,
            stolen: 0,
          },
          waitTime: {
            history: expect.any(CircularBuffer),
          },
        })
        expect(
          workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
            .tasks.executed
        ).toBeGreaterThan(0)
      }
      // The default task function usage must equal the usage of the task
      // function listed right after the default entry.
      expect(
        workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
      ).toStrictEqual(
        workerNode.getTaskFunctionWorkerUsage(
          workerNode.info.taskFunctionsProperties[1].name
        )
      )
    }
  } finally {
    await pool.destroy()
  }
})
52a23942
JB
2024
it('Verify sendKillMessageToWorker()', async () => {
  // Sends a kill message to the worker node at key 0 and expects the returned
  // promise to resolve with undefined.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker processes running.
  try {
    const workerNodeKey = 0
    await expect(
      pool.sendKillMessageToWorker(workerNodeKey)
    ).resolves.toBeUndefined()
  } finally {
    await pool.destroy()
  }
})
adee6053
JB
2037
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Sends an 'add' task function operation to the worker node at key 0 and
  // checks that the new task function shows up in that worker node info.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker processes running.
  try {
    const workerNodeKey = 0
    await expect(
      pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
        // The task function is passed as its source text.
        taskFunction: (() => {}).toString(),
        taskFunctionOperation: 'add',
        taskFunctionProperties: { name: 'empty' },
      })
    ).resolves.toBe(true)
    // The added task function is expected last in the list.
    expect(
      pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
    ).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'test' },
      { name: 'empty' },
    ])
  } finally {
    await pool.destroy()
  }
})
2061
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Sends an 'add' task function operation through the pool-wide method and
  // checks that every worker node lists the new task function.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // The pool is destroyed in `finally` so that a failing assertion does not
  // leave its worker processes running.
  try {
    await expect(
      pool.sendTaskFunctionOperationToWorkers({
        // The task function is passed as its source text.
        taskFunction: (() => {}).toString(),
        taskFunctionOperation: 'add',
        taskFunctionProperties: { name: 'empty' },
      })
    ).resolves.toBe(true)
    // The added task function is expected last in each worker node list.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
        { name: DEFAULT_TASK_NAME },
        { name: 'test' },
        { name: 'empty' },
      ])
    }
  } finally {
    await pool.destroy()
  }
})
3ec964d6 2084})