Merge dependabot/npm_and_yarn/examples/typescript/websocket-server-pool/ws-worker_thr...
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
d35e5717
JB
17} from '../../lib/index.cjs'
18import { CircularArray } from '../../lib/circular-array.cjs'
19import { Deque } from '../../lib/deque.cjs'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21import { waitPoolEvents } from '../test-utils.cjs'
22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
d35e5717 108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
d35e5717 149 './tests/worker-files/cluster/testWorker.cjs'
a5ed75b7 150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
d35e5717 173 './tests/worker-files/cluster/testWorker.cjs'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
d35e5717 209 './tests/worker-files/cluster/testWorker.cjs'
213cbac6 210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
e44639e9 224 expect(pool.emitter.eventNames()).toStrictEqual([])
47352846
JB
225 expect(pool.opts).toStrictEqual({
226 startWorkers: true,
227 enableEvents: true,
228 restartWorkerOnError: true,
229 enableTasksQueue: false,
26ce26ca 230 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d 231 })
999ef664
JB
232 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
233 .workerChoiceStrategies) {
86cbb766 234 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 241 })
86cbb766 242 })
999ef664 243 }
fd7ebd49 244 await pool.destroy()
73bfd59d 245 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
246 pool = new FixedThreadPool(
247 numberOfWorkers,
b2fd3f4a 248 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 249 {
e4543b14 250 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 251 workerChoiceStrategyOptions: {
932fc8be 252 runTime: { median: true },
fc027381 253 weights: { 0: 300, 1: 200 }
49be33fe 254 },
35cf1c03 255 enableEvents: false,
1f68cede 256 restartWorkerOnError: false,
ff733df7 257 enableTasksQueue: true,
d4aeae5a 258 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
259 messageHandler: testHandler,
260 errorHandler: testHandler,
261 onlineHandler: testHandler,
262 exitHandler: testHandler
7c0ba920
JB
263 }
264 )
7c0ba920 265 expect(pool.emitter).toBeUndefined()
47352846
JB
266 expect(pool.opts).toStrictEqual({
267 startWorkers: true,
268 enableEvents: false,
269 restartWorkerOnError: false,
270 enableTasksQueue: true,
271 tasksQueueOptions: {
272 concurrency: 2,
2324f8c9 273 size: Math.pow(numberOfWorkers, 2),
dbd73092 274 taskStealing: true,
32b141fd 275 tasksStealingOnBackPressure: true,
568d0075 276 tasksFinishedTimeout: 2000
47352846
JB
277 },
278 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
279 workerChoiceStrategyOptions: {
47352846 280 runTime: { median: true },
47352846
JB
281 weights: { 0: 300, 1: 200 }
282 },
283 onlineHandler: testHandler,
284 messageHandler: testHandler,
285 errorHandler: testHandler,
286 exitHandler: testHandler
8990357d 287 })
999ef664
JB
288 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
289 .workerChoiceStrategies) {
290 expect(workerChoiceStrategy.opts).toStrictEqual({
999ef664
JB
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
295 })
296 }
fd7ebd49 297 await pool.destroy()
7c0ba920
JB
298 })
299
fe291b64 300 it('Verify that pool options are validated', () => {
d4aeae5a
JB
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
b2fd3f4a 305 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 306 {
f0d7f803 307 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
308 }
309 )
948faff7 310 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
311 expect(
312 () =>
313 new FixedThreadPool(
314 numberOfWorkers,
b2fd3f4a 315 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
316 {
317 workerChoiceStrategyOptions: { weights: {} }
318 }
319 )
948faff7 320 ).toThrow(
8735b4e5
JB
321 new Error(
322 'Invalid worker choice strategy options: must have a weight for each worker node'
323 )
49be33fe 324 )
f0d7f803
JB
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
b2fd3f4a 329 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
330 {
331 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
332 }
333 )
948faff7 334 ).toThrow(
8735b4e5
JB
335 new Error(
336 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
337 )
f0d7f803 338 )
5c4c2dee
JB
339 expect(
340 () =>
341 new FixedThreadPool(
342 numberOfWorkers,
b2fd3f4a 343 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
344 {
345 enableTasksQueue: true,
346 tasksQueueOptions: 'invalidTasksQueueOptions'
347 }
348 )
948faff7 349 ).toThrow(
5c4c2dee
JB
350 new TypeError('Invalid tasks queue options: must be a plain object')
351 )
f0d7f803
JB
352 expect(
353 () =>
354 new FixedThreadPool(
355 numberOfWorkers,
b2fd3f4a 356 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
357 {
358 enableTasksQueue: true,
359 tasksQueueOptions: { concurrency: 0 }
360 }
361 )
948faff7 362 ).toThrow(
e695d66f 363 new RangeError(
20c6f652 364 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
365 )
366 )
f0d7f803
JB
367 expect(
368 () =>
369 new FixedThreadPool(
370 numberOfWorkers,
b2fd3f4a 371 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
372 {
373 enableTasksQueue: true,
5c4c2dee 374 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
375 }
376 )
948faff7 377 ).toThrow(
5c4c2dee
JB
378 new RangeError(
379 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
380 )
8735b4e5 381 )
f0d7f803
JB
382 expect(
383 () =>
384 new FixedThreadPool(
385 numberOfWorkers,
b2fd3f4a 386 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
387 {
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0.2 }
390 }
391 )
948faff7 392 ).toThrow(
20c6f652 393 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 394 )
5c4c2dee
JB
395 expect(
396 () =>
397 new FixedThreadPool(
398 numberOfWorkers,
b2fd3f4a 399 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
400 {
401 enableTasksQueue: true,
402 tasksQueueOptions: { size: 0 }
403 }
404 )
948faff7 405 ).toThrow(
5c4c2dee
JB
406 new RangeError(
407 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
408 )
409 )
410 expect(
411 () =>
412 new FixedThreadPool(
413 numberOfWorkers,
b2fd3f4a 414 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
415 {
416 enableTasksQueue: true,
417 tasksQueueOptions: { size: -1 }
418 }
419 )
948faff7 420 ).toThrow(
5c4c2dee
JB
421 new RangeError(
422 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
423 )
424 )
425 expect(
426 () =>
427 new FixedThreadPool(
428 numberOfWorkers,
b2fd3f4a 429 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
430 {
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0.2 }
433 }
434 )
948faff7 435 ).toThrow(
5c4c2dee
JB
436 new TypeError('Invalid worker node tasks queue size: must be an integer')
437 )
d4aeae5a
JB
438 })
439
2431bdb4 440 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
441 const pool = new FixedThreadPool(
442 numberOfWorkers,
b2fd3f4a 443 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
444 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
445 )
26ce26ca 446 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
a20f0ba5
JB
447 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
448 .workerChoiceStrategies) {
86cbb766 449 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
450 runTime: { median: false },
451 waitTime: { median: false },
452 elu: { median: false },
453 weights: expect.objectContaining({
454 0: expect.any(Number),
455 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 456 })
86cbb766 457 })
a20f0ba5 458 }
87de9ff5
JB
459 expect(
460 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
461 ).toStrictEqual({
932fc8be
JB
462 runTime: {
463 aggregate: true,
464 average: true,
465 median: false
466 },
467 waitTime: {
468 aggregate: false,
469 average: false,
470 median: false
471 },
5df69fab 472 elu: {
9adcefab
JB
473 aggregate: true,
474 average: true,
5df69fab
JB
475 median: false
476 }
86bf340d 477 })
9adcefab
JB
478 pool.setWorkerChoiceStrategyOptions({
479 runTime: { median: true },
480 elu: { median: true }
481 })
a20f0ba5 482 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 483 runTime: { median: true },
8990357d
JB
484 elu: { median: true }
485 })
a20f0ba5
JB
486 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
487 .workerChoiceStrategies) {
86cbb766 488 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
489 runTime: { median: true },
490 waitTime: { median: false },
491 elu: { median: true },
492 weights: expect.objectContaining({
493 0: expect.any(Number),
494 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 495 })
86cbb766 496 })
a20f0ba5 497 }
87de9ff5
JB
498 expect(
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
500 ).toStrictEqual({
932fc8be
JB
501 runTime: {
502 aggregate: true,
503 average: false,
504 median: true
505 },
506 waitTime: {
507 aggregate: false,
508 average: false,
509 median: false
510 },
5df69fab 511 elu: {
9adcefab 512 aggregate: true,
5df69fab 513 average: false,
9adcefab 514 median: true
5df69fab 515 }
86bf340d 516 })
9adcefab
JB
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: false },
519 elu: { median: false }
520 })
a20f0ba5 521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 522 runTime: { median: false },
8990357d
JB
523 elu: { median: false }
524 })
a20f0ba5
JB
525 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
526 .workerChoiceStrategies) {
86cbb766 527 expect(workerChoiceStrategy.opts).toStrictEqual({
86cbb766
JB
528 runTime: { median: false },
529 waitTime: { median: false },
530 elu: { median: false },
531 weights: expect.objectContaining({
532 0: expect.any(Number),
533 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 534 })
86cbb766 535 })
a20f0ba5 536 }
87de9ff5
JB
537 expect(
538 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
539 ).toStrictEqual({
932fc8be
JB
540 runTime: {
541 aggregate: true,
542 average: true,
543 median: false
544 },
545 waitTime: {
546 aggregate: false,
547 average: false,
548 median: false
549 },
5df69fab 550 elu: {
9adcefab
JB
551 aggregate: true,
552 average: true,
5df69fab
JB
553 median: false
554 }
86bf340d 555 })
1f95d544
JB
556 expect(() =>
557 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 558 ).toThrow(
8735b4e5
JB
559 new TypeError(
560 'Invalid worker choice strategy options: must be a plain object'
561 )
1f95d544 562 )
948faff7 563 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
564 new Error(
565 'Invalid worker choice strategy options: must have a weight for each worker node'
566 )
1f95d544
JB
567 )
568 expect(() =>
569 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 570 ).toThrow(
8735b4e5
JB
571 new Error(
572 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
573 )
1f95d544 574 )
a20f0ba5
JB
575 await pool.destroy()
576 })
577
2431bdb4 578 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
579 const pool = new FixedThreadPool(
580 numberOfWorkers,
b2fd3f4a 581 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
582 )
583 expect(pool.opts.enableTasksQueue).toBe(false)
584 expect(pool.opts.tasksQueueOptions).toBeUndefined()
585 pool.enableTasksQueue(true)
586 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
587 expect(pool.opts.tasksQueueOptions).toStrictEqual({
588 concurrency: 1,
2324f8c9 589 size: Math.pow(numberOfWorkers, 2),
dbd73092 590 taskStealing: true,
32b141fd 591 tasksStealingOnBackPressure: true,
568d0075 592 tasksFinishedTimeout: 2000
20c6f652 593 })
a20f0ba5
JB
594 pool.enableTasksQueue(true, { concurrency: 2 })
595 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
596 expect(pool.opts.tasksQueueOptions).toStrictEqual({
597 concurrency: 2,
2324f8c9 598 size: Math.pow(numberOfWorkers, 2),
dbd73092 599 taskStealing: true,
32b141fd 600 tasksStealingOnBackPressure: true,
568d0075 601 tasksFinishedTimeout: 2000
20c6f652 602 })
a20f0ba5
JB
603 pool.enableTasksQueue(false)
604 expect(pool.opts.enableTasksQueue).toBe(false)
605 expect(pool.opts.tasksQueueOptions).toBeUndefined()
606 await pool.destroy()
607 })
608
2431bdb4 609 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
610 const pool = new FixedThreadPool(
611 numberOfWorkers,
b2fd3f4a 612 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
613 { enableTasksQueue: true }
614 )
20c6f652
JB
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
616 concurrency: 1,
2324f8c9 617 size: Math.pow(numberOfWorkers, 2),
dbd73092 618 taskStealing: true,
32b141fd 619 tasksStealingOnBackPressure: true,
568d0075 620 tasksFinishedTimeout: 2000
20c6f652 621 })
d6ca1416 622 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
623 expect(workerNode.tasksQueueBackPressureSize).toBe(
624 pool.opts.tasksQueueOptions.size
625 )
d6ca1416
JB
626 }
627 pool.setTasksQueueOptions({
628 concurrency: 2,
2324f8c9 629 size: 2,
d6ca1416 630 taskStealing: false,
32b141fd 631 tasksStealingOnBackPressure: false,
568d0075 632 tasksFinishedTimeout: 3000
d6ca1416 633 })
20c6f652
JB
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
635 concurrency: 2,
2324f8c9 636 size: 2,
d6ca1416 637 taskStealing: false,
32b141fd 638 tasksStealingOnBackPressure: false,
568d0075 639 tasksFinishedTimeout: 3000
d6ca1416
JB
640 })
641 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
644 )
d6ca1416
JB
645 }
646 pool.setTasksQueueOptions({
647 concurrency: 1,
648 taskStealing: true,
649 tasksStealingOnBackPressure: true
650 })
651 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 concurrency: 1,
2324f8c9 653 size: Math.pow(numberOfWorkers, 2),
dbd73092 654 taskStealing: true,
32b141fd 655 tasksStealingOnBackPressure: true,
568d0075 656 tasksFinishedTimeout: 2000
20c6f652 657 })
d6ca1416 658 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
659 expect(workerNode.tasksQueueBackPressureSize).toBe(
660 pool.opts.tasksQueueOptions.size
661 )
d6ca1416 662 }
948faff7 663 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
664 new TypeError('Invalid tasks queue options: must be a plain object')
665 )
948faff7 666 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 667 new RangeError(
20c6f652 668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
669 )
670 )
948faff7 671 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 672 new RangeError(
20c6f652 673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 674 )
a20f0ba5 675 )
948faff7 676 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
678 )
948faff7 679 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 680 new RangeError(
68dbcdc0 681 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
682 )
683 )
948faff7 684 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 685 new RangeError(
68dbcdc0 686 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
687 )
688 )
948faff7 689 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 690 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 691 )
a20f0ba5
JB
692 await pool.destroy()
693 })
694
6b27d407
JB
695 it('Verify that pool info is set', async () => {
696 let pool = new FixedThreadPool(
697 numberOfWorkers,
b2fd3f4a 698 './tests/worker-files/thread/testWorker.mjs'
6b27d407 699 )
2dca6cab
JB
700 expect(pool.info).toStrictEqual({
701 version,
702 type: PoolTypes.fixed,
703 worker: WorkerTypes.thread,
47352846 704 started: true,
2dca6cab
JB
705 ready: true,
706 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 707 strategyRetries: 0,
2dca6cab
JB
708 minSize: numberOfWorkers,
709 maxSize: numberOfWorkers,
710 workerNodes: numberOfWorkers,
711 idleWorkerNodes: numberOfWorkers,
712 busyWorkerNodes: 0,
713 executedTasks: 0,
714 executingTasks: 0,
2dca6cab
JB
715 failedTasks: 0
716 })
6b27d407
JB
717 await pool.destroy()
718 pool = new DynamicClusterPool(
2431bdb4 719 Math.floor(numberOfWorkers / 2),
6b27d407 720 numberOfWorkers,
d35e5717 721 './tests/worker-files/cluster/testWorker.cjs'
6b27d407 722 )
2dca6cab
JB
723 expect(pool.info).toStrictEqual({
724 version,
725 type: PoolTypes.dynamic,
726 worker: WorkerTypes.cluster,
47352846 727 started: true,
2dca6cab
JB
728 ready: true,
729 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 730 strategyRetries: 0,
2dca6cab
JB
731 minSize: Math.floor(numberOfWorkers / 2),
732 maxSize: numberOfWorkers,
733 workerNodes: Math.floor(numberOfWorkers / 2),
734 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
735 busyWorkerNodes: 0,
736 executedTasks: 0,
737 executingTasks: 0,
2dca6cab
JB
738 failedTasks: 0
739 })
6b27d407
JB
740 await pool.destroy()
741 })
742
2431bdb4 743 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
744 const pool = new FixedClusterPool(
745 numberOfWorkers,
d35e5717 746 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 747 )
f06e48d8 748 for (const workerNode of pool.workerNodes) {
47352846 749 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 750 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
751 tasks: {
752 executed: 0,
753 executing: 0,
754 queued: 0,
df593701 755 maxQueued: 0,
463226a4 756 sequentiallyStolen: 0,
68cbdc84 757 stolen: 0,
a4e07f72
JB
758 failed: 0
759 },
760 runTime: {
4ba4c7f9 761 history: new CircularArray()
a4e07f72
JB
762 },
763 waitTime: {
4ba4c7f9 764 history: new CircularArray()
a4e07f72 765 },
5df69fab
JB
766 elu: {
767 idle: {
4ba4c7f9 768 history: new CircularArray()
5df69fab
JB
769 },
770 active: {
4ba4c7f9 771 history: new CircularArray()
f7510105 772 }
5df69fab 773 }
86bf340d 774 })
f06e48d8
JB
775 }
776 await pool.destroy()
777 })
778
2431bdb4
JB
779 it('Verify that pool worker tasks queue are initialized', async () => {
780 let pool = new FixedClusterPool(
f06e48d8 781 numberOfWorkers,
d35e5717 782 './tests/worker-files/cluster/testWorker.cjs'
f06e48d8
JB
783 )
784 for (const workerNode of pool.workerNodes) {
47352846 785 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 786 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 787 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 788 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 789 }
fd7ebd49 790 await pool.destroy()
2431bdb4
JB
791 pool = new DynamicThreadPool(
792 Math.floor(numberOfWorkers / 2),
793 numberOfWorkers,
b2fd3f4a 794 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
795 )
796 for (const workerNode of pool.workerNodes) {
47352846 797 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 798 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
799 expect(workerNode.tasksQueue.size).toBe(0)
800 expect(workerNode.tasksQueue.maxSize).toBe(0)
801 }
213cbac6 802 await pool.destroy()
2431bdb4
JB
803 })
804
805 it('Verify that pool worker info are initialized', async () => {
806 let pool = new FixedClusterPool(
807 numberOfWorkers,
d35e5717 808 './tests/worker-files/cluster/testWorker.cjs'
2431bdb4 809 )
2dca6cab 810 for (const workerNode of pool.workerNodes) {
47352846 811 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
812 expect(workerNode.info).toStrictEqual({
813 id: expect.any(Number),
814 type: WorkerTypes.cluster,
815 dynamic: false,
5eb72b9e
JB
816 ready: true,
817 stealing: false
2dca6cab
JB
818 })
819 }
2431bdb4
JB
820 await pool.destroy()
821 pool = new DynamicThreadPool(
822 Math.floor(numberOfWorkers / 2),
823 numberOfWorkers,
b2fd3f4a 824 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 825 )
2dca6cab 826 for (const workerNode of pool.workerNodes) {
47352846 827 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
828 expect(workerNode.info).toStrictEqual({
829 id: expect.any(Number),
830 type: WorkerTypes.thread,
831 dynamic: false,
5eb72b9e
JB
832 ready: true,
833 stealing: false
2dca6cab
JB
834 })
835 }
213cbac6 836 await pool.destroy()
bf9549ae
JB
837 })
838
711623b8
JB
839 it('Verify that pool statuses are checked at start or destroy', async () => {
840 const pool = new FixedThreadPool(
841 numberOfWorkers,
842 './tests/worker-files/thread/testWorker.mjs'
843 )
844 expect(pool.info.started).toBe(true)
845 expect(pool.info.ready).toBe(true)
846 expect(() => pool.start()).toThrow(
847 new Error('Cannot start an already started pool')
848 )
849 await pool.destroy()
850 expect(pool.info.started).toBe(false)
851 expect(pool.info.ready).toBe(false)
852 await expect(pool.destroy()).rejects.toThrow(
853 new Error('Cannot destroy an already destroyed pool')
854 )
855 })
856
47352846
JB
857 it('Verify that pool can be started after initialization', async () => {
858 const pool = new FixedClusterPool(
859 numberOfWorkers,
d35e5717 860 './tests/worker-files/cluster/testWorker.cjs',
47352846
JB
861 {
862 startWorkers: false
863 }
864 )
865 expect(pool.info.started).toBe(false)
866 expect(pool.info.ready).toBe(false)
867 expect(pool.workerNodes).toStrictEqual([])
8e8d9101 868 expect(pool.readyEventEmitted).toBe(false)
948faff7 869 await expect(pool.execute()).rejects.toThrow(
47352846
JB
870 new Error('Cannot execute a task on not started pool')
871 )
872 pool.start()
873 expect(pool.info.started).toBe(true)
874 expect(pool.info.ready).toBe(true)
55082af9
JB
875 await waitPoolEvents(pool, PoolEvents.ready, 1)
876 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
877 expect(pool.workerNodes.length).toBe(numberOfWorkers)
878 for (const workerNode of pool.workerNodes) {
879 expect(workerNode).toBeInstanceOf(WorkerNode)
880 }
881 await pool.destroy()
882 })
883
9d2d0da1
JB
884 it('Verify that pool execute() arguments are checked', async () => {
885 const pool = new FixedClusterPool(
886 numberOfWorkers,
d35e5717 887 './tests/worker-files/cluster/testWorker.cjs'
9d2d0da1 888 )
948faff7 889 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
890 new TypeError('name argument must be a string')
891 )
948faff7 892 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
893 new TypeError('name argument must not be an empty string')
894 )
948faff7 895 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
896 new TypeError('transferList argument must be an array')
897 )
898 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
899 "Task function 'unknown' not found"
900 )
901 await pool.destroy()
948faff7 902 await expect(pool.execute()).rejects.toThrow(
47352846 903 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
904 )
905 })
906
2431bdb4 907 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
908 const pool = new FixedClusterPool(
909 numberOfWorkers,
d35e5717 910 './tests/worker-files/cluster/testWorker.cjs'
bf9549ae 911 )
09c2d0d3 912 const promises = new Set()
fc027381
JB
913 const maxMultiplier = 2
914 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 915 promises.add(pool.execute())
bf9549ae 916 }
f06e48d8 917 for (const workerNode of pool.workerNodes) {
465b2940 918 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
919 tasks: {
920 executed: 0,
921 executing: maxMultiplier,
922 queued: 0,
df593701 923 maxQueued: 0,
463226a4 924 sequentiallyStolen: 0,
68cbdc84 925 stolen: 0,
a4e07f72
JB
926 failed: 0
927 },
928 runTime: {
a4e07f72
JB
929 history: expect.any(CircularArray)
930 },
931 waitTime: {
a4e07f72
JB
932 history: expect.any(CircularArray)
933 },
5df69fab
JB
934 elu: {
935 idle: {
5df69fab
JB
936 history: expect.any(CircularArray)
937 },
938 active: {
5df69fab 939 history: expect.any(CircularArray)
f7510105 940 }
5df69fab 941 }
86bf340d 942 })
bf9549ae
JB
943 }
944 await Promise.all(promises)
f06e48d8 945 for (const workerNode of pool.workerNodes) {
465b2940 946 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
947 tasks: {
948 executed: maxMultiplier,
949 executing: 0,
950 queued: 0,
df593701 951 maxQueued: 0,
463226a4 952 sequentiallyStolen: 0,
68cbdc84 953 stolen: 0,
a4e07f72
JB
954 failed: 0
955 },
956 runTime: {
a4e07f72
JB
957 history: expect.any(CircularArray)
958 },
959 waitTime: {
a4e07f72
JB
960 history: expect.any(CircularArray)
961 },
5df69fab
JB
962 elu: {
963 idle: {
5df69fab
JB
964 history: expect.any(CircularArray)
965 },
966 active: {
5df69fab 967 history: expect.any(CircularArray)
f7510105 968 }
5df69fab 969 }
86bf340d 970 })
bf9549ae 971 }
fd7ebd49 972 await pool.destroy()
bf9549ae
JB
973 })
974
2431bdb4 975 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 976 const pool = new DynamicThreadPool(
2431bdb4 977 Math.floor(numberOfWorkers / 2),
8f4878b7 978 numberOfWorkers,
b2fd3f4a 979 './tests/worker-files/thread/testWorker.mjs'
9e619829 980 )
09c2d0d3 981 const promises = new Set()
ee9f5295
JB
982 const maxMultiplier = 2
983 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 984 promises.add(pool.execute())
9e619829
JB
985 }
986 await Promise.all(promises)
f06e48d8 987 for (const workerNode of pool.workerNodes) {
465b2940 988 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
989 tasks: {
990 executed: expect.any(Number),
991 executing: 0,
992 queued: 0,
df593701 993 maxQueued: 0,
463226a4 994 sequentiallyStolen: 0,
68cbdc84 995 stolen: 0,
a4e07f72
JB
996 failed: 0
997 },
998 runTime: {
a4e07f72
JB
999 history: expect.any(CircularArray)
1000 },
1001 waitTime: {
a4e07f72
JB
1002 history: expect.any(CircularArray)
1003 },
5df69fab
JB
1004 elu: {
1005 idle: {
5df69fab
JB
1006 history: expect.any(CircularArray)
1007 },
1008 active: {
5df69fab 1009 history: expect.any(CircularArray)
f7510105 1010 }
5df69fab 1011 }
86bf340d 1012 })
465b2940 1013 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1014 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1015 numberOfWorkers * maxMultiplier
1016 )
b97d82d8
JB
1017 expect(workerNode.usage.runTime.history.length).toBe(0)
1018 expect(workerNode.usage.waitTime.history.length).toBe(0)
1019 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1020 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1021 }
1022 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1023 for (const workerNode of pool.workerNodes) {
465b2940 1024 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1025 tasks: {
1026 executed: 0,
1027 executing: 0,
1028 queued: 0,
df593701 1029 maxQueued: 0,
463226a4 1030 sequentiallyStolen: 0,
68cbdc84 1031 stolen: 0,
a4e07f72
JB
1032 failed: 0
1033 },
1034 runTime: {
a4e07f72
JB
1035 history: expect.any(CircularArray)
1036 },
1037 waitTime: {
a4e07f72
JB
1038 history: expect.any(CircularArray)
1039 },
5df69fab
JB
1040 elu: {
1041 idle: {
5df69fab
JB
1042 history: expect.any(CircularArray)
1043 },
1044 active: {
5df69fab 1045 history: expect.any(CircularArray)
f7510105 1046 }
5df69fab 1047 }
86bf340d 1048 })
465b2940
JB
1049 expect(workerNode.usage.runTime.history.length).toBe(0)
1050 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1051 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1052 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1053 }
fd7ebd49 1054 await pool.destroy()
ee11a4a2
JB
1055 })
1056
a1763c54
JB
1057 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1058 const pool = new DynamicClusterPool(
2431bdb4 1059 Math.floor(numberOfWorkers / 2),
164d950a 1060 numberOfWorkers,
d35e5717 1061 './tests/worker-files/cluster/testWorker.cjs'
164d950a 1062 )
c726f66c 1063 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1064 let poolInfo
a1763c54 1065 let poolReady = 0
041dc05b 1066 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1067 ++poolReady
d46660cd
JB
1068 poolInfo = info
1069 })
a1763c54 1070 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1071 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1072 expect(poolReady).toBe(1)
d46660cd 1073 expect(poolInfo).toStrictEqual({
23ccf9d7 1074 version,
d46660cd 1075 type: PoolTypes.dynamic,
a1763c54 1076 worker: WorkerTypes.cluster,
47352846 1077 started: true,
a1763c54 1078 ready: true,
2431bdb4 1079 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1080 strategyRetries: expect.any(Number),
2431bdb4
JB
1081 minSize: expect.any(Number),
1082 maxSize: expect.any(Number),
1083 workerNodes: expect.any(Number),
1084 idleWorkerNodes: expect.any(Number),
1085 busyWorkerNodes: expect.any(Number),
1086 executedTasks: expect.any(Number),
1087 executingTasks: expect.any(Number),
2431bdb4
JB
1088 failedTasks: expect.any(Number)
1089 })
1090 await pool.destroy()
1091 })
1092
a1763c54
JB
1093 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1094 const pool = new FixedThreadPool(
2431bdb4 1095 numberOfWorkers,
b2fd3f4a 1096 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1097 )
c726f66c 1098 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1099 const promises = new Set()
1100 let poolBusy = 0
2431bdb4 1101 let poolInfo
041dc05b 1102 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1103 ++poolBusy
2431bdb4
JB
1104 poolInfo = info
1105 })
c726f66c 1106 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1107 for (let i = 0; i < numberOfWorkers * 2; i++) {
1108 promises.add(pool.execute())
1109 }
1110 await Promise.all(promises)
1111 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1112 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1113 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1114 expect(poolInfo).toStrictEqual({
1115 version,
a1763c54
JB
1116 type: PoolTypes.fixed,
1117 worker: WorkerTypes.thread,
47352846
JB
1118 started: true,
1119 ready: true,
2431bdb4 1120 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1121 strategyRetries: expect.any(Number),
d46660cd
JB
1122 minSize: expect.any(Number),
1123 maxSize: expect.any(Number),
1124 workerNodes: expect.any(Number),
1125 idleWorkerNodes: expect.any(Number),
1126 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1127 executedTasks: expect.any(Number),
1128 executingTasks: expect.any(Number),
a4e07f72 1129 failedTasks: expect.any(Number)
d46660cd 1130 })
164d950a
JB
1131 await pool.destroy()
1132 })
1133
a1763c54
JB
1134 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1135 const pool = new DynamicThreadPool(
1136 Math.floor(numberOfWorkers / 2),
7c0ba920 1137 numberOfWorkers,
b2fd3f4a 1138 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1139 )
c726f66c 1140 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1141 const promises = new Set()
a1763c54 1142 let poolFull = 0
d46660cd 1143 let poolInfo
041dc05b 1144 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1145 ++poolFull
d46660cd
JB
1146 poolInfo = info
1147 })
c726f66c 1148 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1149 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1150 promises.add(pool.execute())
7c0ba920 1151 }
cf597bc5 1152 await Promise.all(promises)
33e6bb4c 1153 expect(poolFull).toBe(1)
d46660cd 1154 expect(poolInfo).toStrictEqual({
23ccf9d7 1155 version,
a1763c54 1156 type: PoolTypes.dynamic,
d46660cd 1157 worker: WorkerTypes.thread,
47352846
JB
1158 started: true,
1159 ready: true,
2431bdb4 1160 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1161 strategyRetries: expect.any(Number),
8735b4e5
JB
1162 minSize: expect.any(Number),
1163 maxSize: expect.any(Number),
1164 workerNodes: expect.any(Number),
1165 idleWorkerNodes: expect.any(Number),
1166 busyWorkerNodes: expect.any(Number),
1167 executedTasks: expect.any(Number),
1168 executingTasks: expect.any(Number),
1169 failedTasks: expect.any(Number)
1170 })
1171 await pool.destroy()
1172 })
1173
3e8611a8 1174 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1175 const pool = new FixedThreadPool(
8735b4e5 1176 numberOfWorkers,
b2fd3f4a 1177 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1178 {
1179 enableTasksQueue: true
1180 }
1181 )
a074ffee 1182 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1183 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1184 const promises = new Set()
1185 let poolBackPressure = 0
1186 let poolInfo
041dc05b 1187 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1188 ++poolBackPressure
1189 poolInfo = info
1190 })
c726f66c 1191 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1192 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1193 promises.add(pool.execute())
1194 }
1195 await Promise.all(promises)
033f1776 1196 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1197 expect(poolInfo).toStrictEqual({
1198 version,
3e8611a8 1199 type: PoolTypes.fixed,
8735b4e5 1200 worker: WorkerTypes.thread,
47352846
JB
1201 started: true,
1202 ready: true,
8735b4e5 1203 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
0e8587d2 1204 strategyRetries: expect.any(Number),
d46660cd
JB
1205 minSize: expect.any(Number),
1206 maxSize: expect.any(Number),
1207 workerNodes: expect.any(Number),
1208 idleWorkerNodes: expect.any(Number),
5eb72b9e 1209 stealingWorkerNodes: expect.any(Number),
d46660cd 1210 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1211 executedTasks: expect.any(Number),
1212 executingTasks: expect.any(Number),
3e8611a8
JB
1213 maxQueuedTasks: expect.any(Number),
1214 queuedTasks: expect.any(Number),
1215 backPressure: true,
68cbdc84 1216 stolenTasks: expect.any(Number),
a4e07f72 1217 failedTasks: expect.any(Number)
d46660cd 1218 })
5eb72b9e 1219 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
fd7ebd49 1220 await pool.destroy()
7c0ba920 1221 })
70a4f5ea 1222
85b2561d
JB
1223 it('Verify that destroy() waits for queued tasks to finish', async () => {
1224 const tasksFinishedTimeout = 2500
1225 const pool = new FixedThreadPool(
1226 numberOfWorkers,
1227 './tests/worker-files/thread/asyncWorker.mjs',
1228 {
1229 enableTasksQueue: true,
1230 tasksQueueOptions: { tasksFinishedTimeout }
1231 }
1232 )
1233 const maxMultiplier = 4
1234 let tasksFinished = 0
1235 for (const workerNode of pool.workerNodes) {
1236 workerNode.on('taskFinished', () => {
1237 ++tasksFinished
1238 })
1239 }
1240 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1241 pool.execute()
1242 }
1243 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1244 const startTime = performance.now()
1245 await pool.destroy()
1246 const elapsedTime = performance.now() - startTime
90afa746 1247 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
85b2561d 1248 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
14d5e183 1249 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1250 })
1251
1252 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1253 const tasksFinishedTimeout = 1000
1254 const pool = new FixedThreadPool(
1255 numberOfWorkers,
1256 './tests/worker-files/thread/asyncWorker.mjs',
1257 {
1258 enableTasksQueue: true,
1259 tasksQueueOptions: { tasksFinishedTimeout }
1260 }
1261 )
1262 const maxMultiplier = 4
1263 let tasksFinished = 0
1264 for (const workerNode of pool.workerNodes) {
1265 workerNode.on('taskFinished', () => {
1266 ++tasksFinished
1267 })
1268 }
1269 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1270 pool.execute()
1271 }
1272 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1273 const startTime = performance.now()
1274 await pool.destroy()
1275 const elapsedTime = performance.now() - startTime
1276 expect(tasksFinished).toBe(0)
2885534c 1277 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1278 })
1279
f18fd12b
JB
1280 it('Verify that pool asynchronous resource track tasks execution', async () => {
1281 let taskAsyncId
1282 let initCalls = 0
1283 let beforeCalls = 0
1284 let afterCalls = 0
1285 let resolveCalls = 0
1286 const hook = createHook({
1287 init (asyncId, type) {
1288 if (type === 'poolifier:task') {
1289 initCalls++
1290 taskAsyncId = asyncId
1291 }
1292 },
1293 before (asyncId) {
1294 if (asyncId === taskAsyncId) beforeCalls++
1295 },
1296 after (asyncId) {
1297 if (asyncId === taskAsyncId) afterCalls++
1298 },
1299 promiseResolve () {
1300 if (executionAsyncId() === taskAsyncId) resolveCalls++
1301 }
1302 })
f18fd12b
JB
1303 const pool = new FixedThreadPool(
1304 numberOfWorkers,
1305 './tests/worker-files/thread/testWorker.mjs'
1306 )
8954c0a3 1307 hook.enable()
f18fd12b
JB
1308 await pool.execute()
1309 hook.disable()
1310 expect(initCalls).toBe(1)
1311 expect(beforeCalls).toBe(1)
1312 expect(afterCalls).toBe(1)
1313 expect(resolveCalls).toBe(1)
8954c0a3 1314 await pool.destroy()
f18fd12b
JB
1315 })
1316
9eae3c69
JB
1317 it('Verify that hasTaskFunction() is working', async () => {
1318 const dynamicThreadPool = new DynamicThreadPool(
1319 Math.floor(numberOfWorkers / 2),
1320 numberOfWorkers,
b2fd3f4a 1321 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1322 )
1323 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1324 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1325 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1326 true
1327 )
1328 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1329 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1330 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1331 await dynamicThreadPool.destroy()
1332 const fixedClusterPool = new FixedClusterPool(
1333 numberOfWorkers,
d35e5717 1334 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
9eae3c69
JB
1335 )
1336 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1337 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1338 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1339 true
1340 )
1341 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1342 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1343 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1344 await fixedClusterPool.destroy()
1345 })
1346
1347 it('Verify that addTaskFunction() is working', async () => {
1348 const dynamicThreadPool = new DynamicThreadPool(
1349 Math.floor(numberOfWorkers / 2),
1350 numberOfWorkers,
b2fd3f4a 1351 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1352 )
1353 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1354 await expect(
1355 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1356 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1357 await expect(
1358 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1359 ).rejects.toThrow(
3feeab69
JB
1360 new TypeError('name argument must not be an empty string')
1361 )
948faff7
JB
1362 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1363 new TypeError('fn argument must be a function')
1364 )
1365 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1366 new TypeError('fn argument must be a function')
1367 )
9eae3c69
JB
1368 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1369 DEFAULT_TASK_NAME,
1370 'test'
1371 ])
1372 const echoTaskFunction = data => {
1373 return data
1374 }
1375 await expect(
1376 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1377 ).resolves.toBe(true)
1378 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1379 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1380 echoTaskFunction
1381 )
1382 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1383 DEFAULT_TASK_NAME,
1384 'test',
1385 'echo'
1386 ])
1387 const taskFunctionData = { test: 'test' }
1388 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1389 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1390 for (const workerNode of dynamicThreadPool.workerNodes) {
1391 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1392 tasks: {
1393 executed: expect.any(Number),
1394 executing: 0,
1395 queued: 0,
463226a4 1396 sequentiallyStolen: 0,
5ad42e34 1397 stolen: 0,
adee6053
JB
1398 failed: 0
1399 },
1400 runTime: {
1401 history: new CircularArray()
1402 },
1403 waitTime: {
1404 history: new CircularArray()
1405 },
1406 elu: {
1407 idle: {
1408 history: new CircularArray()
1409 },
1410 active: {
1411 history: new CircularArray()
1412 }
1413 }
1414 })
1415 }
9eae3c69
JB
1416 await dynamicThreadPool.destroy()
1417 })
1418
1419 it('Verify that removeTaskFunction() is working', async () => {
1420 const dynamicThreadPool = new DynamicThreadPool(
1421 Math.floor(numberOfWorkers / 2),
1422 numberOfWorkers,
b2fd3f4a 1423 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1424 )
1425 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1426 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1427 DEFAULT_TASK_NAME,
1428 'test'
1429 ])
948faff7 1430 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1431 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1432 )
1433 const echoTaskFunction = data => {
1434 return data
1435 }
1436 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1437 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1438 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1439 echoTaskFunction
1440 )
1441 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1442 DEFAULT_TASK_NAME,
1443 'test',
1444 'echo'
1445 ])
1446 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1447 true
1448 )
1449 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1450 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1451 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1452 DEFAULT_TASK_NAME,
1453 'test'
1454 ])
1455 await dynamicThreadPool.destroy()
1456 })
1457
30500265 1458 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1459 const dynamicThreadPool = new DynamicThreadPool(
1460 Math.floor(numberOfWorkers / 2),
1461 numberOfWorkers,
b2fd3f4a 1462 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1463 )
1464 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1465 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1466 DEFAULT_TASK_NAME,
90d7d101
JB
1467 'jsonIntegerSerialization',
1468 'factorial',
1469 'fibonacci'
1470 ])
9eae3c69 1471 await dynamicThreadPool.destroy()
90d7d101
JB
1472 const fixedClusterPool = new FixedClusterPool(
1473 numberOfWorkers,
d35e5717 1474 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
90d7d101
JB
1475 )
1476 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1477 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1478 DEFAULT_TASK_NAME,
90d7d101
JB
1479 'jsonIntegerSerialization',
1480 'factorial',
1481 'fibonacci'
1482 ])
0fe39c97 1483 await fixedClusterPool.destroy()
90d7d101
JB
1484 })
1485
9eae3c69 1486 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1487 const dynamicThreadPool = new DynamicThreadPool(
1488 Math.floor(numberOfWorkers / 2),
1489 numberOfWorkers,
b2fd3f4a 1490 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1491 )
1492 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1493 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1494 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1495 new Error(
711623b8 1496 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1497 )
1498 )
1499 await expect(
1500 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1501 ).rejects.toThrow(
b0b55f57 1502 new Error(
711623b8 1503 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1504 )
1505 )
1506 await expect(
1507 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1508 ).rejects.toThrow(
b0b55f57 1509 new Error(
711623b8 1510 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1511 )
1512 )
9eae3c69
JB
1513 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1514 DEFAULT_TASK_NAME,
1515 'jsonIntegerSerialization',
1516 'factorial',
1517 'fibonacci'
1518 ])
1519 await expect(
1520 dynamicThreadPool.setDefaultTaskFunction('factorial')
1521 ).resolves.toBe(true)
1522 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1523 DEFAULT_TASK_NAME,
1524 'factorial',
1525 'jsonIntegerSerialization',
1526 'fibonacci'
1527 ])
1528 await expect(
1529 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1530 ).resolves.toBe(true)
1531 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1532 DEFAULT_TASK_NAME,
1533 'fibonacci',
1534 'jsonIntegerSerialization',
1535 'factorial'
1536 ])
cda9ba34 1537 await dynamicThreadPool.destroy()
30500265
JB
1538 })
1539
90d7d101 1540 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1541 const pool = new DynamicClusterPool(
2431bdb4 1542 Math.floor(numberOfWorkers / 2),
70a4f5ea 1543 numberOfWorkers,
d35e5717 1544 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
70a4f5ea
JB
1545 )
1546 const data = { n: 10 }
82888165 1547 const result0 = await pool.execute(data)
30b963d4 1548 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1549 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1550 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1551 const result2 = await pool.execute(data, 'factorial')
1552 expect(result2).toBe(3628800)
1553 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1554 expect(result3).toBe(55)
5bb5be17
JB
1555 expect(pool.info.executingTasks).toBe(0)
1556 expect(pool.info.executedTasks).toBe(4)
b414b84c 1557 for (const workerNode of pool.workerNodes) {
66979634 1558 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1559 DEFAULT_TASK_NAME,
b414b84c
JB
1560 'jsonIntegerSerialization',
1561 'factorial',
1562 'fibonacci'
1563 ])
1564 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1565 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1566 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1567 tasks: {
1568 executed: expect.any(Number),
4ba4c7f9 1569 executing: 0,
5bb5be17 1570 failed: 0,
68cbdc84 1571 queued: 0,
463226a4 1572 sequentiallyStolen: 0,
68cbdc84 1573 stolen: 0
5bb5be17
JB
1574 },
1575 runTime: {
1576 history: expect.any(CircularArray)
1577 },
1578 waitTime: {
1579 history: expect.any(CircularArray)
1580 },
1581 elu: {
1582 idle: {
1583 history: expect.any(CircularArray)
1584 },
1585 active: {
1586 history: expect.any(CircularArray)
1587 }
1588 }
1589 })
1590 expect(
4ba4c7f9
JB
1591 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1592 ).toBeGreaterThan(0)
5bb5be17 1593 }
dfd7ec01
JB
1594 expect(
1595 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1596 ).toStrictEqual(
66979634
JB
1597 workerNode.getTaskFunctionWorkerUsage(
1598 workerNode.info.taskFunctionNames[1]
1599 )
dfd7ec01 1600 )
5bb5be17 1601 }
0fe39c97 1602 await pool.destroy()
70a4f5ea 1603 })
52a23942
JB
1604
1605 it('Verify sendKillMessageToWorker()', async () => {
1606 const pool = new DynamicClusterPool(
1607 Math.floor(numberOfWorkers / 2),
1608 numberOfWorkers,
d35e5717 1609 './tests/worker-files/cluster/testWorker.cjs'
52a23942
JB
1610 )
1611 const workerNodeKey = 0
1612 await expect(
adee6053 1613 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1614 ).resolves.toBeUndefined()
1615 await pool.destroy()
1616 })
adee6053
JB
1617
1618 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1619 const pool = new DynamicClusterPool(
1620 Math.floor(numberOfWorkers / 2),
1621 numberOfWorkers,
d35e5717 1622 './tests/worker-files/cluster/testWorker.cjs'
adee6053
JB
1623 )
1624 const workerNodeKey = 0
1625 await expect(
1626 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1627 taskFunctionOperation: 'add',
1628 taskFunctionName: 'empty',
1629 taskFunction: (() => {}).toString()
1630 })
1631 ).resolves.toBe(true)
1632 expect(
1633 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1634 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1635 await pool.destroy()
1636 })
1637
1638 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1639 const pool = new DynamicClusterPool(
1640 Math.floor(numberOfWorkers / 2),
1641 numberOfWorkers,
d35e5717 1642 './tests/worker-files/cluster/testWorker.cjs'
adee6053 1643 )
adee6053
JB
1644 await expect(
1645 pool.sendTaskFunctionOperationToWorkers({
1646 taskFunctionOperation: 'add',
1647 taskFunctionName: 'empty',
1648 taskFunction: (() => {}).toString()
1649 })
1650 ).resolves.toBe(true)
1651 for (const workerNode of pool.workerNodes) {
1652 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1653 DEFAULT_TASK_NAME,
1654 'test',
1655 'empty'
1656 ])
1657 }
1658 await pool.destroy()
1659 })
3ec964d6 1660})