test: less strict expectation for CI
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
// Package version, read from the package.json located two directories above
// this test file.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    { encoding: 'utf8' }
  )
)
// Pool size shared by the tests of this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 *
 * Used by the test expecting pool construction to throw
 * 'Cannot start a pool from a worker with the same type as the pool'.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * Kept as a prototype method (not a class field) so the override already
   * exists while the parent constructor runs.
   *
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 37
// Call sinon's restore() after every test of the suite.
afterEach(function restoreSinonFakes () {
  restore()
})
41
bcf85f7f
JB
it('Verify that pool can be created and destroyed', async () => {
  // Smoke test: build a fixed thread pool, check its type, tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const fixedPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(fixedPool).toBeInstanceOf(FixedThreadPool)
  await fixedPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub reports isMain() === false, so construction must be refused.
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createStubPool).toThrow(expectedError)
})
c510fea7 67
bc61cfe6
JB
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status flags expected right after construction, checked in this order.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expected] of expectedStatuses) {
    expect(pool[status]).toBe(expected)
  }
  await pool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // Each case: constructor arguments and the error they must trigger.
  const invalidFilePaths = [
    {
      args: [numberOfWorkers],
      error: new TypeError('The worker file path must be specified')
    },
    {
      args: [numberOfWorkers, 0],
      error: new TypeError('The worker file path must be a string')
    },
    {
      args: [numberOfWorkers, './dummyWorker.ts'],
      error: new Error("Cannot find the worker file './dummyWorker.ts'")
    }
  ]
  for (const { args, error } of invalidFilePaths) {
    expect(() => new FixedThreadPool(...args)).toThrow(error)
  }
})
90
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  // A negative pool size must be rejected by the constructor.
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  // A fractional pool size must be rejected by the constructor.
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrow(expectedError)
})
7c0ba920 126
26ce26ca
JB
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool given a fourth (maximum size) argument must be refused.
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each row: pool class, minimum size, maximum size, worker file and the
  // error the constructor must throw for that sizing.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Strategy instances are the second item of each iterated entry.
  const strategiesOf = pool =>
    Array.from(
      pool.workerChoiceStrategyContext.workerChoiceStrategies,
      entry => entry[1]
    )

  // Part 1: options of a pool created without an options argument.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  const defaultContextOpts = defaultPool.workerChoiceStrategyContext.opts
  expect(defaultContextOpts).toStrictEqual({
    retries:
      defaultPool.info.maxSize + Object.keys(defaultContextOpts.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [defaultPool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const strategy of strategiesOf(defaultPool)) {
    // The per-strategy weights are deliberately not asserted here; only the
    // listed keys have to match.
    expect(strategy.opts).toStrictEqual(
      expect.objectContaining({
        retries:
          defaultPool.info.maxSize + Object.keys(strategy.opts.weights).length,
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false }
      })
    )
  }
  await defaultPool.destroy()

  // Part 2: options of a pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  // Fresh expectation object on every call so no reference is shared between
  // assertions.
  const expectedCustomStrategyOpts = () => ({
    retries:
      customPool.info.maxSize +
      Object.keys(customPool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedCustomStrategyOpts()
  )
  for (const strategy of strategiesOf(customPool)) {
    expect(strategy.opts).toStrictEqual(expectedCustomStrategyOpts())
  }
  await customPool.destroy()
})
327
it('Verify that pool options are validated', () => {
  // Each row: the pool options handed to the constructor and the error it
  // must throw for them.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs',
          poolOptions
        )
    ).toThrow(expectedError)
  }
})
467
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Matcher accepting any weights object holding numeric entries for the
  // first and the last worker node index.
  const anyWeights = () =>
    expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  // Expected options (weights excluded) given the inspected options object
  // and whether the median is enabled for runTime and elu.
  const expectedOpts = (actualOpts, median) => ({
    retries: pool.info.maxSize + Object.keys(actualOpts.weights).length,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  const expectContextOpts = median => {
    const contextOpts = pool.workerChoiceStrategyContext.opts
    expect(contextOpts).toStrictEqual({
      ...expectedOpts(contextOpts, median),
      weights: anyWeights()
    })
  }
  // Strategy instances are the second item of each iterated entry.
  const strategies = () =>
    Array.from(
      pool.workerChoiceStrategyContext.workerChoiceStrategies,
      entry => entry[1]
    )
  const expectStrictStrategiesOpts = median => {
    for (const strategy of strategies()) {
      expect(strategy.opts).toStrictEqual({
        ...expectedOpts(strategy.opts, median),
        weights: anyWeights()
      })
    }
  }
  // With the median enabled the average is no longer required, and vice versa.
  const expectStatisticsRequirements = median => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no explicit worker choice strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectContextOpts(false)
  for (const strategy of strategies()) {
    // The per-strategy weights are deliberately not asserted at this stage;
    // only the listed keys have to match.
    expect(strategy.opts).toStrictEqual(
      expect.objectContaining(expectedOpts(strategy.opts, false))
    )
  }
  expectStatisticsRequirements(false)

  // Switch runTime and elu to median, then back to average.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median },
      elu: { median }
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      runTime: { median },
      elu: { median }
    })
    expectContextOpts(median)
    expectStrictStrategiesOpts(median)
    expectStatisticsRequirements(median)
  }

  // Invalid options must be rejected.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
652
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Checks the enabled state: given concurrency, other options as expected
  // when not explicitly provided.
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    })
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
683
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Tasks queue options expected when only enableTasksQueue (or a partial
  // options object with concurrency 1) is supplied.
  const expectedDefaults = () => ({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Every worker node back pressure size must equal the pool queue size.
  const expectBackPressureSizeInSync = () => {
    for (const { tasksQueueBackPressureSize } of pool.workerNodes) {
      expect(tasksQueueBackPressureSize).toBe(pool.opts.tasksQueueOptions.size)
    }
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedDefaults())
  expectBackPressureSizeInSync()

  // Fully specified options are taken as they are.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectBackPressureSizeInSync()

  // Partially specified options: the omitted ones are expected at their
  // default values again.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedDefaults())
  expectBackPressureSizeInSync()

  // Invalid options must be rejected.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(options)).toThrow(expectedError)
  }
  await pool.destroy()
})
769
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Counters expected on a pool that has not executed any task yet.
  const idleCounters = () => ({
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...idleCounters()
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...idleCounters()
  })
  await dynamicPool.destroy()
})
815
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // A measurement statistics object holding only a freshly built history.
  const emptyStatistics = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyStatistics(),
      waitTime: emptyStatistics(),
      elu: {
        idle: emptyStatistics(),
        active: emptyStatistics()
      }
    })
  }
  await pool.destroy()
})
851
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque that never held a task.
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      const { tasksQueue } = workerNode
      expect(tasksQueue).toBeInstanceOf(Deque)
      expect(tasksQueue.size).toBe(0)
      expect(tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
877
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must be a ready, non dynamic worker of the given type.
  const expectWorkerInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
909
711623b8
JB
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Starting an already started pool is refused.
  expectStartedAndReady(true)
  const startAgain = () => pool.start()
  expect(startAgain).toThrow(new Error('Cannot start an already started pool'))

  // Destroying an already destroyed pool is refused as well.
  await pool.destroy()
  expectStartedAndReady(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
927
47352846
JB
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Before start(): no worker node and task execution is refused.
  expectStartedAndReady(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  // After start(): the ready event is awaited before checking its flag.
  pool.start()
  expectStartedAndReady(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
954
9d2d0da1
JB
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each row: execute() arguments and the TypeError the call must reject with.
  const invalidArguments = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, expectedError] of invalidArguments) {
    await expect(pool.execute(...args)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool refuses to execute tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
977
// Checks every worker node's task counters twice: right after the tasks were
// submitted (all still executing) and once all of them completed.
it('Verify that pool worker tasks usage are computed', async () => {
  const maxMultiplier = 2
  // Builds the usage object expected on a worker node for the given counters.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  pool.workerNodes.forEach(node => {
    expect(node.usage).toStrictEqual(buildExpectedUsage(0, maxMultiplier))
  })
  await Promise.all(pendingTasks)
  pool.workerNodes.forEach(node => {
    expect(node.usage).toStrictEqual(buildExpectedUsage(maxMultiplier, 0))
  })
  await pool.destroy()
})
1045
// After tasks were executed, switching the worker choice strategy must bring the
// executed tasks counter of every worker node back to 0.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const maxMultiplier = 2
  // Builds the usage object expected on a worker node for the given `executed` value.
  const buildExpectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  })
  // Asserts that all the measurement histories of a worker node are empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(pendingTasks)
  pool.workerNodes.forEach(node => {
    expect(node.usage).toStrictEqual(buildExpectedUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(node)
  })
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  pool.workerNodes.forEach(node => {
    expect(node.usage).toStrictEqual(buildExpectedUsage(0))
    expectEmptyHistories(node)
  })
  await pool.destroy()
})
1127
a1763c54
JB
// A listener registered on the pool emitter must receive the 'ready' event
// exactly once, together with the pool information.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload received by the listener, in emission order.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1162
a1763c54
JB
// A listener registered on the pool emitter must receive the 'busy' events
// together with the pool information.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload received by the listener, in emission order.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1202
a1763c54
JB
// A listener registered on the pool emitter must receive the 'full' event
// exactly once, together with the pool information.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload received by the listener, in emission order.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1241
// With hasBackPressure() stubbed to always return true, a listener registered on
// the pool emitter must receive the 'backPressure' event with the pool information.
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload received by the listener, in emission order.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1288
85b2561d
JB
// destroy() must wait for the submitted tasks: every 'taskFinished' event is
// observed and the elapsed time stays within the configured timeout plus a margin.
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let finishedTasksCount = 0
  pool.workerNodes.forEach(node => {
    node.on('taskFinished', () => {
      ++finishedTasksCount
    })
  })
  // Submit more tasks than workers so that some of them end up queued.
  let submittedTasks = 0
  while (submittedTasks < numberOfWorkers * maxMultiplier) {
    pool.execute()
    ++submittedTasks
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(finishedTasksCount).toBe(numberOfWorkers * maxMultiplier)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1317
// With a timeout too short for any task to complete, destroy() must return once
// the timeout is reached (plus a margin) without any 'taskFinished' event observed.
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let finishedTasksCount = 0
  pool.workerNodes.forEach(node => {
    node.on('taskFinished', () => {
      ++finishedTasksCount
    })
  })
  // Submit more tasks than workers so that some of them end up queued.
  let submittedTasks = 0
  while (submittedTasks < numberOfWorkers * maxMultiplier) {
    pool.execute()
    ++submittedTasks
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(finishedTasksCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1345
f18fd12b
JB
// Uses an async_hooks hook to check that one task execution creates exactly one
// 'poolifier:task' asynchronous resource, whose before/after callbacks each fire
// once and in whose execution context exactly one promise is resolved.
it('Verify that pool asynchronous resource track tasks execution', async () => {
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the id of the task resource so the other callbacks can filter on it.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    hook.enable()
    try {
      await pool.execute()
    } finally {
      // Disable the hook even if the task rejects, so it cannot stay enabled
      // while subsequent tests run.
      hook.disable()
    }
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Release the workers even when an expectation above fails.
    await pool.destroy()
  }
})
1382
9eae3c69
JB
// hasTaskFunction() must report the task functions registered by the
// multiple-task-functions worker, for both a thread pool and a cluster pool.
it('Verify that hasTaskFunction() is working', async () => {
  // Task function name -> expected hasTaskFunction() result.
  const expectedPresence = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Pools are created lazily so each one is built only after the previous one is destroyed.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, present] of expectedPresence) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(present)
    }
    await pool.destroy()
  }
})
1412
// addTaskFunction() must validate its arguments, register the function on the
// pool side, expose it in the task function names and make it executable.
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Each entry: the addTaskFunction() arguments and the TypeError message they must trigger.
  const invalidCalls = [
    { args: [0, () => {}], message: 'name argument must be a string' },
    {
      args: ['', () => {}],
      message: 'name argument must not be an empty string'
    },
    { args: ['test', 0], message: 'fn argument must be a function' },
    { args: ['test', ''], message: 'fn argument must be a function' }
  ]
  for (const { args, message } of invalidCalls) {
    await expect(pool.addTaskFunction(...args)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected calls must not have altered the known task function names.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echo = data => {
    return data
  }
  await expect(pool.addTaskFunction('echo', echo)).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const payload = { test: 'test' }
  const echoed = await pool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)
  // Fresh empty history holder for each measurement of the expected usage.
  const emptyHistory = () => ({ history: new CircularArray() })
  pool.workerNodes.forEach(node => {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  })
  await pool.destroy()
})
1484
// removeTaskFunction() must refuse to remove a task function that was not added
// on the pool side, and must remove one that was added through addTaskFunction().
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed by the pool but was not registered through addTaskFunction().
  const notHandledError = new Error(
    'Cannot remove a task function not handled on the pool side'
  )
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(notHandledError)
  const echo = data => {
    return data
  }
  await pool.addTaskFunction('echo', echo)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echo)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1523
// listTaskFunctionNames() must return the task function names of the
// multiple-task-functions worker, for both a thread pool and a cluster pool.
it('Verify that listTaskFunctionNames() is working', async () => {
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Pools are created lazily so each one is built only after the previous one is destroyed.
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
})
1551
// setDefaultTaskFunction() must reject invalid, reserved or unknown names and,
// on success, move the chosen name right after the default task name in the list.
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  const workerId = pool.workerNodes[0].info.id
  // Each entry: the rejected name and the exact error message expected for it.
  const rejectedCalls = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      new Error(message)
    )
  }
  // The rejected calls must not have altered the task function names order.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Each entry: the accepted name and the names order expected afterwards.
  const acceptedCalls = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, namesAfterwards] of acceptedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(pool.listTaskFunctionNames()).toStrictEqual(namesAfterwards)
  }
  await pool.destroy()
})
1605
// Executes each task function of the multiple-task-functions worker once, then
// checks the results, the pool counters and the per task function worker usage.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  const defaultResult = await pool.execute(input)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    input,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(input, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(input, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  pool.workerNodes.forEach(node => {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    pool.listTaskFunctionNames().forEach(taskFunctionName => {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    })
    // The default task name must report the same usage as the second listed task function.
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctionNames[1])
    )
  })
  await pool.destroy()
})
52a23942
JB
1670
// sendKillMessageToWorker() must resolve for worker node key 0 and reject for a
// key equal to the maximum number of workers.
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const validKey = 0
  const invalidKey = numberOfWorkers
  const validKill = pool.sendKillMessageToWorker(validKey)
  await expect(validKill).resolves.toBeUndefined()
  const invalidKill = pool.sendKillMessageToWorker(invalidKey)
  await expect(invalidKill).rejects.toStrictEqual(
    new Error(`Invalid worker node key '${invalidKey}'`)
  )
  await pool.destroy()
})
adee6053
JB
1688
// Sending an 'add' task function operation to a single worker must resolve to
// true and append the new name to that worker's task function names.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const targetKey = 0
  // The task function is sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorker(
    targetKey,
    addOperation
  )
  await expect(operationResult).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[targetKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1708
// Sending an 'add' task function operation to all workers must resolve to true
// and append the new name to every worker's task function names.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The task function is sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationResult).resolves.toBe(true)
  pool.workerNodes.forEach(node => {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  })
  await pool.destroy()
})
3ec964d6 1731})