refactor: silence sonar
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
a5ed75b7 173 './tests/worker-files/cluster/testWorker.js'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
26ce26ca 229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d
JB
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
232 retries:
233 pool.info.maxSize +
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 235 runTime: { median: false },
5df69fab 236 waitTime: { median: false },
00e1bdeb
JB
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
da309861 242 })
999ef664
JB
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
86cbb766
JB
245 expect(workerChoiceStrategy.opts).toStrictEqual({
246 retries:
247 pool.info.maxSize +
248 Object.keys(workerChoiceStrategy.opts.weights).length,
249 runTime: { median: false },
250 waitTime: { median: false },
251 elu: { median: false },
252 weights: expect.objectContaining({
253 0: expect.any(Number),
254 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 255 })
86cbb766 256 })
999ef664 257 }
fd7ebd49 258 await pool.destroy()
73bfd59d 259 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
260 pool = new FixedThreadPool(
261 numberOfWorkers,
b2fd3f4a 262 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 263 {
e4543b14 264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 265 workerChoiceStrategyOptions: {
932fc8be 266 runTime: { median: true },
fc027381 267 weights: { 0: 300, 1: 200 }
49be33fe 268 },
35cf1c03 269 enableEvents: false,
1f68cede 270 restartWorkerOnError: false,
ff733df7 271 enableTasksQueue: true,
d4aeae5a 272 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 onlineHandler: testHandler,
276 exitHandler: testHandler
7c0ba920
JB
277 }
278 )
7c0ba920 279 expect(pool.emitter).toBeUndefined()
47352846
JB
280 expect(pool.opts).toStrictEqual({
281 startWorkers: true,
282 enableEvents: false,
283 restartWorkerOnError: false,
284 enableTasksQueue: true,
285 tasksQueueOptions: {
286 concurrency: 2,
2324f8c9 287 size: Math.pow(numberOfWorkers, 2),
dbd73092 288 taskStealing: true,
32b141fd 289 tasksStealingOnBackPressure: true,
568d0075 290 tasksFinishedTimeout: 2000
47352846
JB
291 },
292 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
293 workerChoiceStrategyOptions: {
47352846 294 runTime: { median: true },
47352846
JB
295 weights: { 0: 300, 1: 200 }
296 },
297 onlineHandler: testHandler,
298 messageHandler: testHandler,
299 errorHandler: testHandler,
300 exitHandler: testHandler
8990357d
JB
301 })
302 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
449cd154
JB
303 retries:
304 pool.info.maxSize +
305 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
8990357d
JB
306 runTime: { median: true },
307 waitTime: { median: false },
308 elu: { median: false },
fc027381 309 weights: { 0: 300, 1: 200 }
da309861 310 })
999ef664
JB
311 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
312 .workerChoiceStrategies) {
313 expect(workerChoiceStrategy.opts).toStrictEqual({
449cd154
JB
314 retries:
315 pool.info.maxSize +
316 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
999ef664
JB
317 runTime: { median: true },
318 waitTime: { median: false },
319 elu: { median: false },
320 weights: { 0: 300, 1: 200 }
321 })
322 }
fd7ebd49 323 await pool.destroy()
7c0ba920
JB
324 })
325
fe291b64 326 it('Verify that pool options are validated', () => {
d4aeae5a
JB
327 expect(
328 () =>
329 new FixedThreadPool(
330 numberOfWorkers,
b2fd3f4a 331 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 332 {
f0d7f803 333 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
334 }
335 )
948faff7 336 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
337 expect(
338 () =>
339 new FixedThreadPool(
340 numberOfWorkers,
b2fd3f4a 341 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
342 {
343 workerChoiceStrategyOptions: { weights: {} }
344 }
345 )
948faff7 346 ).toThrow(
8735b4e5
JB
347 new Error(
348 'Invalid worker choice strategy options: must have a weight for each worker node'
349 )
49be33fe 350 )
f0d7f803
JB
351 expect(
352 () =>
353 new FixedThreadPool(
354 numberOfWorkers,
b2fd3f4a 355 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
356 {
357 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
358 }
359 )
948faff7 360 ).toThrow(
8735b4e5
JB
361 new Error(
362 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
363 )
f0d7f803 364 )
5c4c2dee
JB
365 expect(
366 () =>
367 new FixedThreadPool(
368 numberOfWorkers,
b2fd3f4a 369 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
370 {
371 enableTasksQueue: true,
372 tasksQueueOptions: 'invalidTasksQueueOptions'
373 }
374 )
948faff7 375 ).toThrow(
5c4c2dee
JB
376 new TypeError('Invalid tasks queue options: must be a plain object')
377 )
f0d7f803
JB
378 expect(
379 () =>
380 new FixedThreadPool(
381 numberOfWorkers,
b2fd3f4a 382 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
383 {
384 enableTasksQueue: true,
385 tasksQueueOptions: { concurrency: 0 }
386 }
387 )
948faff7 388 ).toThrow(
e695d66f 389 new RangeError(
20c6f652 390 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
391 )
392 )
f0d7f803
JB
393 expect(
394 () =>
395 new FixedThreadPool(
396 numberOfWorkers,
b2fd3f4a 397 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
398 {
399 enableTasksQueue: true,
5c4c2dee 400 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
401 }
402 )
948faff7 403 ).toThrow(
5c4c2dee
JB
404 new RangeError(
405 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
406 )
8735b4e5 407 )
f0d7f803
JB
408 expect(
409 () =>
410 new FixedThreadPool(
411 numberOfWorkers,
b2fd3f4a 412 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
413 {
414 enableTasksQueue: true,
415 tasksQueueOptions: { concurrency: 0.2 }
416 }
417 )
948faff7 418 ).toThrow(
20c6f652 419 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 420 )
5c4c2dee
JB
421 expect(
422 () =>
423 new FixedThreadPool(
424 numberOfWorkers,
b2fd3f4a 425 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
426 {
427 enableTasksQueue: true,
428 tasksQueueOptions: { size: 0 }
429 }
430 )
948faff7 431 ).toThrow(
5c4c2dee
JB
432 new RangeError(
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
434 )
435 )
436 expect(
437 () =>
438 new FixedThreadPool(
439 numberOfWorkers,
b2fd3f4a 440 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
441 {
442 enableTasksQueue: true,
443 tasksQueueOptions: { size: -1 }
444 }
445 )
948faff7 446 ).toThrow(
5c4c2dee
JB
447 new RangeError(
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
449 )
450 )
451 expect(
452 () =>
453 new FixedThreadPool(
454 numberOfWorkers,
b2fd3f4a 455 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
456 {
457 enableTasksQueue: true,
458 tasksQueueOptions: { size: 0.2 }
459 }
460 )
948faff7 461 ).toThrow(
5c4c2dee
JB
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
463 )
d4aeae5a
JB
464 })
465
2431bdb4 466 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
467 const pool = new FixedThreadPool(
468 numberOfWorkers,
b2fd3f4a 469 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
470 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
471 )
26ce26ca 472 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
8990357d 473 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
474 retries:
475 pool.info.maxSize +
476 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 477 runTime: { median: false },
5df69fab 478 waitTime: { median: false },
00e1bdeb
JB
479 elu: { median: false },
480 weights: expect.objectContaining({
481 0: expect.any(Number),
482 [pool.info.maxSize - 1]: expect.any(Number)
483 })
a20f0ba5
JB
484 })
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
86cbb766
JB
487 expect(workerChoiceStrategy.opts).toStrictEqual({
488 retries:
489 pool.info.maxSize +
490 Object.keys(workerChoiceStrategy.opts.weights).length,
491 runTime: { median: false },
492 waitTime: { median: false },
493 elu: { median: false },
494 weights: expect.objectContaining({
495 0: expect.any(Number),
496 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 497 })
86cbb766 498 })
a20f0ba5 499 }
87de9ff5
JB
500 expect(
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
502 ).toStrictEqual({
932fc8be
JB
503 runTime: {
504 aggregate: true,
505 average: true,
506 median: false
507 },
508 waitTime: {
509 aggregate: false,
510 average: false,
511 median: false
512 },
5df69fab 513 elu: {
9adcefab
JB
514 aggregate: true,
515 average: true,
5df69fab
JB
516 median: false
517 }
86bf340d 518 })
9adcefab
JB
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: true },
521 elu: { median: true }
522 })
a20f0ba5 523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 524 runTime: { median: true },
8990357d
JB
525 elu: { median: true }
526 })
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
528 retries:
529 pool.info.maxSize +
530 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
8990357d
JB
531 runTime: { median: true },
532 waitTime: { median: false },
00e1bdeb
JB
533 elu: { median: true },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
537 })
a20f0ba5
JB
538 })
539 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
540 .workerChoiceStrategies) {
86cbb766
JB
541 expect(workerChoiceStrategy.opts).toStrictEqual({
542 retries:
543 pool.info.maxSize +
544 Object.keys(workerChoiceStrategy.opts.weights).length,
545 runTime: { median: true },
546 waitTime: { median: false },
547 elu: { median: true },
548 weights: expect.objectContaining({
549 0: expect.any(Number),
550 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 551 })
86cbb766 552 })
a20f0ba5 553 }
87de9ff5
JB
554 expect(
555 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
556 ).toStrictEqual({
932fc8be
JB
557 runTime: {
558 aggregate: true,
559 average: false,
560 median: true
561 },
562 waitTime: {
563 aggregate: false,
564 average: false,
565 median: false
566 },
5df69fab 567 elu: {
9adcefab 568 aggregate: true,
5df69fab 569 average: false,
9adcefab 570 median: true
5df69fab 571 }
86bf340d 572 })
9adcefab
JB
573 pool.setWorkerChoiceStrategyOptions({
574 runTime: { median: false },
575 elu: { median: false }
576 })
a20f0ba5 577 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 578 runTime: { median: false },
8990357d
JB
579 elu: { median: false }
580 })
581 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
582 retries:
583 pool.info.maxSize +
584 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
9adcefab 585 runTime: { median: false },
8990357d 586 waitTime: { median: false },
00e1bdeb
JB
587 elu: { median: false },
588 weights: expect.objectContaining({
589 0: expect.any(Number),
590 [pool.info.maxSize - 1]: expect.any(Number)
591 })
a20f0ba5
JB
592 })
593 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
594 .workerChoiceStrategies) {
86cbb766
JB
595 expect(workerChoiceStrategy.opts).toStrictEqual({
596 retries:
597 pool.info.maxSize +
598 Object.keys(workerChoiceStrategy.opts.weights).length,
599 runTime: { median: false },
600 waitTime: { median: false },
601 elu: { median: false },
602 weights: expect.objectContaining({
603 0: expect.any(Number),
604 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 605 })
86cbb766 606 })
a20f0ba5 607 }
87de9ff5
JB
608 expect(
609 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
610 ).toStrictEqual({
932fc8be
JB
611 runTime: {
612 aggregate: true,
613 average: true,
614 median: false
615 },
616 waitTime: {
617 aggregate: false,
618 average: false,
619 median: false
620 },
5df69fab 621 elu: {
9adcefab
JB
622 aggregate: true,
623 average: true,
5df69fab
JB
624 median: false
625 }
86bf340d 626 })
1f95d544
JB
627 expect(() =>
628 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 629 ).toThrow(
8735b4e5
JB
630 new TypeError(
631 'Invalid worker choice strategy options: must be a plain object'
632 )
1f95d544 633 )
948faff7 634 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
635 new Error(
636 'Invalid worker choice strategy options: must have a weight for each worker node'
637 )
1f95d544
JB
638 )
639 expect(() =>
640 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 641 ).toThrow(
8735b4e5
JB
642 new Error(
643 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
644 )
1f95d544 645 )
a20f0ba5
JB
646 await pool.destroy()
647 })
648
2431bdb4 649 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
650 const pool = new FixedThreadPool(
651 numberOfWorkers,
b2fd3f4a 652 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
653 )
654 expect(pool.opts.enableTasksQueue).toBe(false)
655 expect(pool.opts.tasksQueueOptions).toBeUndefined()
656 pool.enableTasksQueue(true)
657 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
659 concurrency: 1,
2324f8c9 660 size: Math.pow(numberOfWorkers, 2),
dbd73092 661 taskStealing: true,
32b141fd 662 tasksStealingOnBackPressure: true,
568d0075 663 tasksFinishedTimeout: 2000
20c6f652 664 })
a20f0ba5
JB
665 pool.enableTasksQueue(true, { concurrency: 2 })
666 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
667 expect(pool.opts.tasksQueueOptions).toStrictEqual({
668 concurrency: 2,
2324f8c9 669 size: Math.pow(numberOfWorkers, 2),
dbd73092 670 taskStealing: true,
32b141fd 671 tasksStealingOnBackPressure: true,
568d0075 672 tasksFinishedTimeout: 2000
20c6f652 673 })
a20f0ba5
JB
674 pool.enableTasksQueue(false)
675 expect(pool.opts.enableTasksQueue).toBe(false)
676 expect(pool.opts.tasksQueueOptions).toBeUndefined()
677 await pool.destroy()
678 })
679
2431bdb4 680 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
681 const pool = new FixedThreadPool(
682 numberOfWorkers,
b2fd3f4a 683 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
684 { enableTasksQueue: true }
685 )
20c6f652
JB
686 expect(pool.opts.tasksQueueOptions).toStrictEqual({
687 concurrency: 1,
2324f8c9 688 size: Math.pow(numberOfWorkers, 2),
dbd73092 689 taskStealing: true,
32b141fd 690 tasksStealingOnBackPressure: true,
568d0075 691 tasksFinishedTimeout: 2000
20c6f652 692 })
d6ca1416 693 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
696 )
d6ca1416
JB
697 }
698 pool.setTasksQueueOptions({
699 concurrency: 2,
2324f8c9 700 size: 2,
d6ca1416 701 taskStealing: false,
32b141fd 702 tasksStealingOnBackPressure: false,
568d0075 703 tasksFinishedTimeout: 3000
d6ca1416 704 })
20c6f652
JB
705 expect(pool.opts.tasksQueueOptions).toStrictEqual({
706 concurrency: 2,
2324f8c9 707 size: 2,
d6ca1416 708 taskStealing: false,
32b141fd 709 tasksStealingOnBackPressure: false,
568d0075 710 tasksFinishedTimeout: 3000
d6ca1416
JB
711 })
712 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
715 )
d6ca1416
JB
716 }
717 pool.setTasksQueueOptions({
718 concurrency: 1,
719 taskStealing: true,
720 tasksStealingOnBackPressure: true
721 })
722 expect(pool.opts.tasksQueueOptions).toStrictEqual({
723 concurrency: 1,
2324f8c9 724 size: Math.pow(numberOfWorkers, 2),
dbd73092 725 taskStealing: true,
32b141fd 726 tasksStealingOnBackPressure: true,
568d0075 727 tasksFinishedTimeout: 2000
20c6f652 728 })
d6ca1416 729 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
730 expect(workerNode.tasksQueueBackPressureSize).toBe(
731 pool.opts.tasksQueueOptions.size
732 )
d6ca1416 733 }
948faff7 734 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
735 new TypeError('Invalid tasks queue options: must be a plain object')
736 )
948faff7 737 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 738 new RangeError(
20c6f652 739 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
740 )
741 )
948faff7 742 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 743 new RangeError(
20c6f652 744 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 745 )
a20f0ba5 746 )
948faff7 747 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
748 new TypeError('Invalid worker node tasks concurrency: must be an integer')
749 )
948faff7 750 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 751 new RangeError(
68dbcdc0 752 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
753 )
754 )
948faff7 755 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 756 new RangeError(
68dbcdc0 757 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
758 )
759 )
948faff7 760 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 761 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 762 )
a20f0ba5
JB
763 await pool.destroy()
764 })
765
6b27d407
JB
766 it('Verify that pool info is set', async () => {
767 let pool = new FixedThreadPool(
768 numberOfWorkers,
b2fd3f4a 769 './tests/worker-files/thread/testWorker.mjs'
6b27d407 770 )
2dca6cab
JB
771 expect(pool.info).toStrictEqual({
772 version,
773 type: PoolTypes.fixed,
774 worker: WorkerTypes.thread,
47352846 775 started: true,
2dca6cab
JB
776 ready: true,
777 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
778 minSize: numberOfWorkers,
779 maxSize: numberOfWorkers,
780 workerNodes: numberOfWorkers,
781 idleWorkerNodes: numberOfWorkers,
782 busyWorkerNodes: 0,
783 executedTasks: 0,
784 executingTasks: 0,
2dca6cab
JB
785 failedTasks: 0
786 })
6b27d407
JB
787 await pool.destroy()
788 pool = new DynamicClusterPool(
2431bdb4 789 Math.floor(numberOfWorkers / 2),
6b27d407 790 numberOfWorkers,
ecdfbdc0 791 './tests/worker-files/cluster/testWorker.js'
6b27d407 792 )
2dca6cab
JB
793 expect(pool.info).toStrictEqual({
794 version,
795 type: PoolTypes.dynamic,
796 worker: WorkerTypes.cluster,
47352846 797 started: true,
2dca6cab
JB
798 ready: true,
799 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
800 minSize: Math.floor(numberOfWorkers / 2),
801 maxSize: numberOfWorkers,
802 workerNodes: Math.floor(numberOfWorkers / 2),
803 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
804 busyWorkerNodes: 0,
805 executedTasks: 0,
806 executingTasks: 0,
2dca6cab
JB
807 failedTasks: 0
808 })
6b27d407
JB
809 await pool.destroy()
810 })
811
2431bdb4 812 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
813 const pool = new FixedClusterPool(
814 numberOfWorkers,
815 './tests/worker-files/cluster/testWorker.js'
816 )
f06e48d8 817 for (const workerNode of pool.workerNodes) {
47352846 818 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 819 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
820 tasks: {
821 executed: 0,
822 executing: 0,
823 queued: 0,
df593701 824 maxQueued: 0,
463226a4 825 sequentiallyStolen: 0,
68cbdc84 826 stolen: 0,
a4e07f72
JB
827 failed: 0
828 },
829 runTime: {
4ba4c7f9 830 history: new CircularArray()
a4e07f72
JB
831 },
832 waitTime: {
4ba4c7f9 833 history: new CircularArray()
a4e07f72 834 },
5df69fab
JB
835 elu: {
836 idle: {
4ba4c7f9 837 history: new CircularArray()
5df69fab
JB
838 },
839 active: {
4ba4c7f9 840 history: new CircularArray()
f7510105 841 }
5df69fab 842 }
86bf340d 843 })
f06e48d8
JB
844 }
845 await pool.destroy()
846 })
847
2431bdb4
JB
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
f06e48d8
JB
850 numberOfWorkers,
851 './tests/worker-files/cluster/testWorker.js'
852 )
853 for (const workerNode of pool.workerNodes) {
47352846 854 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 856 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 857 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 858 }
fd7ebd49 859 await pool.destroy()
2431bdb4
JB
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
862 numberOfWorkers,
b2fd3f4a 863 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
864 )
865 for (const workerNode of pool.workerNodes) {
47352846 866 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 }
213cbac6 871 await pool.destroy()
2431bdb4
JB
872 })
873
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
876 numberOfWorkers,
877 './tests/worker-files/cluster/testWorker.js'
878 )
2dca6cab 879 for (const workerNode of pool.workerNodes) {
47352846 880 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
884 dynamic: false,
885 ready: true
886 })
887 }
2431bdb4
JB
888 await pool.destroy()
889 pool = new DynamicThreadPool(
890 Math.floor(numberOfWorkers / 2),
891 numberOfWorkers,
b2fd3f4a 892 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 893 )
2dca6cab 894 for (const workerNode of pool.workerNodes) {
47352846 895 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
896 expect(workerNode.info).toStrictEqual({
897 id: expect.any(Number),
898 type: WorkerTypes.thread,
899 dynamic: false,
7884d183 900 ready: true
2dca6cab
JB
901 })
902 }
213cbac6 903 await pool.destroy()
bf9549ae
JB
904 })
905
711623b8
JB
906 it('Verify that pool statuses are checked at start or destroy', async () => {
907 const pool = new FixedThreadPool(
908 numberOfWorkers,
909 './tests/worker-files/thread/testWorker.mjs'
910 )
911 expect(pool.info.started).toBe(true)
912 expect(pool.info.ready).toBe(true)
913 expect(() => pool.start()).toThrow(
914 new Error('Cannot start an already started pool')
915 )
916 await pool.destroy()
917 expect(pool.info.started).toBe(false)
918 expect(pool.info.ready).toBe(false)
919 await expect(pool.destroy()).rejects.toThrow(
920 new Error('Cannot destroy an already destroyed pool')
921 )
922 })
923
47352846
JB
924 it('Verify that pool can be started after initialization', async () => {
925 const pool = new FixedClusterPool(
926 numberOfWorkers,
927 './tests/worker-files/cluster/testWorker.js',
928 {
929 startWorkers: false
930 }
931 )
932 expect(pool.info.started).toBe(false)
933 expect(pool.info.ready).toBe(false)
55082af9 934 expect(pool.readyEventEmitted).toBe(false)
47352846 935 expect(pool.workerNodes).toStrictEqual([])
948faff7 936 await expect(pool.execute()).rejects.toThrow(
47352846
JB
937 new Error('Cannot execute a task on not started pool')
938 )
939 pool.start()
940 expect(pool.info.started).toBe(true)
941 expect(pool.info.ready).toBe(true)
55082af9
JB
942 await waitPoolEvents(pool, PoolEvents.ready, 1)
943 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
944 expect(pool.workerNodes.length).toBe(numberOfWorkers)
945 for (const workerNode of pool.workerNodes) {
946 expect(workerNode).toBeInstanceOf(WorkerNode)
947 }
948 await pool.destroy()
949 })
950
9d2d0da1
JB
951 it('Verify that pool execute() arguments are checked', async () => {
952 const pool = new FixedClusterPool(
953 numberOfWorkers,
954 './tests/worker-files/cluster/testWorker.js'
955 )
948faff7 956 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
957 new TypeError('name argument must be a string')
958 )
948faff7 959 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
960 new TypeError('name argument must not be an empty string')
961 )
948faff7 962 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
963 new TypeError('transferList argument must be an array')
964 )
965 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
966 "Task function 'unknown' not found"
967 )
968 await pool.destroy()
948faff7 969 await expect(pool.execute()).rejects.toThrow(
47352846 970 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
971 )
972 })
973
2431bdb4 974 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
975 const pool = new FixedClusterPool(
976 numberOfWorkers,
977 './tests/worker-files/cluster/testWorker.js'
978 )
09c2d0d3 979 const promises = new Set()
fc027381
JB
980 const maxMultiplier = 2
981 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 982 promises.add(pool.execute())
bf9549ae 983 }
f06e48d8 984 for (const workerNode of pool.workerNodes) {
465b2940 985 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
986 tasks: {
987 executed: 0,
988 executing: maxMultiplier,
989 queued: 0,
df593701 990 maxQueued: 0,
463226a4 991 sequentiallyStolen: 0,
68cbdc84 992 stolen: 0,
a4e07f72
JB
993 failed: 0
994 },
995 runTime: {
a4e07f72
JB
996 history: expect.any(CircularArray)
997 },
998 waitTime: {
a4e07f72
JB
999 history: expect.any(CircularArray)
1000 },
5df69fab
JB
1001 elu: {
1002 idle: {
5df69fab
JB
1003 history: expect.any(CircularArray)
1004 },
1005 active: {
5df69fab 1006 history: expect.any(CircularArray)
f7510105 1007 }
5df69fab 1008 }
86bf340d 1009 })
bf9549ae
JB
1010 }
1011 await Promise.all(promises)
f06e48d8 1012 for (const workerNode of pool.workerNodes) {
465b2940 1013 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1014 tasks: {
1015 executed: maxMultiplier,
1016 executing: 0,
1017 queued: 0,
df593701 1018 maxQueued: 0,
463226a4 1019 sequentiallyStolen: 0,
68cbdc84 1020 stolen: 0,
a4e07f72
JB
1021 failed: 0
1022 },
1023 runTime: {
a4e07f72
JB
1024 history: expect.any(CircularArray)
1025 },
1026 waitTime: {
a4e07f72
JB
1027 history: expect.any(CircularArray)
1028 },
5df69fab
JB
1029 elu: {
1030 idle: {
5df69fab
JB
1031 history: expect.any(CircularArray)
1032 },
1033 active: {
5df69fab 1034 history: expect.any(CircularArray)
f7510105 1035 }
5df69fab 1036 }
86bf340d 1037 })
bf9549ae 1038 }
fd7ebd49 1039 await pool.destroy()
bf9549ae
JB
1040 })
1041
2431bdb4 1042 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1043 const pool = new DynamicThreadPool(
2431bdb4 1044 Math.floor(numberOfWorkers / 2),
8f4878b7 1045 numberOfWorkers,
b2fd3f4a 1046 './tests/worker-files/thread/testWorker.mjs'
9e619829 1047 )
09c2d0d3 1048 const promises = new Set()
ee9f5295
JB
1049 const maxMultiplier = 2
1050 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1051 promises.add(pool.execute())
9e619829
JB
1052 }
1053 await Promise.all(promises)
f06e48d8 1054 for (const workerNode of pool.workerNodes) {
465b2940 1055 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1056 tasks: {
1057 executed: expect.any(Number),
1058 executing: 0,
1059 queued: 0,
df593701 1060 maxQueued: 0,
463226a4 1061 sequentiallyStolen: 0,
68cbdc84 1062 stolen: 0,
a4e07f72
JB
1063 failed: 0
1064 },
1065 runTime: {
a4e07f72
JB
1066 history: expect.any(CircularArray)
1067 },
1068 waitTime: {
a4e07f72
JB
1069 history: expect.any(CircularArray)
1070 },
5df69fab
JB
1071 elu: {
1072 idle: {
5df69fab
JB
1073 history: expect.any(CircularArray)
1074 },
1075 active: {
5df69fab 1076 history: expect.any(CircularArray)
f7510105 1077 }
5df69fab 1078 }
86bf340d 1079 })
465b2940 1080 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1081 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1082 numberOfWorkers * maxMultiplier
1083 )
b97d82d8
JB
1084 expect(workerNode.usage.runTime.history.length).toBe(0)
1085 expect(workerNode.usage.waitTime.history.length).toBe(0)
1086 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1087 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1088 }
1089 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1090 for (const workerNode of pool.workerNodes) {
465b2940 1091 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1092 tasks: {
1093 executed: 0,
1094 executing: 0,
1095 queued: 0,
df593701 1096 maxQueued: 0,
463226a4 1097 sequentiallyStolen: 0,
68cbdc84 1098 stolen: 0,
a4e07f72
JB
1099 failed: 0
1100 },
1101 runTime: {
a4e07f72
JB
1102 history: expect.any(CircularArray)
1103 },
1104 waitTime: {
a4e07f72
JB
1105 history: expect.any(CircularArray)
1106 },
5df69fab
JB
1107 elu: {
1108 idle: {
5df69fab
JB
1109 history: expect.any(CircularArray)
1110 },
1111 active: {
5df69fab 1112 history: expect.any(CircularArray)
f7510105 1113 }
5df69fab 1114 }
86bf340d 1115 })
465b2940
JB
1116 expect(workerNode.usage.runTime.history.length).toBe(0)
1117 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1118 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1119 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1120 }
fd7ebd49 1121 await pool.destroy()
ee11a4a2
JB
1122 })
1123
a1763c54
JB
1124 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1125 const pool = new DynamicClusterPool(
2431bdb4 1126 Math.floor(numberOfWorkers / 2),
164d950a 1127 numberOfWorkers,
a1763c54 1128 './tests/worker-files/cluster/testWorker.js'
164d950a 1129 )
c726f66c 1130 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1131 let poolInfo
a1763c54 1132 let poolReady = 0
041dc05b 1133 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1134 ++poolReady
d46660cd
JB
1135 poolInfo = info
1136 })
a1763c54 1137 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1138 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1139 expect(poolReady).toBe(1)
d46660cd 1140 expect(poolInfo).toStrictEqual({
23ccf9d7 1141 version,
d46660cd 1142 type: PoolTypes.dynamic,
a1763c54 1143 worker: WorkerTypes.cluster,
47352846 1144 started: true,
a1763c54 1145 ready: true,
2431bdb4
JB
1146 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1147 minSize: expect.any(Number),
1148 maxSize: expect.any(Number),
1149 workerNodes: expect.any(Number),
1150 idleWorkerNodes: expect.any(Number),
1151 busyWorkerNodes: expect.any(Number),
1152 executedTasks: expect.any(Number),
1153 executingTasks: expect.any(Number),
2431bdb4
JB
1154 failedTasks: expect.any(Number)
1155 })
1156 await pool.destroy()
1157 })
1158
a1763c54
JB
1159 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1160 const pool = new FixedThreadPool(
2431bdb4 1161 numberOfWorkers,
b2fd3f4a 1162 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1163 )
c726f66c 1164 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1165 const promises = new Set()
1166 let poolBusy = 0
2431bdb4 1167 let poolInfo
041dc05b 1168 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1169 ++poolBusy
2431bdb4
JB
1170 poolInfo = info
1171 })
c726f66c 1172 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1173 for (let i = 0; i < numberOfWorkers * 2; i++) {
1174 promises.add(pool.execute())
1175 }
1176 await Promise.all(promises)
1177 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1178 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1179 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1180 expect(poolInfo).toStrictEqual({
1181 version,
a1763c54
JB
1182 type: PoolTypes.fixed,
1183 worker: WorkerTypes.thread,
47352846
JB
1184 started: true,
1185 ready: true,
2431bdb4 1186 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1187 minSize: expect.any(Number),
1188 maxSize: expect.any(Number),
1189 workerNodes: expect.any(Number),
1190 idleWorkerNodes: expect.any(Number),
1191 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1192 executedTasks: expect.any(Number),
1193 executingTasks: expect.any(Number),
a4e07f72 1194 failedTasks: expect.any(Number)
d46660cd 1195 })
164d950a
JB
1196 await pool.destroy()
1197 })
1198
a1763c54
JB
1199 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1200 const pool = new DynamicThreadPool(
1201 Math.floor(numberOfWorkers / 2),
7c0ba920 1202 numberOfWorkers,
b2fd3f4a 1203 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1204 )
c726f66c 1205 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1206 const promises = new Set()
a1763c54 1207 let poolFull = 0
d46660cd 1208 let poolInfo
041dc05b 1209 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1210 ++poolFull
d46660cd
JB
1211 poolInfo = info
1212 })
c726f66c 1213 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1214 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1215 promises.add(pool.execute())
7c0ba920 1216 }
cf597bc5 1217 await Promise.all(promises)
33e6bb4c 1218 expect(poolFull).toBe(1)
d46660cd 1219 expect(poolInfo).toStrictEqual({
23ccf9d7 1220 version,
a1763c54 1221 type: PoolTypes.dynamic,
d46660cd 1222 worker: WorkerTypes.thread,
47352846
JB
1223 started: true,
1224 ready: true,
2431bdb4 1225 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1226 minSize: expect.any(Number),
1227 maxSize: expect.any(Number),
1228 workerNodes: expect.any(Number),
1229 idleWorkerNodes: expect.any(Number),
1230 busyWorkerNodes: expect.any(Number),
1231 executedTasks: expect.any(Number),
1232 executingTasks: expect.any(Number),
1233 failedTasks: expect.any(Number)
1234 })
1235 await pool.destroy()
1236 })
1237
3e8611a8 1238 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1239 const pool = new FixedThreadPool(
8735b4e5 1240 numberOfWorkers,
b2fd3f4a 1241 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1242 {
1243 enableTasksQueue: true
1244 }
1245 )
a074ffee 1246 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1247 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1248 const promises = new Set()
1249 let poolBackPressure = 0
1250 let poolInfo
041dc05b 1251 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1252 ++poolBackPressure
1253 poolInfo = info
1254 })
c726f66c 1255 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1256 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1257 promises.add(pool.execute())
1258 }
1259 await Promise.all(promises)
033f1776 1260 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1261 expect(poolInfo).toStrictEqual({
1262 version,
3e8611a8 1263 type: PoolTypes.fixed,
8735b4e5 1264 worker: WorkerTypes.thread,
47352846
JB
1265 started: true,
1266 ready: true,
8735b4e5 1267 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1268 minSize: expect.any(Number),
1269 maxSize: expect.any(Number),
1270 workerNodes: expect.any(Number),
1271 idleWorkerNodes: expect.any(Number),
1272 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1273 executedTasks: expect.any(Number),
1274 executingTasks: expect.any(Number),
3e8611a8
JB
1275 maxQueuedTasks: expect.any(Number),
1276 queuedTasks: expect.any(Number),
1277 backPressure: true,
68cbdc84 1278 stolenTasks: expect.any(Number),
a4e07f72 1279 failedTasks: expect.any(Number)
d46660cd 1280 })
e458c82e 1281 expect(pool.hasBackPressure.callCount).toBe(5)
fd7ebd49 1282 await pool.destroy()
7c0ba920 1283 })
70a4f5ea 1284
85b2561d
JB
1285 it('Verify that destroy() waits for queued tasks to finish', async () => {
1286 const tasksFinishedTimeout = 2500
1287 const pool = new FixedThreadPool(
1288 numberOfWorkers,
1289 './tests/worker-files/thread/asyncWorker.mjs',
1290 {
1291 enableTasksQueue: true,
1292 tasksQueueOptions: { tasksFinishedTimeout }
1293 }
1294 )
1295 const maxMultiplier = 4
1296 let tasksFinished = 0
1297 for (const workerNode of pool.workerNodes) {
1298 workerNode.on('taskFinished', () => {
1299 ++tasksFinished
1300 })
1301 }
1302 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1303 pool.execute()
1304 }
1305 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1306 const startTime = performance.now()
1307 await pool.destroy()
1308 const elapsedTime = performance.now() - startTime
90afa746 1309 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
85b2561d 1310 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
ecab2e2a 1311 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
85b2561d
JB
1312 })
1313
1314 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1315 const tasksFinishedTimeout = 1000
1316 const pool = new FixedThreadPool(
1317 numberOfWorkers,
1318 './tests/worker-files/thread/asyncWorker.mjs',
1319 {
1320 enableTasksQueue: true,
1321 tasksQueueOptions: { tasksFinishedTimeout }
1322 }
1323 )
1324 const maxMultiplier = 4
1325 let tasksFinished = 0
1326 for (const workerNode of pool.workerNodes) {
1327 workerNode.on('taskFinished', () => {
1328 ++tasksFinished
1329 })
1330 }
1331 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1332 pool.execute()
1333 }
1334 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1335 const startTime = performance.now()
1336 await pool.destroy()
1337 const elapsedTime = performance.now() - startTime
1338 expect(tasksFinished).toBe(0)
2885534c 1339 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1340 })
1341
f18fd12b
JB
1342 it('Verify that pool asynchronous resource track tasks execution', async () => {
1343 let taskAsyncId
1344 let initCalls = 0
1345 let beforeCalls = 0
1346 let afterCalls = 0
1347 let resolveCalls = 0
1348 const hook = createHook({
1349 init (asyncId, type) {
1350 if (type === 'poolifier:task') {
1351 initCalls++
1352 taskAsyncId = asyncId
1353 }
1354 },
1355 before (asyncId) {
1356 if (asyncId === taskAsyncId) beforeCalls++
1357 },
1358 after (asyncId) {
1359 if (asyncId === taskAsyncId) afterCalls++
1360 },
1361 promiseResolve () {
1362 if (executionAsyncId() === taskAsyncId) resolveCalls++
1363 }
1364 })
f18fd12b
JB
1365 const pool = new FixedThreadPool(
1366 numberOfWorkers,
1367 './tests/worker-files/thread/testWorker.mjs'
1368 )
8954c0a3 1369 hook.enable()
f18fd12b
JB
1370 await pool.execute()
1371 hook.disable()
1372 expect(initCalls).toBe(1)
1373 expect(beforeCalls).toBe(1)
1374 expect(afterCalls).toBe(1)
1375 expect(resolveCalls).toBe(1)
8954c0a3 1376 await pool.destroy()
f18fd12b
JB
1377 })
1378
9eae3c69
JB
1379 it('Verify that hasTaskFunction() is working', async () => {
1380 const dynamicThreadPool = new DynamicThreadPool(
1381 Math.floor(numberOfWorkers / 2),
1382 numberOfWorkers,
b2fd3f4a 1383 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1384 )
1385 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1386 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1387 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1388 true
1389 )
1390 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1391 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1392 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1393 await dynamicThreadPool.destroy()
1394 const fixedClusterPool = new FixedClusterPool(
1395 numberOfWorkers,
1396 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1397 )
1398 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1399 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1400 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1401 true
1402 )
1403 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1404 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1405 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1406 await fixedClusterPool.destroy()
1407 })
1408
1409 it('Verify that addTaskFunction() is working', async () => {
1410 const dynamicThreadPool = new DynamicThreadPool(
1411 Math.floor(numberOfWorkers / 2),
1412 numberOfWorkers,
b2fd3f4a 1413 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1414 )
1415 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1416 await expect(
1417 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1418 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1419 await expect(
1420 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1421 ).rejects.toThrow(
3feeab69
JB
1422 new TypeError('name argument must not be an empty string')
1423 )
948faff7
JB
1424 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1425 new TypeError('fn argument must be a function')
1426 )
1427 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1428 new TypeError('fn argument must be a function')
1429 )
9eae3c69
JB
1430 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1431 DEFAULT_TASK_NAME,
1432 'test'
1433 ])
1434 const echoTaskFunction = data => {
1435 return data
1436 }
1437 await expect(
1438 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1439 ).resolves.toBe(true)
1440 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1441 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1442 echoTaskFunction
1443 )
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1445 DEFAULT_TASK_NAME,
1446 'test',
1447 'echo'
1448 ])
1449 const taskFunctionData = { test: 'test' }
1450 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1451 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1452 for (const workerNode of dynamicThreadPool.workerNodes) {
1453 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1454 tasks: {
1455 executed: expect.any(Number),
1456 executing: 0,
1457 queued: 0,
463226a4 1458 sequentiallyStolen: 0,
5ad42e34 1459 stolen: 0,
adee6053
JB
1460 failed: 0
1461 },
1462 runTime: {
1463 history: new CircularArray()
1464 },
1465 waitTime: {
1466 history: new CircularArray()
1467 },
1468 elu: {
1469 idle: {
1470 history: new CircularArray()
1471 },
1472 active: {
1473 history: new CircularArray()
1474 }
1475 }
1476 })
1477 }
9eae3c69
JB
1478 await dynamicThreadPool.destroy()
1479 })
1480
1481 it('Verify that removeTaskFunction() is working', async () => {
1482 const dynamicThreadPool = new DynamicThreadPool(
1483 Math.floor(numberOfWorkers / 2),
1484 numberOfWorkers,
b2fd3f4a 1485 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1486 )
1487 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1488 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1489 DEFAULT_TASK_NAME,
1490 'test'
1491 ])
948faff7 1492 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1493 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1494 )
1495 const echoTaskFunction = data => {
1496 return data
1497 }
1498 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1499 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1500 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1501 echoTaskFunction
1502 )
1503 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1504 DEFAULT_TASK_NAME,
1505 'test',
1506 'echo'
1507 ])
1508 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1509 true
1510 )
1511 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1512 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1513 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1514 DEFAULT_TASK_NAME,
1515 'test'
1516 ])
1517 await dynamicThreadPool.destroy()
1518 })
1519
30500265 1520 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1521 const dynamicThreadPool = new DynamicThreadPool(
1522 Math.floor(numberOfWorkers / 2),
1523 numberOfWorkers,
b2fd3f4a 1524 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1525 )
1526 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1527 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1528 DEFAULT_TASK_NAME,
90d7d101
JB
1529 'jsonIntegerSerialization',
1530 'factorial',
1531 'fibonacci'
1532 ])
9eae3c69 1533 await dynamicThreadPool.destroy()
90d7d101
JB
1534 const fixedClusterPool = new FixedClusterPool(
1535 numberOfWorkers,
1536 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1537 )
1538 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1539 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1540 DEFAULT_TASK_NAME,
90d7d101
JB
1541 'jsonIntegerSerialization',
1542 'factorial',
1543 'fibonacci'
1544 ])
0fe39c97 1545 await fixedClusterPool.destroy()
90d7d101
JB
1546 })
1547
9eae3c69 1548 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1549 const dynamicThreadPool = new DynamicThreadPool(
1550 Math.floor(numberOfWorkers / 2),
1551 numberOfWorkers,
b2fd3f4a 1552 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1553 )
1554 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1555 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1556 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1557 new Error(
711623b8 1558 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1559 )
1560 )
1561 await expect(
1562 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1563 ).rejects.toThrow(
b0b55f57 1564 new Error(
711623b8 1565 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1566 )
1567 )
1568 await expect(
1569 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1570 ).rejects.toThrow(
b0b55f57 1571 new Error(
711623b8 1572 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1573 )
1574 )
9eae3c69
JB
1575 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1576 DEFAULT_TASK_NAME,
1577 'jsonIntegerSerialization',
1578 'factorial',
1579 'fibonacci'
1580 ])
1581 await expect(
1582 dynamicThreadPool.setDefaultTaskFunction('factorial')
1583 ).resolves.toBe(true)
1584 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1585 DEFAULT_TASK_NAME,
1586 'factorial',
1587 'jsonIntegerSerialization',
1588 'fibonacci'
1589 ])
1590 await expect(
1591 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1592 ).resolves.toBe(true)
1593 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1594 DEFAULT_TASK_NAME,
1595 'fibonacci',
1596 'jsonIntegerSerialization',
1597 'factorial'
1598 ])
cda9ba34 1599 await dynamicThreadPool.destroy()
30500265
JB
1600 })
1601
90d7d101 1602 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1603 const pool = new DynamicClusterPool(
2431bdb4 1604 Math.floor(numberOfWorkers / 2),
70a4f5ea 1605 numberOfWorkers,
90d7d101 1606 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1607 )
1608 const data = { n: 10 }
82888165 1609 const result0 = await pool.execute(data)
30b963d4 1610 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1611 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1612 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1613 const result2 = await pool.execute(data, 'factorial')
1614 expect(result2).toBe(3628800)
1615 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1616 expect(result3).toBe(55)
5bb5be17
JB
1617 expect(pool.info.executingTasks).toBe(0)
1618 expect(pool.info.executedTasks).toBe(4)
b414b84c 1619 for (const workerNode of pool.workerNodes) {
66979634 1620 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1621 DEFAULT_TASK_NAME,
b414b84c
JB
1622 'jsonIntegerSerialization',
1623 'factorial',
1624 'fibonacci'
1625 ])
1626 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1627 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1628 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1629 tasks: {
1630 executed: expect.any(Number),
4ba4c7f9 1631 executing: 0,
5bb5be17 1632 failed: 0,
68cbdc84 1633 queued: 0,
463226a4 1634 sequentiallyStolen: 0,
68cbdc84 1635 stolen: 0
5bb5be17
JB
1636 },
1637 runTime: {
1638 history: expect.any(CircularArray)
1639 },
1640 waitTime: {
1641 history: expect.any(CircularArray)
1642 },
1643 elu: {
1644 idle: {
1645 history: expect.any(CircularArray)
1646 },
1647 active: {
1648 history: expect.any(CircularArray)
1649 }
1650 }
1651 })
1652 expect(
4ba4c7f9
JB
1653 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1654 ).toBeGreaterThan(0)
5bb5be17 1655 }
dfd7ec01
JB
1656 expect(
1657 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1658 ).toStrictEqual(
66979634
JB
1659 workerNode.getTaskFunctionWorkerUsage(
1660 workerNode.info.taskFunctionNames[1]
1661 )
dfd7ec01 1662 )
5bb5be17 1663 }
0fe39c97 1664 await pool.destroy()
70a4f5ea 1665 })
52a23942
JB
1666
1667 it('Verify sendKillMessageToWorker()', async () => {
1668 const pool = new DynamicClusterPool(
1669 Math.floor(numberOfWorkers / 2),
1670 numberOfWorkers,
1671 './tests/worker-files/cluster/testWorker.js'
1672 )
1673 const workerNodeKey = 0
1674 await expect(
adee6053 1675 pool.sendKillMessageToWorker(workerNodeKey)
52a23942
JB
1676 ).resolves.toBeUndefined()
1677 await pool.destroy()
1678 })
adee6053
JB
1679
1680 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1681 const pool = new DynamicClusterPool(
1682 Math.floor(numberOfWorkers / 2),
1683 numberOfWorkers,
1684 './tests/worker-files/cluster/testWorker.js'
1685 )
1686 const workerNodeKey = 0
1687 await expect(
1688 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1689 taskFunctionOperation: 'add',
1690 taskFunctionName: 'empty',
1691 taskFunction: (() => {}).toString()
1692 })
1693 ).resolves.toBe(true)
1694 expect(
1695 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1696 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1697 await pool.destroy()
1698 })
1699
1700 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1701 const pool = new DynamicClusterPool(
1702 Math.floor(numberOfWorkers / 2),
1703 numberOfWorkers,
1704 './tests/worker-files/cluster/testWorker.js'
1705 )
adee6053
JB
1706 await expect(
1707 pool.sendTaskFunctionOperationToWorkers({
1708 taskFunctionOperation: 'add',
1709 taskFunctionName: 'empty',
1710 taskFunction: (() => {}).toString()
1711 })
1712 ).resolves.toBe(true)
1713 for (const workerNode of pool.workerNodes) {
1714 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1715 DEFAULT_TASK_NAME,
1716 'test',
1717 'empty'
1718 ])
1719 }
1720 await pool.destroy()
1721 })
3ec964d6 1722})