fix: fix worker weights handling
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
a5ed75b7 173 './tests/worker-files/cluster/testWorker.js'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
26ce26ca 229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d
JB
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
232 retries:
233 pool.info.maxSize +
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 235 runTime: { median: false },
5df69fab 236 waitTime: { median: false },
00e1bdeb
JB
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
da309861 242 })
999ef664
JB
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
fde30586
JB
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
247 retries:
248 pool.info.maxSize +
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 // weights: expect.objectContaining({
254 // 0: expect.any(Number),
255 // [pool.info.maxSize - 1]: expect.any(Number)
256 // })
00e1bdeb 257 })
fde30586 258 )
999ef664 259 }
fd7ebd49 260 await pool.destroy()
73bfd59d 261 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
262 pool = new FixedThreadPool(
263 numberOfWorkers,
b2fd3f4a 264 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 265 {
e4543b14 266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 267 workerChoiceStrategyOptions: {
932fc8be 268 runTime: { median: true },
fc027381 269 weights: { 0: 300, 1: 200 }
49be33fe 270 },
35cf1c03 271 enableEvents: false,
1f68cede 272 restartWorkerOnError: false,
ff733df7 273 enableTasksQueue: true,
d4aeae5a 274 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
7c0ba920
JB
279 }
280 )
7c0ba920 281 expect(pool.emitter).toBeUndefined()
47352846
JB
282 expect(pool.opts).toStrictEqual({
283 startWorkers: true,
284 enableEvents: false,
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
287 tasksQueueOptions: {
288 concurrency: 2,
2324f8c9 289 size: Math.pow(numberOfWorkers, 2),
dbd73092 290 taskStealing: true,
32b141fd 291 tasksStealingOnBackPressure: true,
568d0075 292 tasksFinishedTimeout: 2000
47352846
JB
293 },
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
47352846 296 runTime: { median: true },
47352846
JB
297 weights: { 0: 300, 1: 200 }
298 },
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
8990357d
JB
303 })
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
449cd154
JB
305 retries:
306 pool.info.maxSize +
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
8990357d
JB
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
fc027381 311 weights: { 0: 300, 1: 200 }
da309861 312 })
999ef664
JB
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
449cd154
JB
316 retries:
317 pool.info.maxSize +
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
999ef664
JB
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
323 })
324 }
fd7ebd49 325 await pool.destroy()
7c0ba920
JB
326 })
327
fe291b64 328 it('Verify that pool options are validated', () => {
d4aeae5a
JB
329 expect(
330 () =>
331 new FixedThreadPool(
332 numberOfWorkers,
b2fd3f4a 333 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 334 {
f0d7f803 335 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
336 }
337 )
948faff7 338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
339 expect(
340 () =>
341 new FixedThreadPool(
342 numberOfWorkers,
b2fd3f4a 343 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
344 {
345 workerChoiceStrategyOptions: { weights: {} }
346 }
347 )
948faff7 348 ).toThrow(
8735b4e5
JB
349 new Error(
350 'Invalid worker choice strategy options: must have a weight for each worker node'
351 )
49be33fe 352 )
f0d7f803
JB
353 expect(
354 () =>
355 new FixedThreadPool(
356 numberOfWorkers,
b2fd3f4a 357 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
358 {
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
360 }
361 )
948faff7 362 ).toThrow(
8735b4e5
JB
363 new Error(
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
365 )
f0d7f803 366 )
5c4c2dee
JB
367 expect(
368 () =>
369 new FixedThreadPool(
370 numberOfWorkers,
b2fd3f4a 371 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
372 {
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
375 }
376 )
948faff7 377 ).toThrow(
5c4c2dee
JB
378 new TypeError('Invalid tasks queue options: must be a plain object')
379 )
f0d7f803
JB
380 expect(
381 () =>
382 new FixedThreadPool(
383 numberOfWorkers,
b2fd3f4a 384 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
385 {
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
388 }
389 )
948faff7 390 ).toThrow(
e695d66f 391 new RangeError(
20c6f652 392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
393 )
394 )
f0d7f803
JB
395 expect(
396 () =>
397 new FixedThreadPool(
398 numberOfWorkers,
b2fd3f4a 399 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
400 {
401 enableTasksQueue: true,
5c4c2dee 402 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
403 }
404 )
948faff7 405 ).toThrow(
5c4c2dee
JB
406 new RangeError(
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
408 )
8735b4e5 409 )
f0d7f803
JB
410 expect(
411 () =>
412 new FixedThreadPool(
413 numberOfWorkers,
b2fd3f4a 414 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
415 {
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
418 }
419 )
948faff7 420 ).toThrow(
20c6f652 421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 422 )
5c4c2dee
JB
423 expect(
424 () =>
425 new FixedThreadPool(
426 numberOfWorkers,
b2fd3f4a 427 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
428 {
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
431 }
432 )
948faff7 433 ).toThrow(
5c4c2dee
JB
434 new RangeError(
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
436 )
437 )
438 expect(
439 () =>
440 new FixedThreadPool(
441 numberOfWorkers,
b2fd3f4a 442 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
443 {
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
446 }
447 )
948faff7 448 ).toThrow(
5c4c2dee
JB
449 new RangeError(
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
451 )
452 )
453 expect(
454 () =>
455 new FixedThreadPool(
456 numberOfWorkers,
b2fd3f4a 457 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
458 {
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
461 }
462 )
948faff7 463 ).toThrow(
5c4c2dee
JB
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
465 )
d4aeae5a
JB
466 })
467
2431bdb4 468 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
469 const pool = new FixedThreadPool(
470 numberOfWorkers,
b2fd3f4a 471 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
473 )
26ce26ca 474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
8990357d 475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
476 retries:
477 pool.info.maxSize +
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 479 runTime: { median: false },
5df69fab 480 waitTime: { median: false },
00e1bdeb
JB
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
485 })
a20f0ba5
JB
486 })
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
55ddaf0f
JB
489 expect(workerChoiceStrategy.opts).toStrictEqual(
490 expect.objectContaining({
491 retries:
492 pool.info.maxSize +
493 Object.keys(workerChoiceStrategy.opts.weights).length,
494 runTime: { median: false },
495 waitTime: { median: false },
496 elu: { median: false }
497 // weights: expect.objectContaining({
498 // 0: expect.any(Number),
499 // [pool.info.maxSize - 1]: expect.any(Number)
500 // })
00e1bdeb 501 })
55ddaf0f 502 )
a20f0ba5 503 }
87de9ff5
JB
504 expect(
505 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
506 ).toStrictEqual({
932fc8be
JB
507 runTime: {
508 aggregate: true,
509 average: true,
510 median: false
511 },
512 waitTime: {
513 aggregate: false,
514 average: false,
515 median: false
516 },
5df69fab 517 elu: {
9adcefab
JB
518 aggregate: true,
519 average: true,
5df69fab
JB
520 median: false
521 }
86bf340d 522 })
9adcefab
JB
523 pool.setWorkerChoiceStrategyOptions({
524 runTime: { median: true },
525 elu: { median: true }
526 })
a20f0ba5 527 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 528 runTime: { median: true },
8990357d
JB
529 elu: { median: true }
530 })
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
532 retries:
533 pool.info.maxSize +
534 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
8990357d
JB
535 runTime: { median: true },
536 waitTime: { median: false },
00e1bdeb
JB
537 elu: { median: true },
538 weights: expect.objectContaining({
539 0: expect.any(Number),
540 [pool.info.maxSize - 1]: expect.any(Number)
541 })
a20f0ba5
JB
542 })
543 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
544 .workerChoiceStrategies) {
fc7633dd
JB
545 expect(workerChoiceStrategy.opts).toStrictEqual(
546 expect.objectContaining({
547 retries:
548 pool.info.maxSize +
549 Object.keys(workerChoiceStrategy.opts.weights).length,
550 runTime: { median: true },
551 waitTime: { median: false },
552 elu: { median: true }
553 // weights: expect.objectContaining({
554 // 0: expect.any(Number),
555 // [pool.info.maxSize - 1]: expect.any(Number)
556 // })
00e1bdeb 557 })
fc7633dd 558 )
a20f0ba5 559 }
87de9ff5
JB
560 expect(
561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
562 ).toStrictEqual({
932fc8be
JB
563 runTime: {
564 aggregate: true,
565 average: false,
566 median: true
567 },
568 waitTime: {
569 aggregate: false,
570 average: false,
571 median: false
572 },
5df69fab 573 elu: {
9adcefab 574 aggregate: true,
5df69fab 575 average: false,
9adcefab 576 median: true
5df69fab 577 }
86bf340d 578 })
9adcefab
JB
579 pool.setWorkerChoiceStrategyOptions({
580 runTime: { median: false },
581 elu: { median: false }
582 })
a20f0ba5 583 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 584 runTime: { median: false },
8990357d
JB
585 elu: { median: false }
586 })
587 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
588 retries:
589 pool.info.maxSize +
590 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
9adcefab 591 runTime: { median: false },
8990357d 592 waitTime: { median: false },
00e1bdeb
JB
593 elu: { median: false },
594 weights: expect.objectContaining({
595 0: expect.any(Number),
596 [pool.info.maxSize - 1]: expect.any(Number)
597 })
a20f0ba5
JB
598 })
599 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
600 .workerChoiceStrategies) {
69bba5e2
JB
601 expect(workerChoiceStrategy.opts).toStrictEqual(
602 expect.objectContaining({
603 retries:
604 pool.info.maxSize +
605 Object.keys(workerChoiceStrategy.opts.weights).length,
606 runTime: { median: false },
607 waitTime: { median: false },
608 elu: { median: false }
609 // weights: expect.objectContaining({
610 // 0: expect.any(Number),
611 // [pool.info.maxSize - 1]: expect.any(Number)
612 // })
00e1bdeb 613 })
69bba5e2 614 )
a20f0ba5 615 }
87de9ff5
JB
616 expect(
617 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
618 ).toStrictEqual({
932fc8be
JB
619 runTime: {
620 aggregate: true,
621 average: true,
622 median: false
623 },
624 waitTime: {
625 aggregate: false,
626 average: false,
627 median: false
628 },
5df69fab 629 elu: {
9adcefab
JB
630 aggregate: true,
631 average: true,
5df69fab
JB
632 median: false
633 }
86bf340d 634 })
1f95d544
JB
635 expect(() =>
636 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 637 ).toThrow(
8735b4e5
JB
638 new TypeError(
639 'Invalid worker choice strategy options: must be a plain object'
640 )
1f95d544 641 )
948faff7 642 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
643 new Error(
644 'Invalid worker choice strategy options: must have a weight for each worker node'
645 )
1f95d544
JB
646 )
647 expect(() =>
648 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 649 ).toThrow(
8735b4e5
JB
650 new Error(
651 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
652 )
1f95d544 653 )
a20f0ba5
JB
654 await pool.destroy()
655 })
656
2431bdb4 657 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
658 const pool = new FixedThreadPool(
659 numberOfWorkers,
b2fd3f4a 660 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
661 )
662 expect(pool.opts.enableTasksQueue).toBe(false)
663 expect(pool.opts.tasksQueueOptions).toBeUndefined()
664 pool.enableTasksQueue(true)
665 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
666 expect(pool.opts.tasksQueueOptions).toStrictEqual({
667 concurrency: 1,
2324f8c9 668 size: Math.pow(numberOfWorkers, 2),
dbd73092 669 taskStealing: true,
32b141fd 670 tasksStealingOnBackPressure: true,
568d0075 671 tasksFinishedTimeout: 2000
20c6f652 672 })
a20f0ba5
JB
673 pool.enableTasksQueue(true, { concurrency: 2 })
674 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
676 concurrency: 2,
2324f8c9 677 size: Math.pow(numberOfWorkers, 2),
dbd73092 678 taskStealing: true,
32b141fd 679 tasksStealingOnBackPressure: true,
568d0075 680 tasksFinishedTimeout: 2000
20c6f652 681 })
a20f0ba5
JB
682 pool.enableTasksQueue(false)
683 expect(pool.opts.enableTasksQueue).toBe(false)
684 expect(pool.opts.tasksQueueOptions).toBeUndefined()
685 await pool.destroy()
686 })
687
2431bdb4 688 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
689 const pool = new FixedThreadPool(
690 numberOfWorkers,
b2fd3f4a 691 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
692 { enableTasksQueue: true }
693 )
20c6f652
JB
694 expect(pool.opts.tasksQueueOptions).toStrictEqual({
695 concurrency: 1,
2324f8c9 696 size: Math.pow(numberOfWorkers, 2),
dbd73092 697 taskStealing: true,
32b141fd 698 tasksStealingOnBackPressure: true,
568d0075 699 tasksFinishedTimeout: 2000
20c6f652 700 })
d6ca1416 701 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
702 expect(workerNode.tasksQueueBackPressureSize).toBe(
703 pool.opts.tasksQueueOptions.size
704 )
d6ca1416
JB
705 }
706 pool.setTasksQueueOptions({
707 concurrency: 2,
2324f8c9 708 size: 2,
d6ca1416 709 taskStealing: false,
32b141fd 710 tasksStealingOnBackPressure: false,
568d0075 711 tasksFinishedTimeout: 3000
d6ca1416 712 })
20c6f652
JB
713 expect(pool.opts.tasksQueueOptions).toStrictEqual({
714 concurrency: 2,
2324f8c9 715 size: 2,
d6ca1416 716 taskStealing: false,
32b141fd 717 tasksStealingOnBackPressure: false,
568d0075 718 tasksFinishedTimeout: 3000
d6ca1416
JB
719 })
720 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
721 expect(workerNode.tasksQueueBackPressureSize).toBe(
722 pool.opts.tasksQueueOptions.size
723 )
d6ca1416
JB
724 }
725 pool.setTasksQueueOptions({
726 concurrency: 1,
727 taskStealing: true,
728 tasksStealingOnBackPressure: true
729 })
730 expect(pool.opts.tasksQueueOptions).toStrictEqual({
731 concurrency: 1,
2324f8c9 732 size: Math.pow(numberOfWorkers, 2),
dbd73092 733 taskStealing: true,
32b141fd 734 tasksStealingOnBackPressure: true,
568d0075 735 tasksFinishedTimeout: 2000
20c6f652 736 })
d6ca1416 737 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
738 expect(workerNode.tasksQueueBackPressureSize).toBe(
739 pool.opts.tasksQueueOptions.size
740 )
d6ca1416 741 }
948faff7 742 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
743 new TypeError('Invalid tasks queue options: must be a plain object')
744 )
948faff7 745 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 746 new RangeError(
20c6f652 747 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
748 )
749 )
948faff7 750 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 751 new RangeError(
20c6f652 752 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 753 )
a20f0ba5 754 )
948faff7 755 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
756 new TypeError('Invalid worker node tasks concurrency: must be an integer')
757 )
948faff7 758 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 759 new RangeError(
68dbcdc0 760 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
761 )
762 )
948faff7 763 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 764 new RangeError(
68dbcdc0 765 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
766 )
767 )
948faff7 768 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 769 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 770 )
a20f0ba5
JB
771 await pool.destroy()
772 })
773
6b27d407
JB
774 it('Verify that pool info is set', async () => {
775 let pool = new FixedThreadPool(
776 numberOfWorkers,
b2fd3f4a 777 './tests/worker-files/thread/testWorker.mjs'
6b27d407 778 )
2dca6cab
JB
779 expect(pool.info).toStrictEqual({
780 version,
781 type: PoolTypes.fixed,
782 worker: WorkerTypes.thread,
47352846 783 started: true,
2dca6cab
JB
784 ready: true,
785 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
786 minSize: numberOfWorkers,
787 maxSize: numberOfWorkers,
788 workerNodes: numberOfWorkers,
789 idleWorkerNodes: numberOfWorkers,
790 busyWorkerNodes: 0,
791 executedTasks: 0,
792 executingTasks: 0,
2dca6cab
JB
793 failedTasks: 0
794 })
6b27d407
JB
795 await pool.destroy()
796 pool = new DynamicClusterPool(
2431bdb4 797 Math.floor(numberOfWorkers / 2),
6b27d407 798 numberOfWorkers,
ecdfbdc0 799 './tests/worker-files/cluster/testWorker.js'
6b27d407 800 )
2dca6cab
JB
801 expect(pool.info).toStrictEqual({
802 version,
803 type: PoolTypes.dynamic,
804 worker: WorkerTypes.cluster,
47352846 805 started: true,
2dca6cab
JB
806 ready: true,
807 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
808 minSize: Math.floor(numberOfWorkers / 2),
809 maxSize: numberOfWorkers,
810 workerNodes: Math.floor(numberOfWorkers / 2),
811 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
812 busyWorkerNodes: 0,
813 executedTasks: 0,
814 executingTasks: 0,
2dca6cab
JB
815 failedTasks: 0
816 })
6b27d407
JB
817 await pool.destroy()
818 })
819
2431bdb4 820 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
821 const pool = new FixedClusterPool(
822 numberOfWorkers,
823 './tests/worker-files/cluster/testWorker.js'
824 )
f06e48d8 825 for (const workerNode of pool.workerNodes) {
47352846 826 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 827 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
828 tasks: {
829 executed: 0,
830 executing: 0,
831 queued: 0,
df593701 832 maxQueued: 0,
463226a4 833 sequentiallyStolen: 0,
68cbdc84 834 stolen: 0,
a4e07f72
JB
835 failed: 0
836 },
837 runTime: {
4ba4c7f9 838 history: new CircularArray()
a4e07f72
JB
839 },
840 waitTime: {
4ba4c7f9 841 history: new CircularArray()
a4e07f72 842 },
5df69fab
JB
843 elu: {
844 idle: {
4ba4c7f9 845 history: new CircularArray()
5df69fab
JB
846 },
847 active: {
4ba4c7f9 848 history: new CircularArray()
f7510105 849 }
5df69fab 850 }
86bf340d 851 })
f06e48d8
JB
852 }
853 await pool.destroy()
854 })
855
2431bdb4
JB
856 it('Verify that pool worker tasks queue are initialized', async () => {
857 let pool = new FixedClusterPool(
f06e48d8
JB
858 numberOfWorkers,
859 './tests/worker-files/cluster/testWorker.js'
860 )
861 for (const workerNode of pool.workerNodes) {
47352846 862 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 864 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 865 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 866 }
fd7ebd49 867 await pool.destroy()
2431bdb4
JB
868 pool = new DynamicThreadPool(
869 Math.floor(numberOfWorkers / 2),
870 numberOfWorkers,
b2fd3f4a 871 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
872 )
873 for (const workerNode of pool.workerNodes) {
47352846 874 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 875 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
876 expect(workerNode.tasksQueue.size).toBe(0)
877 expect(workerNode.tasksQueue.maxSize).toBe(0)
878 }
213cbac6 879 await pool.destroy()
2431bdb4
JB
880 })
881
882 it('Verify that pool worker info are initialized', async () => {
883 let pool = new FixedClusterPool(
884 numberOfWorkers,
885 './tests/worker-files/cluster/testWorker.js'
886 )
2dca6cab 887 for (const workerNode of pool.workerNodes) {
47352846 888 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
889 expect(workerNode.info).toStrictEqual({
890 id: expect.any(Number),
891 type: WorkerTypes.cluster,
892 dynamic: false,
893 ready: true
894 })
895 }
2431bdb4
JB
896 await pool.destroy()
897 pool = new DynamicThreadPool(
898 Math.floor(numberOfWorkers / 2),
899 numberOfWorkers,
b2fd3f4a 900 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 901 )
2dca6cab 902 for (const workerNode of pool.workerNodes) {
47352846 903 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
904 expect(workerNode.info).toStrictEqual({
905 id: expect.any(Number),
906 type: WorkerTypes.thread,
907 dynamic: false,
7884d183 908 ready: true
2dca6cab
JB
909 })
910 }
213cbac6 911 await pool.destroy()
bf9549ae
JB
912 })
913
711623b8
JB
914 it('Verify that pool statuses are checked at start or destroy', async () => {
915 const pool = new FixedThreadPool(
916 numberOfWorkers,
917 './tests/worker-files/thread/testWorker.mjs'
918 )
919 expect(pool.info.started).toBe(true)
920 expect(pool.info.ready).toBe(true)
921 expect(() => pool.start()).toThrow(
922 new Error('Cannot start an already started pool')
923 )
924 await pool.destroy()
925 expect(pool.info.started).toBe(false)
926 expect(pool.info.ready).toBe(false)
927 await expect(pool.destroy()).rejects.toThrow(
928 new Error('Cannot destroy an already destroyed pool')
929 )
930 })
931
47352846
JB
932 it('Verify that pool can be started after initialization', async () => {
933 const pool = new FixedClusterPool(
934 numberOfWorkers,
935 './tests/worker-files/cluster/testWorker.js',
936 {
937 startWorkers: false
938 }
939 )
940 expect(pool.info.started).toBe(false)
941 expect(pool.info.ready).toBe(false)
55082af9 942 expect(pool.readyEventEmitted).toBe(false)
47352846 943 expect(pool.workerNodes).toStrictEqual([])
948faff7 944 await expect(pool.execute()).rejects.toThrow(
47352846
JB
945 new Error('Cannot execute a task on not started pool')
946 )
947 pool.start()
948 expect(pool.info.started).toBe(true)
949 expect(pool.info.ready).toBe(true)
55082af9
JB
950 await waitPoolEvents(pool, PoolEvents.ready, 1)
951 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
952 expect(pool.workerNodes.length).toBe(numberOfWorkers)
953 for (const workerNode of pool.workerNodes) {
954 expect(workerNode).toBeInstanceOf(WorkerNode)
955 }
956 await pool.destroy()
957 })
958
9d2d0da1
JB
959 it('Verify that pool execute() arguments are checked', async () => {
960 const pool = new FixedClusterPool(
961 numberOfWorkers,
962 './tests/worker-files/cluster/testWorker.js'
963 )
948faff7 964 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
965 new TypeError('name argument must be a string')
966 )
948faff7 967 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
968 new TypeError('name argument must not be an empty string')
969 )
948faff7 970 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
971 new TypeError('transferList argument must be an array')
972 )
973 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
974 "Task function 'unknown' not found"
975 )
976 await pool.destroy()
948faff7 977 await expect(pool.execute()).rejects.toThrow(
47352846 978 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
979 )
980 })
981
2431bdb4 982 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
983 const pool = new FixedClusterPool(
984 numberOfWorkers,
985 './tests/worker-files/cluster/testWorker.js'
986 )
09c2d0d3 987 const promises = new Set()
fc027381
JB
988 const maxMultiplier = 2
989 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 990 promises.add(pool.execute())
bf9549ae 991 }
f06e48d8 992 for (const workerNode of pool.workerNodes) {
465b2940 993 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
994 tasks: {
995 executed: 0,
996 executing: maxMultiplier,
997 queued: 0,
df593701 998 maxQueued: 0,
463226a4 999 sequentiallyStolen: 0,
68cbdc84 1000 stolen: 0,
a4e07f72
JB
1001 failed: 0
1002 },
1003 runTime: {
a4e07f72
JB
1004 history: expect.any(CircularArray)
1005 },
1006 waitTime: {
a4e07f72
JB
1007 history: expect.any(CircularArray)
1008 },
5df69fab
JB
1009 elu: {
1010 idle: {
5df69fab
JB
1011 history: expect.any(CircularArray)
1012 },
1013 active: {
5df69fab 1014 history: expect.any(CircularArray)
f7510105 1015 }
5df69fab 1016 }
86bf340d 1017 })
bf9549ae
JB
1018 }
1019 await Promise.all(promises)
f06e48d8 1020 for (const workerNode of pool.workerNodes) {
465b2940 1021 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1022 tasks: {
1023 executed: maxMultiplier,
1024 executing: 0,
1025 queued: 0,
df593701 1026 maxQueued: 0,
463226a4 1027 sequentiallyStolen: 0,
68cbdc84 1028 stolen: 0,
a4e07f72
JB
1029 failed: 0
1030 },
1031 runTime: {
a4e07f72
JB
1032 history: expect.any(CircularArray)
1033 },
1034 waitTime: {
a4e07f72
JB
1035 history: expect.any(CircularArray)
1036 },
5df69fab
JB
1037 elu: {
1038 idle: {
5df69fab
JB
1039 history: expect.any(CircularArray)
1040 },
1041 active: {
5df69fab 1042 history: expect.any(CircularArray)
f7510105 1043 }
5df69fab 1044 }
86bf340d 1045 })
bf9549ae 1046 }
fd7ebd49 1047 await pool.destroy()
bf9549ae
JB
1048 })
1049
2431bdb4 1050 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1051 const pool = new DynamicThreadPool(
2431bdb4 1052 Math.floor(numberOfWorkers / 2),
8f4878b7 1053 numberOfWorkers,
b2fd3f4a 1054 './tests/worker-files/thread/testWorker.mjs'
9e619829 1055 )
09c2d0d3 1056 const promises = new Set()
ee9f5295
JB
1057 const maxMultiplier = 2
1058 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1059 promises.add(pool.execute())
9e619829
JB
1060 }
1061 await Promise.all(promises)
f06e48d8 1062 for (const workerNode of pool.workerNodes) {
465b2940 1063 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1064 tasks: {
1065 executed: expect.any(Number),
1066 executing: 0,
1067 queued: 0,
df593701 1068 maxQueued: 0,
463226a4 1069 sequentiallyStolen: 0,
68cbdc84 1070 stolen: 0,
a4e07f72
JB
1071 failed: 0
1072 },
1073 runTime: {
a4e07f72
JB
1074 history: expect.any(CircularArray)
1075 },
1076 waitTime: {
a4e07f72
JB
1077 history: expect.any(CircularArray)
1078 },
5df69fab
JB
1079 elu: {
1080 idle: {
5df69fab
JB
1081 history: expect.any(CircularArray)
1082 },
1083 active: {
5df69fab 1084 history: expect.any(CircularArray)
f7510105 1085 }
5df69fab 1086 }
86bf340d 1087 })
465b2940 1088 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1089 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1090 numberOfWorkers * maxMultiplier
1091 )
b97d82d8
JB
1092 expect(workerNode.usage.runTime.history.length).toBe(0)
1093 expect(workerNode.usage.waitTime.history.length).toBe(0)
1094 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1095 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1096 }
1097 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1098 for (const workerNode of pool.workerNodes) {
465b2940 1099 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1100 tasks: {
1101 executed: 0,
1102 executing: 0,
1103 queued: 0,
df593701 1104 maxQueued: 0,
463226a4 1105 sequentiallyStolen: 0,
68cbdc84 1106 stolen: 0,
a4e07f72
JB
1107 failed: 0
1108 },
1109 runTime: {
a4e07f72
JB
1110 history: expect.any(CircularArray)
1111 },
1112 waitTime: {
a4e07f72
JB
1113 history: expect.any(CircularArray)
1114 },
5df69fab
JB
1115 elu: {
1116 idle: {
5df69fab
JB
1117 history: expect.any(CircularArray)
1118 },
1119 active: {
5df69fab 1120 history: expect.any(CircularArray)
f7510105 1121 }
5df69fab 1122 }
86bf340d 1123 })
465b2940
JB
1124 expect(workerNode.usage.runTime.history.length).toBe(0)
1125 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1126 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1127 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1128 }
fd7ebd49 1129 await pool.destroy()
ee11a4a2
JB
1130 })
1131
a1763c54
JB
1132 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1133 const pool = new DynamicClusterPool(
2431bdb4 1134 Math.floor(numberOfWorkers / 2),
164d950a 1135 numberOfWorkers,
a1763c54 1136 './tests/worker-files/cluster/testWorker.js'
164d950a 1137 )
c726f66c 1138 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1139 let poolInfo
a1763c54 1140 let poolReady = 0
041dc05b 1141 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1142 ++poolReady
d46660cd
JB
1143 poolInfo = info
1144 })
a1763c54 1145 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1146 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1147 expect(poolReady).toBe(1)
d46660cd 1148 expect(poolInfo).toStrictEqual({
23ccf9d7 1149 version,
d46660cd 1150 type: PoolTypes.dynamic,
a1763c54 1151 worker: WorkerTypes.cluster,
47352846 1152 started: true,
a1763c54 1153 ready: true,
2431bdb4
JB
1154 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1155 minSize: expect.any(Number),
1156 maxSize: expect.any(Number),
1157 workerNodes: expect.any(Number),
1158 idleWorkerNodes: expect.any(Number),
1159 busyWorkerNodes: expect.any(Number),
1160 executedTasks: expect.any(Number),
1161 executingTasks: expect.any(Number),
2431bdb4
JB
1162 failedTasks: expect.any(Number)
1163 })
1164 await pool.destroy()
1165 })
1166
a1763c54
JB
1167 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1168 const pool = new FixedThreadPool(
2431bdb4 1169 numberOfWorkers,
b2fd3f4a 1170 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1171 )
c726f66c 1172 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1173 const promises = new Set()
1174 let poolBusy = 0
2431bdb4 1175 let poolInfo
041dc05b 1176 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1177 ++poolBusy
2431bdb4
JB
1178 poolInfo = info
1179 })
c726f66c 1180 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1181 for (let i = 0; i < numberOfWorkers * 2; i++) {
1182 promises.add(pool.execute())
1183 }
1184 await Promise.all(promises)
1185 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1186 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1187 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1188 expect(poolInfo).toStrictEqual({
1189 version,
a1763c54
JB
1190 type: PoolTypes.fixed,
1191 worker: WorkerTypes.thread,
47352846
JB
1192 started: true,
1193 ready: true,
2431bdb4 1194 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1195 minSize: expect.any(Number),
1196 maxSize: expect.any(Number),
1197 workerNodes: expect.any(Number),
1198 idleWorkerNodes: expect.any(Number),
1199 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1200 executedTasks: expect.any(Number),
1201 executingTasks: expect.any(Number),
a4e07f72 1202 failedTasks: expect.any(Number)
d46660cd 1203 })
164d950a
JB
1204 await pool.destroy()
1205 })
1206
a1763c54
JB
1207 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1208 const pool = new DynamicThreadPool(
1209 Math.floor(numberOfWorkers / 2),
7c0ba920 1210 numberOfWorkers,
b2fd3f4a 1211 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1212 )
c726f66c 1213 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1214 const promises = new Set()
a1763c54 1215 let poolFull = 0
d46660cd 1216 let poolInfo
041dc05b 1217 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1218 ++poolFull
d46660cd
JB
1219 poolInfo = info
1220 })
c726f66c 1221 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1222 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1223 promises.add(pool.execute())
7c0ba920 1224 }
cf597bc5 1225 await Promise.all(promises)
33e6bb4c 1226 expect(poolFull).toBe(1)
d46660cd 1227 expect(poolInfo).toStrictEqual({
23ccf9d7 1228 version,
a1763c54 1229 type: PoolTypes.dynamic,
d46660cd 1230 worker: WorkerTypes.thread,
47352846
JB
1231 started: true,
1232 ready: true,
2431bdb4 1233 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1234 minSize: expect.any(Number),
1235 maxSize: expect.any(Number),
1236 workerNodes: expect.any(Number),
1237 idleWorkerNodes: expect.any(Number),
1238 busyWorkerNodes: expect.any(Number),
1239 executedTasks: expect.any(Number),
1240 executingTasks: expect.any(Number),
1241 failedTasks: expect.any(Number)
1242 })
1243 await pool.destroy()
1244 })
1245
3e8611a8 1246 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1247 const pool = new FixedThreadPool(
8735b4e5 1248 numberOfWorkers,
b2fd3f4a 1249 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1250 {
1251 enableTasksQueue: true
1252 }
1253 )
a074ffee 1254 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1255 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1256 const promises = new Set()
1257 let poolBackPressure = 0
1258 let poolInfo
041dc05b 1259 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1260 ++poolBackPressure
1261 poolInfo = info
1262 })
c726f66c 1263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1264 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1265 promises.add(pool.execute())
1266 }
1267 await Promise.all(promises)
033f1776 1268 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1269 expect(poolInfo).toStrictEqual({
1270 version,
3e8611a8 1271 type: PoolTypes.fixed,
8735b4e5 1272 worker: WorkerTypes.thread,
47352846
JB
1273 started: true,
1274 ready: true,
8735b4e5 1275 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1276 minSize: expect.any(Number),
1277 maxSize: expect.any(Number),
1278 workerNodes: expect.any(Number),
1279 idleWorkerNodes: expect.any(Number),
1280 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1281 executedTasks: expect.any(Number),
1282 executingTasks: expect.any(Number),
3e8611a8
JB
1283 maxQueuedTasks: expect.any(Number),
1284 queuedTasks: expect.any(Number),
1285 backPressure: true,
68cbdc84 1286 stolenTasks: expect.any(Number),
a4e07f72 1287 failedTasks: expect.any(Number)
d46660cd 1288 })
e458c82e 1289 expect(pool.hasBackPressure.callCount).toBe(5)
fd7ebd49 1290 await pool.destroy()
7c0ba920 1291 })
70a4f5ea 1292
85b2561d
JB
1293 it('Verify that destroy() waits for queued tasks to finish', async () => {
1294 const tasksFinishedTimeout = 2500
1295 const pool = new FixedThreadPool(
1296 numberOfWorkers,
1297 './tests/worker-files/thread/asyncWorker.mjs',
1298 {
1299 enableTasksQueue: true,
1300 tasksQueueOptions: { tasksFinishedTimeout }
1301 }
1302 )
1303 const maxMultiplier = 4
1304 let tasksFinished = 0
1305 for (const workerNode of pool.workerNodes) {
1306 workerNode.on('taskFinished', () => {
1307 ++tasksFinished
1308 })
1309 }
1310 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1311 pool.execute()
1312 }
1313 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1314 const startTime = performance.now()
1315 await pool.destroy()
1316 const elapsedTime = performance.now() - startTime
1317 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1318 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
6d41131f 1319 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
85b2561d
JB
1320 })
1321
1322 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1323 const tasksFinishedTimeout = 1000
1324 const pool = new FixedThreadPool(
1325 numberOfWorkers,
1326 './tests/worker-files/thread/asyncWorker.mjs',
1327 {
1328 enableTasksQueue: true,
1329 tasksQueueOptions: { tasksFinishedTimeout }
1330 }
1331 )
1332 const maxMultiplier = 4
1333 let tasksFinished = 0
1334 for (const workerNode of pool.workerNodes) {
1335 workerNode.on('taskFinished', () => {
1336 ++tasksFinished
1337 })
1338 }
1339 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1340 pool.execute()
1341 }
1342 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1343 const startTime = performance.now()
1344 await pool.destroy()
1345 const elapsedTime = performance.now() - startTime
1346 expect(tasksFinished).toBe(0)
c004ecec 1347 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
85b2561d
JB
1348 })
1349
f18fd12b
JB
1350 it('Verify that pool asynchronous resource track tasks execution', async () => {
1351 let taskAsyncId
1352 let initCalls = 0
1353 let beforeCalls = 0
1354 let afterCalls = 0
1355 let resolveCalls = 0
1356 const hook = createHook({
1357 init (asyncId, type) {
1358 if (type === 'poolifier:task') {
1359 initCalls++
1360 taskAsyncId = asyncId
1361 }
1362 },
1363 before (asyncId) {
1364 if (asyncId === taskAsyncId) beforeCalls++
1365 },
1366 after (asyncId) {
1367 if (asyncId === taskAsyncId) afterCalls++
1368 },
1369 promiseResolve () {
1370 if (executionAsyncId() === taskAsyncId) resolveCalls++
1371 }
1372 })
f18fd12b
JB
1373 const pool = new FixedThreadPool(
1374 numberOfWorkers,
1375 './tests/worker-files/thread/testWorker.mjs'
1376 )
8954c0a3 1377 hook.enable()
f18fd12b
JB
1378 await pool.execute()
1379 hook.disable()
1380 expect(initCalls).toBe(1)
1381 expect(beforeCalls).toBe(1)
1382 expect(afterCalls).toBe(1)
1383 expect(resolveCalls).toBe(1)
8954c0a3 1384 await pool.destroy()
f18fd12b
JB
1385 })
1386
9eae3c69
JB
1387 it('Verify that hasTaskFunction() is working', async () => {
1388 const dynamicThreadPool = new DynamicThreadPool(
1389 Math.floor(numberOfWorkers / 2),
1390 numberOfWorkers,
b2fd3f4a 1391 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1392 )
1393 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1394 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1395 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1396 true
1397 )
1398 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1399 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1400 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1401 await dynamicThreadPool.destroy()
1402 const fixedClusterPool = new FixedClusterPool(
1403 numberOfWorkers,
1404 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1405 )
1406 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1407 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1408 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1409 true
1410 )
1411 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1412 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1413 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1414 await fixedClusterPool.destroy()
1415 })
1416
1417 it('Verify that addTaskFunction() is working', async () => {
1418 const dynamicThreadPool = new DynamicThreadPool(
1419 Math.floor(numberOfWorkers / 2),
1420 numberOfWorkers,
b2fd3f4a 1421 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1422 )
1423 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1424 await expect(
1425 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1426 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1427 await expect(
1428 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1429 ).rejects.toThrow(
3feeab69
JB
1430 new TypeError('name argument must not be an empty string')
1431 )
948faff7
JB
1432 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1433 new TypeError('fn argument must be a function')
1434 )
1435 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1436 new TypeError('fn argument must be a function')
1437 )
9eae3c69
JB
1438 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1439 DEFAULT_TASK_NAME,
1440 'test'
1441 ])
1442 const echoTaskFunction = data => {
1443 return data
1444 }
1445 await expect(
1446 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1447 ).resolves.toBe(true)
1448 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1449 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1450 echoTaskFunction
1451 )
1452 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1453 DEFAULT_TASK_NAME,
1454 'test',
1455 'echo'
1456 ])
1457 const taskFunctionData = { test: 'test' }
1458 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1459 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1460 for (const workerNode of dynamicThreadPool.workerNodes) {
1461 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1462 tasks: {
1463 executed: expect.any(Number),
1464 executing: 0,
1465 queued: 0,
463226a4 1466 sequentiallyStolen: 0,
5ad42e34 1467 stolen: 0,
adee6053
JB
1468 failed: 0
1469 },
1470 runTime: {
1471 history: new CircularArray()
1472 },
1473 waitTime: {
1474 history: new CircularArray()
1475 },
1476 elu: {
1477 idle: {
1478 history: new CircularArray()
1479 },
1480 active: {
1481 history: new CircularArray()
1482 }
1483 }
1484 })
1485 }
9eae3c69
JB
1486 await dynamicThreadPool.destroy()
1487 })
1488
1489 it('Verify that removeTaskFunction() is working', async () => {
1490 const dynamicThreadPool = new DynamicThreadPool(
1491 Math.floor(numberOfWorkers / 2),
1492 numberOfWorkers,
b2fd3f4a 1493 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1494 )
1495 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1496 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1497 DEFAULT_TASK_NAME,
1498 'test'
1499 ])
948faff7 1500 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1501 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1502 )
1503 const echoTaskFunction = data => {
1504 return data
1505 }
1506 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1507 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1508 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1509 echoTaskFunction
1510 )
1511 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1512 DEFAULT_TASK_NAME,
1513 'test',
1514 'echo'
1515 ])
1516 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1517 true
1518 )
1519 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1520 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1521 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1522 DEFAULT_TASK_NAME,
1523 'test'
1524 ])
1525 await dynamicThreadPool.destroy()
1526 })
1527
30500265 1528 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1529 const dynamicThreadPool = new DynamicThreadPool(
1530 Math.floor(numberOfWorkers / 2),
1531 numberOfWorkers,
b2fd3f4a 1532 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1533 )
1534 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1535 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1536 DEFAULT_TASK_NAME,
90d7d101
JB
1537 'jsonIntegerSerialization',
1538 'factorial',
1539 'fibonacci'
1540 ])
9eae3c69 1541 await dynamicThreadPool.destroy()
90d7d101
JB
1542 const fixedClusterPool = new FixedClusterPool(
1543 numberOfWorkers,
1544 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1545 )
1546 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1547 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1548 DEFAULT_TASK_NAME,
90d7d101
JB
1549 'jsonIntegerSerialization',
1550 'factorial',
1551 'fibonacci'
1552 ])
0fe39c97 1553 await fixedClusterPool.destroy()
90d7d101
JB
1554 })
1555
9eae3c69 1556 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1557 const dynamicThreadPool = new DynamicThreadPool(
1558 Math.floor(numberOfWorkers / 2),
1559 numberOfWorkers,
b2fd3f4a 1560 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1561 )
1562 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1563 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1564 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1565 new Error(
711623b8 1566 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1567 )
1568 )
1569 await expect(
1570 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1571 ).rejects.toThrow(
b0b55f57 1572 new Error(
711623b8 1573 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1574 )
1575 )
1576 await expect(
1577 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1578 ).rejects.toThrow(
b0b55f57 1579 new Error(
711623b8 1580 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1581 )
1582 )
9eae3c69
JB
1583 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1584 DEFAULT_TASK_NAME,
1585 'jsonIntegerSerialization',
1586 'factorial',
1587 'fibonacci'
1588 ])
1589 await expect(
1590 dynamicThreadPool.setDefaultTaskFunction('factorial')
1591 ).resolves.toBe(true)
1592 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1593 DEFAULT_TASK_NAME,
1594 'factorial',
1595 'jsonIntegerSerialization',
1596 'fibonacci'
1597 ])
1598 await expect(
1599 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1600 ).resolves.toBe(true)
1601 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1602 DEFAULT_TASK_NAME,
1603 'fibonacci',
1604 'jsonIntegerSerialization',
1605 'factorial'
1606 ])
cda9ba34 1607 await dynamicThreadPool.destroy()
30500265
JB
1608 })
1609
90d7d101 1610 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1611 const pool = new DynamicClusterPool(
2431bdb4 1612 Math.floor(numberOfWorkers / 2),
70a4f5ea 1613 numberOfWorkers,
90d7d101 1614 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1615 )
1616 const data = { n: 10 }
82888165 1617 const result0 = await pool.execute(data)
30b963d4 1618 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1619 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1620 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1621 const result2 = await pool.execute(data, 'factorial')
1622 expect(result2).toBe(3628800)
1623 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1624 expect(result3).toBe(55)
5bb5be17
JB
1625 expect(pool.info.executingTasks).toBe(0)
1626 expect(pool.info.executedTasks).toBe(4)
b414b84c 1627 for (const workerNode of pool.workerNodes) {
66979634 1628 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1629 DEFAULT_TASK_NAME,
b414b84c
JB
1630 'jsonIntegerSerialization',
1631 'factorial',
1632 'fibonacci'
1633 ])
1634 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1635 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1636 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1637 tasks: {
1638 executed: expect.any(Number),
4ba4c7f9 1639 executing: 0,
5bb5be17 1640 failed: 0,
68cbdc84 1641 queued: 0,
463226a4 1642 sequentiallyStolen: 0,
68cbdc84 1643 stolen: 0
5bb5be17
JB
1644 },
1645 runTime: {
1646 history: expect.any(CircularArray)
1647 },
1648 waitTime: {
1649 history: expect.any(CircularArray)
1650 },
1651 elu: {
1652 idle: {
1653 history: expect.any(CircularArray)
1654 },
1655 active: {
1656 history: expect.any(CircularArray)
1657 }
1658 }
1659 })
1660 expect(
4ba4c7f9
JB
1661 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1662 ).toBeGreaterThan(0)
5bb5be17 1663 }
dfd7ec01
JB
1664 expect(
1665 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1666 ).toStrictEqual(
66979634
JB
1667 workerNode.getTaskFunctionWorkerUsage(
1668 workerNode.info.taskFunctionNames[1]
1669 )
dfd7ec01 1670 )
5bb5be17 1671 }
0fe39c97 1672 await pool.destroy()
70a4f5ea 1673 })
52a23942
JB
1674
1675 it('Verify sendKillMessageToWorker()', async () => {
1676 const pool = new DynamicClusterPool(
1677 Math.floor(numberOfWorkers / 2),
1678 numberOfWorkers,
1679 './tests/worker-files/cluster/testWorker.js'
1680 )
1681 const workerNodeKey = 0
1682 await expect(
adee6053 1683 pool.sendKillMessageToWorker(workerNodeKey)
52a23942 1684 ).resolves.toBeUndefined()
85b2561d
JB
1685 await expect(
1686 pool.sendKillMessageToWorker(numberOfWorkers)
1687 ).rejects.toStrictEqual(
1688 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1689 )
52a23942
JB
1690 await pool.destroy()
1691 })
adee6053
JB
1692
1693 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1694 const pool = new DynamicClusterPool(
1695 Math.floor(numberOfWorkers / 2),
1696 numberOfWorkers,
1697 './tests/worker-files/cluster/testWorker.js'
1698 )
1699 const workerNodeKey = 0
1700 await expect(
1701 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1702 taskFunctionOperation: 'add',
1703 taskFunctionName: 'empty',
1704 taskFunction: (() => {}).toString()
1705 })
1706 ).resolves.toBe(true)
1707 expect(
1708 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1709 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1710 await pool.destroy()
1711 })
1712
1713 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1714 const pool = new DynamicClusterPool(
1715 Math.floor(numberOfWorkers / 2),
1716 numberOfWorkers,
1717 './tests/worker-files/cluster/testWorker.js'
1718 )
adee6053
JB
1719 await expect(
1720 pool.sendTaskFunctionOperationToWorkers({
1721 taskFunctionOperation: 'add',
1722 taskFunctionName: 'empty',
1723 taskFunction: (() => {}).toString()
1724 })
1725 ).resolves.toBe(true)
1726 for (const workerNode of pool.workerNodes) {
1727 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1728 DEFAULT_TASK_NAME,
1729 'test',
1730 'empty'
1731 ])
1732 }
1733 await pool.destroy()
1734 })
3ec964d6 1735})