refactor: move queueing code into its own directory
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
5d5885ee 1// eslint-disable-next-line n/no-unsupported-features/node-builtins
ded253e2 2import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
3import { EventEmitterAsyncResource } from 'node:events'
4import { readFileSync } from 'node:fs'
ded253e2 5import { dirname, join } from 'node:path'
2eb14889 6import { fileURLToPath } from 'node:url'
ded253e2 7
a074ffee
JB
8import { expect } from 'expect'
9import { restore, stub } from 'sinon'
ded253e2 10
f12182ad 11import { CircularBuffer } from '../../lib/circular-buffer.cjs'
a074ffee 12import {
70a4f5ea 13 DynamicClusterPool,
9e619829 14 DynamicThreadPool,
aee46736 15 FixedClusterPool,
e843b904 16 FixedThreadPool,
aee46736 17 PoolEvents,
184855e6 18 PoolTypes,
3d6dd312 19 WorkerChoiceStrategies,
3a502712 20 WorkerTypes,
d35e5717 21} from '../../lib/index.cjs'
ded253e2 22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
c6dd1aeb
JB
23import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
24import { defaultBucketSize } from '../../lib/queues/queue-types.cjs'
d35e5717
JB
25import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26import { waitPoolEvents } from '../test-utils.cjs'
e1ffb94f
JB
27
28describe('Abstract pool test suite', () => {
2eb14889
JB
29 const version = JSON.parse(
30 readFileSync(
a5e5599c 31 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
32 'utf8'
33 )
34 ).version
fc027381 35 const numberOfWorkers = 2
a8884ffd 36 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
37 isMain () {
38 return false
39 }
3ec964d6 40 }
3ec964d6 41
dc021bcc 42 afterEach(() => {
a074ffee 43 restore()
dc021bcc
JB
44 })
45
bcf85f7f
JB
46 it('Verify that pool can be created and destroyed', async () => {
47 const pool = new FixedThreadPool(
48 numberOfWorkers,
49 './tests/worker-files/thread/testWorker.mjs'
50 )
51 expect(pool).toBeInstanceOf(FixedThreadPool)
52 await pool.destroy()
53 })
54
55 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
56 expect(
57 () =>
a8884ffd 58 new StubPoolWithIsMain(
7c0ba920 59 numberOfWorkers,
b2fd3f4a 60 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 61 {
3a502712 62 errorHandler: e => console.error(e),
8d3782fa
JB
63 }
64 )
948faff7 65 ).toThrow(
e695d66f
JB
66 new Error(
67 'Cannot start a pool from a worker with the same type as the pool'
68 )
04f45163 69 )
3ec964d6 70 })
c510fea7 71
bc61cfe6
JB
72 it('Verify that pool statuses properties are set', async () => {
73 const pool = new FixedThreadPool(
74 numberOfWorkers,
b2fd3f4a 75 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 76 )
bc61cfe6 77 expect(pool.started).toBe(true)
711623b8
JB
78 expect(pool.starting).toBe(false)
79 expect(pool.destroying).toBe(false)
bc61cfe6 80 await pool.destroy()
bc61cfe6
JB
81 })
82
c510fea7 83 it('Verify that filePath is checked', () => {
948faff7 84 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
85 new TypeError('The worker file path must be specified')
86 )
87 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
88 new TypeError('The worker file path must be a string')
3d6dd312
JB
89 )
90 expect(
91 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 92 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
93 })
94
95 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
96 expect(
97 () =>
98 new FixedThreadPool(
99 undefined,
b2fd3f4a 100 './tests/worker-files/thread/testWorker.mjs'
8003c026 101 )
948faff7 102 ).toThrow(
e695d66f
JB
103 new Error(
104 'Cannot instantiate a pool without specifying the number of workers'
105 )
8d3782fa
JB
106 )
107 })
108
109 it('Verify that a negative number of workers is checked', () => {
110 expect(
111 () =>
d35e5717 112 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
948faff7 113 ).toThrow(
473c717a
JB
114 new RangeError(
115 'Cannot instantiate a pool with a negative number of workers'
116 )
8d3782fa
JB
117 )
118 })
119
120 it('Verify that a non integer number of workers is checked', () => {
121 expect(
122 () =>
b2fd3f4a 123 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 124 ).toThrow(
473c717a 125 new TypeError(
0d80593b 126 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
127 )
128 )
c510fea7 129 })
7c0ba920 130
26ce26ca
JB
131 it('Verify that pool arguments number and pool type are checked', () => {
132 expect(
133 () =>
134 new FixedThreadPool(
135 numberOfWorkers,
136 './tests/worker-files/thread/testWorker.mjs',
137 undefined,
138 numberOfWorkers * 2
139 )
140 ).toThrow(
141 new Error(
142 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 )
144 )
145 })
146
216541b6 147 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
148 expect(
149 () =>
150 new DynamicClusterPool(
151 1,
152 undefined,
d35e5717 153 './tests/worker-files/cluster/testWorker.cjs'
a5ed75b7 154 )
948faff7 155 ).toThrow(
a5ed75b7
JB
156 new TypeError(
157 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 )
159 )
2761efb4
JB
160 expect(
161 () =>
162 new DynamicThreadPool(
163 0.5,
164 1,
b2fd3f4a 165 './tests/worker-files/thread/testWorker.mjs'
2761efb4 166 )
948faff7 167 ).toThrow(
2761efb4
JB
168 new TypeError(
169 'Cannot instantiate a pool with a non safe integer number of workers'
170 )
171 )
172 expect(
173 () =>
174 new DynamicClusterPool(
175 0,
176 0.5,
d35e5717 177 './tests/worker-files/cluster/testWorker.cjs'
2761efb4 178 )
948faff7 179 ).toThrow(
2761efb4
JB
180 new TypeError(
181 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 )
183 )
2431bdb4
JB
184 expect(
185 () =>
b2fd3f4a
JB
186 new DynamicThreadPool(
187 2,
188 1,
189 './tests/worker-files/thread/testWorker.mjs'
190 )
948faff7 191 ).toThrow(
2431bdb4
JB
192 new RangeError(
193 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 )
195 )
196 expect(
197 () =>
b2fd3f4a
JB
198 new DynamicThreadPool(
199 0,
200 0,
201 './tests/worker-files/thread/testWorker.mjs'
202 )
948faff7 203 ).toThrow(
2431bdb4 204 new RangeError(
213cbac6 205 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
206 )
207 )
21f710aa
JB
208 expect(
209 () =>
213cbac6
JB
210 new DynamicClusterPool(
211 1,
212 1,
d35e5717 213 './tests/worker-files/cluster/testWorker.cjs'
213cbac6 214 )
948faff7 215 ).toThrow(
21f710aa 216 new RangeError(
213cbac6 217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
218 )
219 )
2431bdb4
JB
220 })
221
fd7ebd49 222 it('Verify that pool options are checked', async () => {
7c0ba920
JB
223 let pool = new FixedThreadPool(
224 numberOfWorkers,
b2fd3f4a 225 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 226 )
b5604034 227 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
e44639e9 228 expect(pool.emitter.eventNames()).toStrictEqual([])
47352846
JB
229 expect(pool.opts).toStrictEqual({
230 startWorkers: true,
231 enableEvents: true,
232 restartWorkerOnError: true,
233 enableTasksQueue: false,
3a502712 234 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
8990357d 235 })
bcfb06ce 236 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
999ef664 237 .workerChoiceStrategies) {
86cbb766 238 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
239 runTime: { median: false },
240 waitTime: { median: false },
241 elu: { median: false },
242 weights: expect.objectContaining({
243 0: expect.any(Number),
3a502712
JB
244 [pool.info.maxSize - 1]: expect.any(Number),
245 }),
86cbb766 246 })
999ef664 247 }
fd7ebd49 248 await pool.destroy()
73bfd59d 249 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
250 pool = new FixedThreadPool(
251 numberOfWorkers,
b2fd3f4a 252 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 253 {
e4543b14 254 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 255 workerChoiceStrategyOptions: {
932fc8be 256 runTime: { median: true },
3a502712 257 weights: { 0: 300, 1: 200 },
49be33fe 258 },
35cf1c03 259 enableEvents: false,
1f68cede 260 restartWorkerOnError: false,
ff733df7 261 enableTasksQueue: true,
d4aeae5a 262 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
263 messageHandler: testHandler,
264 errorHandler: testHandler,
265 onlineHandler: testHandler,
3a502712 266 exitHandler: testHandler,
7c0ba920
JB
267 }
268 )
7c0ba920 269 expect(pool.emitter).toBeUndefined()
47352846
JB
270 expect(pool.opts).toStrictEqual({
271 startWorkers: true,
272 enableEvents: false,
273 restartWorkerOnError: false,
274 enableTasksQueue: true,
275 tasksQueueOptions: {
276 concurrency: 2,
2324f8c9 277 size: Math.pow(numberOfWorkers, 2),
dbd73092 278 taskStealing: true,
2eee7220 279 tasksStealingOnBackPressure: false,
3a502712 280 tasksFinishedTimeout: 2000,
47352846
JB
281 },
282 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
283 workerChoiceStrategyOptions: {
47352846 284 runTime: { median: true },
3a502712 285 weights: { 0: 300, 1: 200 },
47352846
JB
286 },
287 onlineHandler: testHandler,
288 messageHandler: testHandler,
289 errorHandler: testHandler,
3a502712 290 exitHandler: testHandler,
8990357d 291 })
bcfb06ce 292 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
999ef664
JB
293 .workerChoiceStrategies) {
294 expect(workerChoiceStrategy.opts).toStrictEqual({
999ef664
JB
295 runTime: { median: true },
296 waitTime: { median: false },
297 elu: { median: false },
3a502712 298 weights: { 0: 300, 1: 200 },
999ef664
JB
299 })
300 }
fd7ebd49 301 await pool.destroy()
7c0ba920
JB
302 })
303
fe291b64 304 it('Verify that pool options are validated', () => {
d4aeae5a
JB
305 expect(
306 () =>
307 new FixedThreadPool(
308 numberOfWorkers,
b2fd3f4a 309 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 310 {
3a502712 311 workerChoiceStrategy: 'invalidStrategy',
d4aeae5a
JB
312 }
313 )
948faff7 314 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
315 expect(
316 () =>
317 new FixedThreadPool(
318 numberOfWorkers,
b2fd3f4a 319 './tests/worker-files/thread/testWorker.mjs',
49be33fe 320 {
3a502712 321 workerChoiceStrategyOptions: { weights: {} },
49be33fe
JB
322 }
323 )
948faff7 324 ).toThrow(
8735b4e5
JB
325 new Error(
326 'Invalid worker choice strategy options: must have a weight for each worker node'
327 )
49be33fe 328 )
f0d7f803
JB
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
b2fd3f4a 333 './tests/worker-files/thread/testWorker.mjs',
f0d7f803 334 {
3a502712 335 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
f0d7f803
JB
336 }
337 )
948faff7 338 ).toThrow(
8735b4e5
JB
339 new Error(
340 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
341 )
f0d7f803 342 )
5c4c2dee
JB
343 expect(
344 () =>
345 new FixedThreadPool(
346 numberOfWorkers,
b2fd3f4a 347 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
348 {
349 enableTasksQueue: true,
3a502712 350 tasksQueueOptions: 'invalidTasksQueueOptions',
5c4c2dee
JB
351 }
352 )
948faff7 353 ).toThrow(
5c4c2dee
JB
354 new TypeError('Invalid tasks queue options: must be a plain object')
355 )
f0d7f803
JB
356 expect(
357 () =>
358 new FixedThreadPool(
359 numberOfWorkers,
b2fd3f4a 360 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
361 {
362 enableTasksQueue: true,
3a502712 363 tasksQueueOptions: { concurrency: 0 },
f0d7f803
JB
364 }
365 )
948faff7 366 ).toThrow(
e695d66f 367 new RangeError(
20c6f652 368 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
369 )
370 )
f0d7f803
JB
371 expect(
372 () =>
373 new FixedThreadPool(
374 numberOfWorkers,
b2fd3f4a 375 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
376 {
377 enableTasksQueue: true,
3a502712 378 tasksQueueOptions: { concurrency: -1 },
f0d7f803
JB
379 }
380 )
948faff7 381 ).toThrow(
5c4c2dee
JB
382 new RangeError(
383 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
384 )
8735b4e5 385 )
f0d7f803
JB
386 expect(
387 () =>
388 new FixedThreadPool(
389 numberOfWorkers,
b2fd3f4a 390 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
391 {
392 enableTasksQueue: true,
3a502712 393 tasksQueueOptions: { concurrency: 0.2 },
f0d7f803
JB
394 }
395 )
948faff7 396 ).toThrow(
20c6f652 397 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 398 )
5c4c2dee
JB
399 expect(
400 () =>
401 new FixedThreadPool(
402 numberOfWorkers,
b2fd3f4a 403 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
404 {
405 enableTasksQueue: true,
3a502712 406 tasksQueueOptions: { size: 0 },
5c4c2dee
JB
407 }
408 )
948faff7 409 ).toThrow(
5c4c2dee
JB
410 new RangeError(
411 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
412 )
413 )
414 expect(
415 () =>
416 new FixedThreadPool(
417 numberOfWorkers,
b2fd3f4a 418 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
419 {
420 enableTasksQueue: true,
3a502712 421 tasksQueueOptions: { size: -1 },
5c4c2dee
JB
422 }
423 )
948faff7 424 ).toThrow(
5c4c2dee
JB
425 new RangeError(
426 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
427 )
428 )
429 expect(
430 () =>
431 new FixedThreadPool(
432 numberOfWorkers,
b2fd3f4a 433 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
434 {
435 enableTasksQueue: true,
3a502712 436 tasksQueueOptions: { size: 0.2 },
5c4c2dee
JB
437 }
438 )
948faff7 439 ).toThrow(
5c4c2dee
JB
440 new TypeError('Invalid worker node tasks queue size: must be an integer')
441 )
d4aeae5a
JB
442 })
443
2431bdb4 444 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
445 const pool = new FixedThreadPool(
446 numberOfWorkers,
b2fd3f4a 447 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
448 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 )
26ce26ca 450 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
bcfb06ce 451 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
a20f0ba5 452 .workerChoiceStrategies) {
86cbb766 453 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
454 runTime: { median: false },
455 waitTime: { median: false },
456 elu: { median: false },
457 weights: expect.objectContaining({
458 0: expect.any(Number),
3a502712
JB
459 [pool.info.maxSize - 1]: expect.any(Number),
460 }),
86cbb766 461 })
a20f0ba5 462 }
87de9ff5 463 expect(
bcfb06ce 464 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
87de9ff5 465 ).toStrictEqual({
932fc8be
JB
466 runTime: {
467 aggregate: true,
468 average: true,
3a502712 469 median: false,
932fc8be
JB
470 },
471 waitTime: {
e0843544
JB
472 aggregate: true,
473 average: true,
3a502712 474 median: false,
932fc8be 475 },
5df69fab 476 elu: {
9adcefab
JB
477 aggregate: true,
478 average: true,
3a502712
JB
479 median: false,
480 },
86bf340d 481 })
9adcefab
JB
482 pool.setWorkerChoiceStrategyOptions({
483 runTime: { median: true },
3a502712 484 elu: { median: true },
9adcefab 485 })
a20f0ba5 486 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 487 runTime: { median: true },
3a502712 488 elu: { median: true },
8990357d 489 })
bcfb06ce 490 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
a20f0ba5 491 .workerChoiceStrategies) {
86cbb766 492 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
493 runTime: { median: true },
494 waitTime: { median: false },
495 elu: { median: true },
496 weights: expect.objectContaining({
497 0: expect.any(Number),
3a502712
JB
498 [pool.info.maxSize - 1]: expect.any(Number),
499 }),
86cbb766 500 })
a20f0ba5 501 }
87de9ff5 502 expect(
bcfb06ce 503 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
87de9ff5 504 ).toStrictEqual({
932fc8be
JB
505 runTime: {
506 aggregate: true,
507 average: false,
3a502712 508 median: true,
932fc8be
JB
509 },
510 waitTime: {
e0843544
JB
511 aggregate: true,
512 average: true,
3a502712 513 median: false,
932fc8be 514 },
5df69fab 515 elu: {
9adcefab 516 aggregate: true,
5df69fab 517 average: false,
3a502712
JB
518 median: true,
519 },
86bf340d 520 })
9adcefab
JB
521 pool.setWorkerChoiceStrategyOptions({
522 runTime: { median: false },
3a502712 523 elu: { median: false },
9adcefab 524 })
a20f0ba5 525 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 526 runTime: { median: false },
3a502712 527 elu: { median: false },
8990357d 528 })
bcfb06ce 529 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
a20f0ba5 530 .workerChoiceStrategies) {
86cbb766 531 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
532 runTime: { median: false },
533 waitTime: { median: false },
534 elu: { median: false },
535 weights: expect.objectContaining({
536 0: expect.any(Number),
3a502712
JB
537 [pool.info.maxSize - 1]: expect.any(Number),
538 }),
86cbb766 539 })
a20f0ba5 540 }
87de9ff5 541 expect(
bcfb06ce 542 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
87de9ff5 543 ).toStrictEqual({
932fc8be
JB
544 runTime: {
545 aggregate: true,
546 average: true,
3a502712 547 median: false,
932fc8be
JB
548 },
549 waitTime: {
e0843544
JB
550 aggregate: true,
551 average: true,
3a502712 552 median: false,
932fc8be 553 },
5df69fab 554 elu: {
9adcefab
JB
555 aggregate: true,
556 average: true,
3a502712
JB
557 median: false,
558 },
86bf340d 559 })
1f95d544
JB
560 expect(() =>
561 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 562 ).toThrow(
8735b4e5
JB
563 new TypeError(
564 'Invalid worker choice strategy options: must be a plain object'
565 )
1f95d544 566 )
948faff7 567 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
568 new Error(
569 'Invalid worker choice strategy options: must have a weight for each worker node'
570 )
1f95d544
JB
571 )
572 expect(() =>
573 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 574 ).toThrow(
8735b4e5
JB
575 new Error(
576 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
577 )
1f95d544 578 )
a20f0ba5
JB
579 await pool.destroy()
580 })
581
2431bdb4 582 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
583 const pool = new FixedThreadPool(
584 numberOfWorkers,
b2fd3f4a 585 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
586 )
587 expect(pool.opts.enableTasksQueue).toBe(false)
588 expect(pool.opts.tasksQueueOptions).toBeUndefined()
589 pool.enableTasksQueue(true)
590 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
591 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 concurrency: 1,
2324f8c9 593 size: Math.pow(numberOfWorkers, 2),
dbd73092 594 taskStealing: true,
2eee7220 595 tasksStealingOnBackPressure: false,
3a502712 596 tasksFinishedTimeout: 2000,
20c6f652 597 })
a20f0ba5
JB
598 pool.enableTasksQueue(true, { concurrency: 2 })
599 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
600 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 concurrency: 2,
2324f8c9 602 size: Math.pow(numberOfWorkers, 2),
dbd73092 603 taskStealing: true,
2eee7220 604 tasksStealingOnBackPressure: false,
3a502712 605 tasksFinishedTimeout: 2000,
20c6f652 606 })
a20f0ba5
JB
607 pool.enableTasksQueue(false)
608 expect(pool.opts.enableTasksQueue).toBe(false)
609 expect(pool.opts.tasksQueueOptions).toBeUndefined()
610 await pool.destroy()
611 })
612
2431bdb4 613 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
614 const pool = new FixedThreadPool(
615 numberOfWorkers,
b2fd3f4a 616 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
617 { enableTasksQueue: true }
618 )
20c6f652
JB
619 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 concurrency: 1,
2324f8c9 621 size: Math.pow(numberOfWorkers, 2),
dbd73092 622 taskStealing: true,
2eee7220 623 tasksStealingOnBackPressure: false,
3a502712 624 tasksFinishedTimeout: 2000,
20c6f652 625 })
d6ca1416 626 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
627 expect(workerNode.tasksQueueBackPressureSize).toBe(
628 pool.opts.tasksQueueOptions.size
629 )
d6ca1416
JB
630 }
631 pool.setTasksQueueOptions({
632 concurrency: 2,
2324f8c9 633 size: 2,
d6ca1416 634 taskStealing: false,
32b141fd 635 tasksStealingOnBackPressure: false,
3a502712 636 tasksFinishedTimeout: 3000,
d6ca1416 637 })
20c6f652
JB
638 expect(pool.opts.tasksQueueOptions).toStrictEqual({
639 concurrency: 2,
2324f8c9 640 size: 2,
d6ca1416 641 taskStealing: false,
32b141fd 642 tasksStealingOnBackPressure: false,
3a502712 643 tasksFinishedTimeout: 3000,
d6ca1416
JB
644 })
645 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
646 expect(workerNode.tasksQueueBackPressureSize).toBe(
647 pool.opts.tasksQueueOptions.size
648 )
d6ca1416
JB
649 }
650 pool.setTasksQueueOptions({
651 concurrency: 1,
652 taskStealing: true,
3a502712 653 tasksStealingOnBackPressure: true,
d6ca1416
JB
654 })
655 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 concurrency: 1,
2324f8c9 657 size: Math.pow(numberOfWorkers, 2),
dbd73092 658 taskStealing: true,
32b141fd 659 tasksStealingOnBackPressure: true,
3a502712 660 tasksFinishedTimeout: 2000,
20c6f652 661 })
d6ca1416 662 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
663 expect(workerNode.tasksQueueBackPressureSize).toBe(
664 pool.opts.tasksQueueOptions.size
665 )
d6ca1416 666 }
948faff7 667 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
668 new TypeError('Invalid tasks queue options: must be a plain object')
669 )
948faff7 670 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 671 new RangeError(
20c6f652 672 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
673 )
674 )
948faff7 675 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 676 new RangeError(
20c6f652 677 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 678 )
a20f0ba5 679 )
948faff7 680 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
681 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 )
948faff7 683 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 684 new RangeError(
68dbcdc0 685 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
686 )
687 )
948faff7 688 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 689 new RangeError(
68dbcdc0 690 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
691 )
692 )
948faff7 693 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 694 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 695 )
a20f0ba5
JB
696 await pool.destroy()
697 })
698
6b27d407
JB
699 it('Verify that pool info is set', async () => {
700 let pool = new FixedThreadPool(
701 numberOfWorkers,
b2fd3f4a 702 './tests/worker-files/thread/testWorker.mjs'
6b27d407 703 )
2dca6cab
JB
704 expect(pool.info).toStrictEqual({
705 version,
706 type: PoolTypes.fixed,
707 worker: WorkerTypes.thread,
47352846 708 started: true,
2dca6cab 709 ready: true,
bcfb06ce 710 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 711 strategyRetries: 0,
2dca6cab
JB
712 minSize: numberOfWorkers,
713 maxSize: numberOfWorkers,
714 workerNodes: numberOfWorkers,
715 idleWorkerNodes: numberOfWorkers,
716 busyWorkerNodes: 0,
717 executedTasks: 0,
718 executingTasks: 0,
3a502712 719 failedTasks: 0,
2dca6cab 720 })
6b27d407
JB
721 await pool.destroy()
722 pool = new DynamicClusterPool(
2431bdb4 723 Math.floor(numberOfWorkers / 2),
6b27d407 724 numberOfWorkers,
d35e5717 725 './tests/worker-files/cluster/testWorker.cjs'
6b27d407 726 )
2dca6cab
JB
727 expect(pool.info).toStrictEqual({
728 version,
729 type: PoolTypes.dynamic,
730 worker: WorkerTypes.cluster,
47352846 731 started: true,
2dca6cab 732 ready: true,
bcfb06ce 733 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 734 strategyRetries: 0,
2dca6cab
JB
735 minSize: Math.floor(numberOfWorkers / 2),
736 maxSize: numberOfWorkers,
737 workerNodes: Math.floor(numberOfWorkers / 2),
738 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
739 busyWorkerNodes: 0,
740 executedTasks: 0,
741 executingTasks: 0,
3a502712 742 failedTasks: 0,
2dca6cab 743 })
6b27d407
JB
744 await pool.destroy()
745 })
746
2431bdb4 747 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
748 const pool = new FixedClusterPool(
749 numberOfWorkers,
d35e5717 750 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 751 )
f06e48d8 752 for (const workerNode of pool.workerNodes) {
47352846 753 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 754 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
755 tasks: {
756 executed: 0,
757 executing: 0,
758 queued: 0,
df593701 759 maxQueued: 0,
463226a4 760 sequentiallyStolen: 0,
68cbdc84 761 stolen: 0,
3a502712 762 failed: 0,
a4e07f72
JB
763 },
764 runTime: {
3a502712 765 history: expect.any(CircularBuffer),
a4e07f72
JB
766 },
767 waitTime: {
3a502712 768 history: expect.any(CircularBuffer),
a4e07f72 769 },
5df69fab
JB
770 elu: {
771 idle: {
3a502712 772 history: expect.any(CircularBuffer),
5df69fab
JB
773 },
774 active: {
3a502712
JB
775 history: expect.any(CircularBuffer),
776 },
777 },
86bf340d 778 })
f06e48d8
JB
779 }
780 await pool.destroy()
781 })
782
2431bdb4
JB
783 it('Verify that pool worker tasks queue are initialized', async () => {
784 let pool = new FixedClusterPool(
f06e48d8 785 numberOfWorkers,
d35e5717 786 './tests/worker-files/cluster/testWorker.cjs'
f06e48d8
JB
787 )
788 for (const workerNode of pool.workerNodes) {
47352846 789 expect(workerNode).toBeInstanceOf(WorkerNode)
95d1a734 790 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
4d8bf9e4 791 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 792 expect(workerNode.tasksQueue.maxSize).toBe(0)
9df282a0 793 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
fcfc3353 794 expect(workerNode.tasksQueue.enablePriority).toBe(false)
bf9549ae 795 }
fd7ebd49 796 await pool.destroy()
2431bdb4
JB
797 pool = new DynamicThreadPool(
798 Math.floor(numberOfWorkers / 2),
799 numberOfWorkers,
b2fd3f4a 800 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
801 )
802 for (const workerNode of pool.workerNodes) {
47352846 803 expect(workerNode).toBeInstanceOf(WorkerNode)
95d1a734 804 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
2431bdb4
JB
805 expect(workerNode.tasksQueue.size).toBe(0)
806 expect(workerNode.tasksQueue.maxSize).toBe(0)
9df282a0 807 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
fcfc3353 808 expect(workerNode.tasksQueue.enablePriority).toBe(false)
2431bdb4 809 }
213cbac6 810 await pool.destroy()
2431bdb4
JB
811 })
812
813 it('Verify that pool worker info are initialized', async () => {
814 let pool = new FixedClusterPool(
815 numberOfWorkers,
d35e5717 816 './tests/worker-files/cluster/testWorker.cjs'
2431bdb4 817 )
2dca6cab 818 for (const workerNode of pool.workerNodes) {
47352846 819 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
820 expect(workerNode.info).toStrictEqual({
821 id: expect.any(Number),
822 type: WorkerTypes.cluster,
823 dynamic: false,
5eb72b9e 824 ready: true,
85b553ba 825 stealing: false,
3a502712 826 backPressure: false,
2dca6cab
JB
827 })
828 }
2431bdb4
JB
829 await pool.destroy()
830 pool = new DynamicThreadPool(
831 Math.floor(numberOfWorkers / 2),
832 numberOfWorkers,
b2fd3f4a 833 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 834 )
2dca6cab 835 for (const workerNode of pool.workerNodes) {
47352846 836 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
837 expect(workerNode.info).toStrictEqual({
838 id: expect.any(Number),
839 type: WorkerTypes.thread,
840 dynamic: false,
5eb72b9e 841 ready: true,
85b553ba 842 stealing: false,
3a502712 843 backPressure: false,
2dca6cab
JB
844 })
845 }
213cbac6 846 await pool.destroy()
bf9549ae
JB
847 })
848
711623b8
JB
849 it('Verify that pool statuses are checked at start or destroy', async () => {
850 const pool = new FixedThreadPool(
851 numberOfWorkers,
852 './tests/worker-files/thread/testWorker.mjs'
853 )
854 expect(pool.info.started).toBe(true)
855 expect(pool.info.ready).toBe(true)
856 expect(() => pool.start()).toThrow(
857 new Error('Cannot start an already started pool')
858 )
859 await pool.destroy()
860 expect(pool.info.started).toBe(false)
861 expect(pool.info.ready).toBe(false)
862 await expect(pool.destroy()).rejects.toThrow(
863 new Error('Cannot destroy an already destroyed pool')
864 )
865 })
866
47352846
JB
867 it('Verify that pool can be started after initialization', async () => {
868 const pool = new FixedClusterPool(
869 numberOfWorkers,
d35e5717 870 './tests/worker-files/cluster/testWorker.cjs',
47352846 871 {
3a502712 872 startWorkers: false,
47352846
JB
873 }
874 )
875 expect(pool.info.started).toBe(false)
876 expect(pool.info.ready).toBe(false)
877 expect(pool.workerNodes).toStrictEqual([])
8e8d9101 878 expect(pool.readyEventEmitted).toBe(false)
948faff7 879 await expect(pool.execute()).rejects.toThrow(
47352846
JB
880 new Error('Cannot execute a task on not started pool')
881 )
882 pool.start()
883 expect(pool.info.started).toBe(true)
884 expect(pool.info.ready).toBe(true)
55082af9
JB
885 await waitPoolEvents(pool, PoolEvents.ready, 1)
886 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
887 expect(pool.workerNodes.length).toBe(numberOfWorkers)
888 for (const workerNode of pool.workerNodes) {
889 expect(workerNode).toBeInstanceOf(WorkerNode)
890 }
891 await pool.destroy()
892 })
893
9d2d0da1
JB
894 it('Verify that pool execute() arguments are checked', async () => {
895 const pool = new FixedClusterPool(
896 numberOfWorkers,
d35e5717 897 './tests/worker-files/cluster/testWorker.cjs'
9d2d0da1 898 )
948faff7 899 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
900 new TypeError('name argument must be a string')
901 )
948faff7 902 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
903 new TypeError('name argument must not be an empty string')
904 )
948faff7 905 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
906 new TypeError('transferList argument must be an array')
907 )
908 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
909 "Task function 'unknown' not found"
910 )
911 await pool.destroy()
948faff7 912 await expect(pool.execute()).rejects.toThrow(
47352846 913 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
914 )
915 })
916
2431bdb4 917 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
918 const pool = new FixedClusterPool(
919 numberOfWorkers,
d35e5717 920 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 921 )
09c2d0d3 922 const promises = new Set()
fc027381
JB
923 const maxMultiplier = 2
924 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 925 promises.add(pool.execute())
bf9549ae 926 }
f06e48d8 927 for (const workerNode of pool.workerNodes) {
465b2940 928 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
929 tasks: {
930 executed: 0,
931 executing: maxMultiplier,
932 queued: 0,
df593701 933 maxQueued: 0,
463226a4 934 sequentiallyStolen: 0,
68cbdc84 935 stolen: 0,
3a502712 936 failed: 0,
a4e07f72
JB
937 },
938 runTime: {
3a502712 939 history: expect.any(CircularBuffer),
a4e07f72
JB
940 },
941 waitTime: {
3a502712 942 history: expect.any(CircularBuffer),
a4e07f72 943 },
5df69fab
JB
944 elu: {
945 idle: {
3a502712 946 history: expect.any(CircularBuffer),
5df69fab
JB
947 },
948 active: {
3a502712
JB
949 history: expect.any(CircularBuffer),
950 },
951 },
86bf340d 952 })
bf9549ae
JB
953 }
954 await Promise.all(promises)
f06e48d8 955 for (const workerNode of pool.workerNodes) {
465b2940 956 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
957 tasks: {
958 executed: maxMultiplier,
959 executing: 0,
960 queued: 0,
df593701 961 maxQueued: 0,
463226a4 962 sequentiallyStolen: 0,
68cbdc84 963 stolen: 0,
3a502712 964 failed: 0,
a4e07f72
JB
965 },
966 runTime: {
3a502712 967 history: expect.any(CircularBuffer),
a4e07f72
JB
968 },
969 waitTime: {
3a502712 970 history: expect.any(CircularBuffer),
a4e07f72 971 },
5df69fab
JB
972 elu: {
973 idle: {
3a502712 974 history: expect.any(CircularBuffer),
5df69fab
JB
975 },
976 active: {
3a502712
JB
977 history: expect.any(CircularBuffer),
978 },
979 },
86bf340d 980 })
bf9549ae 981 }
fd7ebd49 982 await pool.destroy()
bf9549ae
JB
983 })
984
bcfb06ce 985 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
7fd82a1c 986 const pool = new DynamicThreadPool(
2431bdb4 987 Math.floor(numberOfWorkers / 2),
8f4878b7 988 numberOfWorkers,
b2fd3f4a 989 './tests/worker-files/thread/testWorker.mjs'
9e619829 990 )
09c2d0d3 991 const promises = new Set()
ee9f5295
JB
992 const maxMultiplier = 2
993 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 994 promises.add(pool.execute())
9e619829
JB
995 }
996 await Promise.all(promises)
f06e48d8 997 for (const workerNode of pool.workerNodes) {
465b2940 998 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
999 tasks: {
1000 executed: expect.any(Number),
1001 executing: 0,
1002 queued: 0,
df593701 1003 maxQueued: 0,
463226a4 1004 sequentiallyStolen: 0,
68cbdc84 1005 stolen: 0,
3a502712 1006 failed: 0,
a4e07f72
JB
1007 },
1008 runTime: {
3a502712 1009 history: expect.any(CircularBuffer),
a4e07f72
JB
1010 },
1011 waitTime: {
3a502712 1012 history: expect.any(CircularBuffer),
a4e07f72 1013 },
5df69fab
JB
1014 elu: {
1015 idle: {
3a502712 1016 history: expect.any(CircularBuffer),
5df69fab
JB
1017 },
1018 active: {
3a502712
JB
1019 history: expect.any(CircularBuffer),
1020 },
1021 },
86bf340d 1022 })
465b2940 1023 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1024 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1025 numberOfWorkers * maxMultiplier
1026 )
9e619829
JB
1027 }
1028 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1029 for (const workerNode of pool.workerNodes) {
465b2940 1030 expect(workerNode.usage).toStrictEqual({
a4e07f72 1031 tasks: {
bcfb06ce 1032 executed: expect.any(Number),
a4e07f72
JB
1033 executing: 0,
1034 queued: 0,
df593701 1035 maxQueued: 0,
463226a4 1036 sequentiallyStolen: 0,
68cbdc84 1037 stolen: 0,
3a502712 1038 failed: 0,
a4e07f72
JB
1039 },
1040 runTime: {
3a502712 1041 history: expect.any(CircularBuffer),
a4e07f72
JB
1042 },
1043 waitTime: {
3a502712 1044 history: expect.any(CircularBuffer),
a4e07f72 1045 },
5df69fab
JB
1046 elu: {
1047 idle: {
3a502712 1048 history: expect.any(CircularBuffer),
5df69fab
JB
1049 },
1050 active: {
3a502712
JB
1051 history: expect.any(CircularBuffer),
1052 },
1053 },
86bf340d 1054 })
bcfb06ce
JB
1055 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1056 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1057 numberOfWorkers * maxMultiplier
1058 )
ee11a4a2 1059 }
fd7ebd49 1060 await pool.destroy()
ee11a4a2
JB
1061 })
1062
a1763c54
JB
1063 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1064 const pool = new DynamicClusterPool(
2431bdb4 1065 Math.floor(numberOfWorkers / 2),
164d950a 1066 numberOfWorkers,
d35e5717 1067 './tests/worker-files/cluster/testWorker.cjs'
164d950a 1068 )
c726f66c 1069 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1070 let poolInfo
a1763c54 1071 let poolReady = 0
041dc05b 1072 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1073 ++poolReady
d46660cd
JB
1074 poolInfo = info
1075 })
a1763c54 1076 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1077 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1078 expect(poolReady).toBe(1)
d46660cd 1079 expect(poolInfo).toStrictEqual({
23ccf9d7 1080 version,
d46660cd 1081 type: PoolTypes.dynamic,
a1763c54 1082 worker: WorkerTypes.cluster,
47352846 1083 started: true,
a1763c54 1084 ready: true,
bcfb06ce 1085 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1086 strategyRetries: expect.any(Number),
2431bdb4
JB
1087 minSize: expect.any(Number),
1088 maxSize: expect.any(Number),
1089 workerNodes: expect.any(Number),
1090 idleWorkerNodes: expect.any(Number),
1091 busyWorkerNodes: expect.any(Number),
1092 executedTasks: expect.any(Number),
1093 executingTasks: expect.any(Number),
3a502712 1094 failedTasks: expect.any(Number),
2431bdb4
JB
1095 })
1096 await pool.destroy()
1097 })
1098
a1763c54
JB
1099 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1100 const pool = new FixedThreadPool(
2431bdb4 1101 numberOfWorkers,
b2fd3f4a 1102 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1103 )
c726f66c 1104 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1105 const promises = new Set()
1106 let poolBusy = 0
2431bdb4 1107 let poolInfo
041dc05b 1108 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1109 ++poolBusy
2431bdb4
JB
1110 poolInfo = info
1111 })
c726f66c 1112 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1113 for (let i = 0; i < numberOfWorkers * 2; i++) {
1114 promises.add(pool.execute())
1115 }
1116 await Promise.all(promises)
1117 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1118 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1119 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1120 expect(poolInfo).toStrictEqual({
1121 version,
a1763c54
JB
1122 type: PoolTypes.fixed,
1123 worker: WorkerTypes.thread,
47352846
JB
1124 started: true,
1125 ready: true,
bcfb06ce 1126 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1127 strategyRetries: expect.any(Number),
d46660cd
JB
1128 minSize: expect.any(Number),
1129 maxSize: expect.any(Number),
1130 workerNodes: expect.any(Number),
1131 idleWorkerNodes: expect.any(Number),
1132 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1133 executedTasks: expect.any(Number),
1134 executingTasks: expect.any(Number),
3a502712 1135 failedTasks: expect.any(Number),
d46660cd 1136 })
164d950a
JB
1137 await pool.destroy()
1138 })
1139
a1763c54
JB
1140 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1141 const pool = new DynamicThreadPool(
1142 Math.floor(numberOfWorkers / 2),
7c0ba920 1143 numberOfWorkers,
b2fd3f4a 1144 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1145 )
c726f66c 1146 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1147 const promises = new Set()
a1763c54 1148 let poolFull = 0
d46660cd 1149 let poolInfo
041dc05b 1150 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1151 ++poolFull
d46660cd
JB
1152 poolInfo = info
1153 })
c726f66c 1154 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1155 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1156 promises.add(pool.execute())
7c0ba920 1157 }
cf597bc5 1158 await Promise.all(promises)
33e6bb4c 1159 expect(poolFull).toBe(1)
d46660cd 1160 expect(poolInfo).toStrictEqual({
23ccf9d7 1161 version,
a1763c54 1162 type: PoolTypes.dynamic,
d46660cd 1163 worker: WorkerTypes.thread,
47352846
JB
1164 started: true,
1165 ready: true,
bcfb06ce 1166 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1167 strategyRetries: expect.any(Number),
8735b4e5
JB
1168 minSize: expect.any(Number),
1169 maxSize: expect.any(Number),
1170 workerNodes: expect.any(Number),
1171 idleWorkerNodes: expect.any(Number),
1172 busyWorkerNodes: expect.any(Number),
1173 executedTasks: expect.any(Number),
1174 executingTasks: expect.any(Number),
3a502712 1175 failedTasks: expect.any(Number),
8735b4e5
JB
1176 })
1177 await pool.destroy()
1178 })
1179
3e8611a8 1180 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1181 const pool = new FixedThreadPool(
8735b4e5 1182 numberOfWorkers,
b2fd3f4a 1183 './tests/worker-files/thread/testWorker.mjs',
8735b4e5 1184 {
3a502712 1185 enableTasksQueue: true,
8735b4e5
JB
1186 }
1187 )
a074ffee 1188 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1189 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1190 const promises = new Set()
1191 let poolBackPressure = 0
1192 let poolInfo
041dc05b 1193 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1194 ++poolBackPressure
1195 poolInfo = info
1196 })
c726f66c 1197 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1198 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1199 promises.add(pool.execute())
1200 }
1201 await Promise.all(promises)
033f1776 1202 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1203 expect(poolInfo).toStrictEqual({
1204 version,
3e8611a8 1205 type: PoolTypes.fixed,
8735b4e5 1206 worker: WorkerTypes.thread,
47352846
JB
1207 started: true,
1208 ready: true,
bcfb06ce 1209 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1210 strategyRetries: expect.any(Number),
d46660cd
JB
1211 minSize: expect.any(Number),
1212 maxSize: expect.any(Number),
1213 workerNodes: expect.any(Number),
1214 idleWorkerNodes: expect.any(Number),
5eb72b9e 1215 stealingWorkerNodes: expect.any(Number),
d46660cd 1216 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1217 executedTasks: expect.any(Number),
1218 executingTasks: expect.any(Number),
3e8611a8
JB
1219 maxQueuedTasks: expect.any(Number),
1220 queuedTasks: expect.any(Number),
1221 backPressure: true,
68cbdc84 1222 stolenTasks: expect.any(Number),
3a502712 1223 failedTasks: expect.any(Number),
d46660cd 1224 })
5eb72b9e 1225 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
fd7ebd49 1226 await pool.destroy()
7c0ba920 1227 })
70a4f5ea 1228
85b2561d
JB
1229 it('Verify that destroy() waits for queued tasks to finish', async () => {
1230 const tasksFinishedTimeout = 2500
1231 const pool = new FixedThreadPool(
1232 numberOfWorkers,
1233 './tests/worker-files/thread/asyncWorker.mjs',
1234 {
1235 enableTasksQueue: true,
3a502712 1236 tasksQueueOptions: { tasksFinishedTimeout },
85b2561d
JB
1237 }
1238 )
1239 const maxMultiplier = 4
1240 let tasksFinished = 0
1241 for (const workerNode of pool.workerNodes) {
1242 workerNode.on('taskFinished', () => {
1243 ++tasksFinished
1244 })
1245 }
1246 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1247 pool.execute()
1248 }
1249 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1250 const startTime = performance.now()
1251 await pool.destroy()
1252 const elapsedTime = performance.now() - startTime
90afa746 1253 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
85b2561d 1254 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
14d5e183 1255 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1256 })
1257
1258 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1259 const tasksFinishedTimeout = 1000
1260 const pool = new FixedThreadPool(
1261 numberOfWorkers,
1262 './tests/worker-files/thread/asyncWorker.mjs',
1263 {
1264 enableTasksQueue: true,
3a502712 1265 tasksQueueOptions: { tasksFinishedTimeout },
85b2561d
JB
1266 }
1267 )
1268 const maxMultiplier = 4
1269 let tasksFinished = 0
1270 for (const workerNode of pool.workerNodes) {
1271 workerNode.on('taskFinished', () => {
1272 ++tasksFinished
1273 })
1274 }
1275 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1276 pool.execute()
1277 }
1278 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1279 const startTime = performance.now()
1280 await pool.destroy()
1281 const elapsedTime = performance.now() - startTime
1282 expect(tasksFinished).toBe(0)
2885534c 1283 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1284 })
1285
f18fd12b
JB
1286 it('Verify that pool asynchronous resource track tasks execution', async () => {
1287 let taskAsyncId
1288 let initCalls = 0
1289 let beforeCalls = 0
1290 let afterCalls = 0
1291 let resolveCalls = 0
1292 const hook = createHook({
1293 init (asyncId, type) {
1294 if (type === 'poolifier:task') {
1295 initCalls++
1296 taskAsyncId = asyncId
1297 }
1298 },
1299 before (asyncId) {
1300 if (asyncId === taskAsyncId) beforeCalls++
1301 },
1302 after (asyncId) {
1303 if (asyncId === taskAsyncId) afterCalls++
1304 },
1305 promiseResolve () {
1306 if (executionAsyncId() === taskAsyncId) resolveCalls++
3a502712 1307 },
f18fd12b 1308 })
f18fd12b
JB
1309 const pool = new FixedThreadPool(
1310 numberOfWorkers,
1311 './tests/worker-files/thread/testWorker.mjs'
1312 )
8954c0a3 1313 hook.enable()
f18fd12b
JB
1314 await pool.execute()
1315 hook.disable()
1316 expect(initCalls).toBe(1)
1317 expect(beforeCalls).toBe(1)
1318 expect(afterCalls).toBe(1)
1319 expect(resolveCalls).toBe(1)
8954c0a3 1320 await pool.destroy()
f18fd12b
JB
1321 })
1322
9eae3c69
JB
1323 it('Verify that hasTaskFunction() is working', async () => {
1324 const dynamicThreadPool = new DynamicThreadPool(
1325 Math.floor(numberOfWorkers / 2),
1326 numberOfWorkers,
b2fd3f4a 1327 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1328 )
1329 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1330 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1331 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1332 true
1333 )
1334 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1335 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1337 await dynamicThreadPool.destroy()
1338 const fixedClusterPool = new FixedClusterPool(
1339 numberOfWorkers,
d35e5717 1340 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
9eae3c69
JB
1341 )
1342 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1343 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1344 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1345 true
1346 )
1347 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1348 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1350 await fixedClusterPool.destroy()
1351 })
1352
1353 it('Verify that addTaskFunction() is working', async () => {
1354 const dynamicThreadPool = new DynamicThreadPool(
1355 Math.floor(numberOfWorkers / 2),
1356 numberOfWorkers,
b2fd3f4a 1357 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1358 )
1359 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1360 await expect(
1361 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1362 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1363 await expect(
1364 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1365 ).rejects.toThrow(
3feeab69
JB
1366 new TypeError('name argument must not be an empty string')
1367 )
948faff7 1368 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
31847469 1369 new TypeError('taskFunction property must be a function')
948faff7
JB
1370 )
1371 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
31847469 1372 new TypeError('taskFunction property must be a function')
948faff7 1373 )
19c42d90
JB
1374 await expect(
1375 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1376 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1377 await expect(
1378 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1379 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1380 await expect(
1381 dynamicThreadPool.addTaskFunction('test', {
1382 taskFunction: () => {},
3a502712 1383 priority: -21,
19c42d90
JB
1384 })
1385 ).rejects.toThrow(
1386 new RangeError("Property 'priority' must be between -20 and 19")
1387 )
1388 await expect(
1389 dynamicThreadPool.addTaskFunction('test', {
1390 taskFunction: () => {},
3a502712 1391 priority: 20,
19c42d90
JB
1392 })
1393 ).rejects.toThrow(
1394 new RangeError("Property 'priority' must be between -20 and 19")
1395 )
1396 await expect(
1397 dynamicThreadPool.addTaskFunction('test', {
1398 taskFunction: () => {},
3a502712 1399 strategy: 'invalidStrategy',
19c42d90
JB
1400 })
1401 ).rejects.toThrow(
1402 new Error("Invalid worker choice strategy 'invalidStrategy'")
1403 )
31847469
JB
1404 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1405 { name: DEFAULT_TASK_NAME },
3a502712 1406 { name: 'test' },
9eae3c69 1407 ])
f7a08a34 1408 expect([
3a502712 1409 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
f7a08a34 1410 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
9eae3c69
JB
1411 const echoTaskFunction = data => {
1412 return data
1413 }
1414 await expect(
f7a08a34
JB
1415 dynamicThreadPool.addTaskFunction('echo', {
1416 taskFunction: echoTaskFunction,
3a502712 1417 strategy: WorkerChoiceStrategies.LEAST_ELU,
f7a08a34 1418 })
9eae3c69
JB
1419 ).resolves.toBe(true)
1420 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
31847469 1421 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
f7a08a34 1422 taskFunction: echoTaskFunction,
3a502712 1423 strategy: WorkerChoiceStrategies.LEAST_ELU,
31847469 1424 })
f7a08a34 1425 expect([
3a502712 1426 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
f7a08a34
JB
1427 ]).toStrictEqual([
1428 WorkerChoiceStrategies.ROUND_ROBIN,
3a502712 1429 WorkerChoiceStrategies.LEAST_ELU,
f7a08a34 1430 ])
31847469
JB
1431 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1432 { name: DEFAULT_TASK_NAME },
1433 { name: 'test' },
3a502712 1434 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
9eae3c69
JB
1435 ])
1436 const taskFunctionData = { test: 'test' }
1437 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1438 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1439 for (const workerNode of dynamicThreadPool.workerNodes) {
1440 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1441 tasks: {
1442 executed: expect.any(Number),
1443 executing: 0,
1444 queued: 0,
463226a4 1445 sequentiallyStolen: 0,
5ad42e34 1446 stolen: 0,
3a502712 1447 failed: 0,
adee6053
JB
1448 },
1449 runTime: {
3a502712 1450 history: expect.any(CircularBuffer),
adee6053
JB
1451 },
1452 waitTime: {
3a502712 1453 history: expect.any(CircularBuffer),
adee6053 1454 },
9a55fa8c
JB
1455 elu: expect.objectContaining({
1456 idle: expect.objectContaining({
3a502712 1457 history: expect.any(CircularBuffer),
9a55fa8c
JB
1458 }),
1459 active: expect.objectContaining({
3a502712
JB
1460 history: expect.any(CircularBuffer),
1461 }),
1462 }),
adee6053 1463 })
9a55fa8c
JB
1464 expect(
1465 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1466 ).toBeGreaterThan(0)
1467 if (
1468 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1469 null
1470 ) {
1471 expect(
1472 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1473 ).toBeUndefined()
1474 } else {
1475 expect(
1476 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1477 ).toBeGreaterThan(0)
1478 }
1479 if (
1480 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1481 ) {
1482 expect(
1483 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1484 ).toBeUndefined()
1485 } else {
1486 expect(
1487 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1488 ).toBeGreaterThanOrEqual(0)
1489 }
1490 if (
1491 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1492 ) {
1493 expect(
1494 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1495 ).toBeUndefined()
1496 } else {
1497 expect(
1498 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1499 ).toBeGreaterThanOrEqual(0)
1500 expect(
1501 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1502 ).toBeLessThanOrEqual(1)
1503 }
adee6053 1504 }
9eae3c69
JB
1505 await dynamicThreadPool.destroy()
1506 })
1507
1508 it('Verify that removeTaskFunction() is working', async () => {
1509 const dynamicThreadPool = new DynamicThreadPool(
1510 Math.floor(numberOfWorkers / 2),
1511 numberOfWorkers,
b2fd3f4a 1512 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1513 )
1514 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
31847469
JB
1515 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1516 { name: DEFAULT_TASK_NAME },
3a502712 1517 { name: 'test' },
9eae3c69 1518 ])
948faff7 1519 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1520 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1521 )
1522 const echoTaskFunction = data => {
1523 return data
1524 }
f7a08a34
JB
1525 await dynamicThreadPool.addTaskFunction('echo', {
1526 taskFunction: echoTaskFunction,
3a502712 1527 strategy: WorkerChoiceStrategies.LEAST_ELU,
f7a08a34 1528 })
9eae3c69 1529 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
31847469 1530 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
f7a08a34 1531 taskFunction: echoTaskFunction,
3a502712 1532 strategy: WorkerChoiceStrategies.LEAST_ELU,
31847469 1533 })
f7a08a34 1534 expect([
3a502712 1535 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
f7a08a34
JB
1536 ]).toStrictEqual([
1537 WorkerChoiceStrategies.ROUND_ROBIN,
3a502712 1538 WorkerChoiceStrategies.LEAST_ELU,
f7a08a34 1539 ])
31847469
JB
1540 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1541 { name: DEFAULT_TASK_NAME },
1542 { name: 'test' },
3a502712 1543 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
9eae3c69
JB
1544 ])
1545 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1546 true
1547 )
1548 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1549 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
f7a08a34 1550 expect([
3a502712 1551 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
f7a08a34 1552 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
31847469
JB
1553 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1554 { name: DEFAULT_TASK_NAME },
3a502712 1555 { name: 'test' },
9eae3c69
JB
1556 ])
1557 await dynamicThreadPool.destroy()
1558 })
1559
f7a08a34 1560 it('Verify that listTaskFunctionsProperties() is working', async () => {
90d7d101
JB
1561 const dynamicThreadPool = new DynamicThreadPool(
1562 Math.floor(numberOfWorkers / 2),
1563 numberOfWorkers,
b2fd3f4a 1564 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1565 )
1566 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
31847469
JB
1567 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1568 { name: DEFAULT_TASK_NAME },
1569 { name: 'jsonIntegerSerialization' },
1570 { name: 'factorial' },
3a502712 1571 { name: 'fibonacci' },
90d7d101 1572 ])
9eae3c69 1573 await dynamicThreadPool.destroy()
90d7d101
JB
1574 const fixedClusterPool = new FixedClusterPool(
1575 numberOfWorkers,
d35e5717 1576 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
90d7d101
JB
1577 )
1578 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
31847469
JB
1579 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1580 { name: DEFAULT_TASK_NAME },
1581 { name: 'jsonIntegerSerialization' },
1582 { name: 'factorial' },
3a502712 1583 { name: 'fibonacci' },
90d7d101 1584 ])
0fe39c97 1585 await fixedClusterPool.destroy()
90d7d101
JB
1586 })
1587
9eae3c69 1588 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1589 const dynamicThreadPool = new DynamicThreadPool(
1590 Math.floor(numberOfWorkers / 2),
1591 numberOfWorkers,
b2fd3f4a 1592 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1593 )
1594 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1595 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1596 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1597 new Error(
711623b8 1598 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1599 )
1600 )
1601 await expect(
1602 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1603 ).rejects.toThrow(
b0b55f57 1604 new Error(
711623b8 1605 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1606 )
1607 )
1608 await expect(
1609 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1610 ).rejects.toThrow(
b0b55f57 1611 new Error(
711623b8 1612 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1613 )
1614 )
31847469
JB
1615 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1616 { name: DEFAULT_TASK_NAME },
1617 { name: 'jsonIntegerSerialization' },
1618 { name: 'factorial' },
3a502712 1619 { name: 'fibonacci' },
9eae3c69
JB
1620 ])
1621 await expect(
1622 dynamicThreadPool.setDefaultTaskFunction('factorial')
1623 ).resolves.toBe(true)
31847469
JB
1624 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1625 { name: DEFAULT_TASK_NAME },
1626 { name: 'factorial' },
1627 { name: 'jsonIntegerSerialization' },
3a502712 1628 { name: 'fibonacci' },
9eae3c69
JB
1629 ])
1630 await expect(
1631 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1632 ).resolves.toBe(true)
31847469
JB
1633 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1634 { name: DEFAULT_TASK_NAME },
1635 { name: 'fibonacci' },
1636 { name: 'jsonIntegerSerialization' },
3a502712 1637 { name: 'factorial' },
9eae3c69 1638 ])
cda9ba34 1639 await dynamicThreadPool.destroy()
30500265
JB
1640 })
1641
90d7d101 1642 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1643 const pool = new DynamicClusterPool(
2431bdb4 1644 Math.floor(numberOfWorkers / 2),
70a4f5ea 1645 numberOfWorkers,
d35e5717 1646 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
70a4f5ea
JB
1647 )
1648 const data = { n: 10 }
82888165 1649 const result0 = await pool.execute(data)
30b963d4 1650 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1651 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1652 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1653 const result2 = await pool.execute(data, 'factorial')
1654 expect(result2).toBe(3628800)
1655 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1656 expect(result3).toBe(55)
5bb5be17
JB
1657 expect(pool.info.executingTasks).toBe(0)
1658 expect(pool.info.executedTasks).toBe(4)
b414b84c 1659 for (const workerNode of pool.workerNodes) {
31847469
JB
1660 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1661 { name: DEFAULT_TASK_NAME },
1662 { name: 'jsonIntegerSerialization' },
1663 { name: 'factorial' },
3a502712 1664 { name: 'fibonacci' },
b414b84c
JB
1665 ])
1666 expect(workerNode.taskFunctionsUsage.size).toBe(3)
74111331 1667 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
fcfc3353 1668 expect(workerNode.tasksQueue.enablePriority).toBe(false)
31847469
JB
1669 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1670 expect(
1671 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1672 ).toStrictEqual({
5bb5be17
JB
1673 tasks: {
1674 executed: expect.any(Number),
4ba4c7f9 1675 executing: 0,
5bb5be17 1676 failed: 0,
68cbdc84 1677 queued: 0,
463226a4 1678 sequentiallyStolen: 0,
3a502712 1679 stolen: 0,
5bb5be17
JB
1680 },
1681 runTime: {
3a502712 1682 history: expect.any(CircularBuffer),
5bb5be17
JB
1683 },
1684 waitTime: {
3a502712 1685 history: expect.any(CircularBuffer),
5bb5be17
JB
1686 },
1687 elu: {
1688 idle: {
3a502712 1689 history: expect.any(CircularBuffer),
5bb5be17
JB
1690 },
1691 active: {
3a502712
JB
1692 history: expect.any(CircularBuffer),
1693 },
1694 },
5bb5be17
JB
1695 })
1696 expect(
31847469
JB
1697 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1698 .tasks.executed
4ba4c7f9 1699 ).toBeGreaterThan(0)
5bb5be17 1700 }
dfd7ec01
JB
1701 expect(
1702 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
8139fbbc
JB
1703 ).toStrictEqual(
1704 workerNode.getTaskFunctionWorkerUsage(
1705 workerNode.info.taskFunctionsProperties[1].name
1706 )
1707 )
1708 }
1709 await pool.destroy()
1710 })
1711
a390c10d 1712 it('Verify that mapExecute() is working', async () => {
2155d8bb
JB
1713 const pool = new DynamicThreadPool(
1714 Math.floor(numberOfWorkers / 2),
1715 numberOfWorkers,
1716 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1717 )
fe6df285
JB
1718 expect(() => pool.mapExecute()).toThrow(
1719 new TypeError('data argument must be a defined iterable')
1720 )
1721 expect(() => pool.mapExecute(0)).toThrow(
1722 new TypeError('data argument must be an iterable')
1723 )
1789b1eb 1724 let results = await pool.mapExecute([{}, {}, {}, {}])
fe6df285 1725 expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }])
1789b1eb
JB
1726 expect(pool.info.executingTasks).toBe(0)
1727 expect(pool.info.executedTasks).toBe(4)
fe6df285
JB
1728 results = await pool.mapExecute(
1729 [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }],
1730 'factorial'
1731 )
2155d8bb 1732 expect(results).toStrictEqual([
fe6df285 1733 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
2155d8bb
JB
1734 ])
1735 expect(pool.info.executingTasks).toBe(0)
1789b1eb 1736 expect(pool.info.executedTasks).toBe(8)
fe6df285
JB
1737 results = await pool.mapExecute(
1738 new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
1739 'factorial'
1740 )
27469db4 1741 expect(results).toStrictEqual([
fe6df285 1742 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
27469db4
JB
1743 ])
1744 expect(pool.info.executingTasks).toBe(0)
1745 expect(pool.info.executedTasks).toBe(12)
2155d8bb
JB
1746 await pool.destroy()
1747 })
1748
8139fbbc
JB
1749 it('Verify that task function objects worker is working', async () => {
1750 const pool = new DynamicThreadPool(
1751 Math.floor(numberOfWorkers / 2),
1752 numberOfWorkers,
1753 './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
1754 )
1755 const data = { n: 10 }
1756 const result0 = await pool.execute(data)
1757 expect(result0).toStrictEqual({ ok: 1 })
1758 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1759 expect(result1).toStrictEqual({ ok: 1 })
1760 const result2 = await pool.execute(data, 'factorial')
1761 expect(result2).toBe(3628800)
1762 const result3 = await pool.execute(data, 'fibonacci')
1763 expect(result3).toBe(55)
1764 expect(pool.info.executingTasks).toBe(0)
1765 expect(pool.info.executedTasks).toBe(4)
1766 for (const workerNode of pool.workerNodes) {
1767 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1768 { name: DEFAULT_TASK_NAME },
1769 { name: 'jsonIntegerSerialization' },
1770 { name: 'factorial' },
3a502712 1771 { name: 'fibonacci', priority: -5 },
8139fbbc
JB
1772 ])
1773 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1774 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
fcfc3353 1775 expect(workerNode.tasksQueue.enablePriority).toBe(true)
8139fbbc
JB
1776 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1777 expect(
1778 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1779 ).toStrictEqual({
1780 tasks: {
1781 executed: expect.any(Number),
1782 executing: 0,
1783 failed: 0,
1784 queued: 0,
1785 sequentiallyStolen: 0,
3a502712 1786 stolen: 0,
8139fbbc
JB
1787 },
1788 runTime: {
3a502712 1789 history: expect.any(CircularBuffer),
8139fbbc
JB
1790 },
1791 waitTime: {
3a502712 1792 history: expect.any(CircularBuffer),
8139fbbc
JB
1793 },
1794 elu: {
1795 idle: {
3a502712 1796 history: expect.any(CircularBuffer),
8139fbbc
JB
1797 },
1798 active: {
3a502712
JB
1799 history: expect.any(CircularBuffer),
1800 },
1801 },
8139fbbc
JB
1802 })
1803 expect(
1804 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1805 .tasks.executed
1806 ).toBeGreaterThan(0)
1807 }
1808 expect(
1809 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
dfd7ec01 1810 ).toStrictEqual(
66979634 1811 workerNode.getTaskFunctionWorkerUsage(
31847469 1812 workerNode.info.taskFunctionsProperties[1].name
66979634 1813 )
dfd7ec01 1814 )
5bb5be17 1815 }
0fe39c97 1816 await pool.destroy()
70a4f5ea 1817 })
52a23942
JB
1818
1819 it('Verify sendKillMessageToWorker()', async () => {
1820 const pool = new DynamicClusterPool(
1821 Math.floor(numberOfWorkers / 2),
1822 numberOfWorkers,
d35e5717 1823 './tests/worker-files/cluster/testWorker.cjs'
52a23942
JB
1824 )
1825 const workerNodeKey = 0
1826 await expect(
adee6053 1827 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1828 ).resolves.toBeUndefined()
1829 await pool.destroy()
1830 })
adee6053
JB
1831
1832 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1833 const pool = new DynamicClusterPool(
1834 Math.floor(numberOfWorkers / 2),
1835 numberOfWorkers,
d35e5717 1836 './tests/worker-files/cluster/testWorker.cjs'
adee6053
JB
1837 )
1838 const workerNodeKey = 0
1839 await expect(
1840 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1841 taskFunctionOperation: 'add',
31847469 1842 taskFunctionProperties: { name: 'empty' },
3a502712 1843 taskFunction: (() => {}).toString(),
adee6053
JB
1844 })
1845 ).resolves.toBe(true)
1846 expect(
31847469
JB
1847 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1848 ).toStrictEqual([
1849 { name: DEFAULT_TASK_NAME },
1850 { name: 'test' },
3a502712 1851 { name: 'empty' },
31847469 1852 ])
adee6053
JB
1853 await pool.destroy()
1854 })
1855
1856 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1857 const pool = new DynamicClusterPool(
1858 Math.floor(numberOfWorkers / 2),
1859 numberOfWorkers,
d35e5717 1860 './tests/worker-files/cluster/testWorker.cjs'
adee6053 1861 )
adee6053
JB
1862 await expect(
1863 pool.sendTaskFunctionOperationToWorkers({
1864 taskFunctionOperation: 'add',
31847469 1865 taskFunctionProperties: { name: 'empty' },
3a502712 1866 taskFunction: (() => {}).toString(),
adee6053
JB
1867 })
1868 ).resolves.toBe(true)
1869 for (const workerNode of pool.workerNodes) {
31847469
JB
1870 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1871 { name: DEFAULT_TASK_NAME },
1872 { name: 'test' },
3a502712 1873 { name: 'empty' },
adee6053
JB
1874 ])
1875 }
1876 await pool.destroy()
1877 })
3ec964d6 1878})