test: less strict expectation for CI
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
// Package version read from the package.json located two directories above
// this test file; used as the expected `version` field of `pool.info`.
const version = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    'utf8'
  )
).version
// Number of workers passed to the pools created by the tests of this suite.
const numberOfWorkers = 2
// Fixed thread pool whose isMain() always returns false, used to trigger the
// "pool started from a worker" constructor error.
class StubPoolWithIsMain extends FixedThreadPool {
  isMain () {
    return false
  }
}

// Calls sinon's restore() after each test.
afterEach(() => {
  restore()
})
// A fixed thread pool can be instantiated and then destroyed.
it('Verify that pool can be created and destroyed', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
// The constructor must throw when isMain() reports false (stubbed pool).
it('Verify that pool cannot be created from a non main thread/process', () => {
  expect(
    () =>
      new StubPoolWithIsMain(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        {
          errorHandler: e => console.error(e)
        }
      )
  ).toThrow(
    new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  )
})
// Checks the started/starting/destroying flags right after construction.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.started).toBe(true)
  expect(pool.starting).toBe(false)
  expect(pool.destroying).toBe(false)
  await pool.destroy()
})
// The worker file path must be present, be a string and point to an
// existing file.
it('Verify that filePath is checked', () => {
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
})
// An undefined number of workers must be rejected by the constructor.
it('Verify that numberOfWorkers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(
        undefined,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  expect(
    () =>
      new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  ).toThrow(
    new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  )
})
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
})
// A fixed pool must refuse a fourth constructor argument (maximum number of
// workers).
it('Verify that pool arguments number and pool type are checked', () => {
  expect(
    () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.mjs',
        undefined,
        numberOfWorkers * 2
      )
  ).toThrow(
    new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  )
})
// Exercises the minimum/maximum size validation of the dynamic pools.
it('Verify that dynamic pool sizing is checked', () => {
  // Missing maximum size.
  expect(
    () =>
      new DynamicClusterPool(
        1,
        undefined,
        './tests/worker-files/cluster/testWorker.js'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  )
  // Fractional minimum size.
  expect(
    () =>
      new DynamicThreadPool(
        0.5,
        1,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
  // Fractional maximum size.
  expect(
    () =>
      new DynamicClusterPool(
        0,
        0.5,
        './tests/worker-files/cluster/testWorker.js'
      )
  ).toThrow(
    new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  )
  // Maximum size below the minimum size.
  expect(
    () =>
      new DynamicThreadPool(
        2,
        1,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  )
  // Maximum size equal to zero.
  expect(
    () =>
      new DynamicThreadPool(
        0,
        0,
        './tests/worker-files/thread/testWorker.mjs'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  )
  // Minimum size equal to the maximum size.
  expect(
    () =>
      new DynamicClusterPool(
        1,
        1,
        './tests/worker-files/cluster/testWorker.js'
      )
  ).toThrow(
    new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  )
})
// Checks the resolved pool options, first with defaults and then with every
// option supplied explicitly.
it('Verify that pool options are checked', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual(
      expect.objectContaining({
        retries:
          pool.info.maxSize +
          Object.keys(workerChoiceStrategy.opts.weights).length,
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false }
        // NOTE(review): the weights expectation below was already disabled
        // in the original; kept as-is.
        // weights: expect.objectContaining({
        //   0: expect.any(Number),
        //   [pool.info.maxSize - 1]: expect.any(Number)
        // })
      })
    )
  }
  await pool.destroy()
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: {
        runTime: { median: true },
        weights: { 0: 300, 1: 200 }
      },
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 2 },
      messageHandler: testHandler,
      errorHandler: testHandler,
      onlineHandler: testHandler,
      exitHandler: testHandler
    }
  )
  // Events are disabled, so no emitter is expected.
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      retries:
        pool.info.maxSize +
        Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: { 0: 300, 1: 200 }
    })
  }
  await pool.destroy()
})
// Invalid pool options must make the FixedThreadPool constructor throw.
it('Verify that pool options are validated', () => {
  // Each entry pairs an invalid options object with the error the
  // constructor is expected to throw; the entries are checked in order.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions'
      },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0 }
      },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: -1 }
      },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0.2 }
      },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { size: 0 }
      },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { size: -1 }
      },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: { size: 0.2 }
      },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs',
          poolOptions
        )
    ).toThrow(expectedError)
  }
})
// Checks that setWorkerChoiceStrategyOptions() updates the pool options, the
// strategy context options, each strategy's options and the task statistics
// requirements, and that invalid options are rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Initial state: no explicit worker choice strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual(
      expect.objectContaining({
        retries:
          pool.info.maxSize +
          Object.keys(workerChoiceStrategy.opts.weights).length,
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false }
        // NOTE(review): the weights expectation below was already disabled
        // in the original; kept as-is.
        // weights: expect.objectContaining({
        //   0: expect.any(Number),
        //   [pool.info.maxSize - 1]: expect.any(Number)
        // })
      })
    )
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  })
  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: true },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual(
      expect.objectContaining({
        retries:
          pool.info.maxSize +
          Object.keys(workerChoiceStrategy.opts.weights).length,
        runTime: { median: true },
        waitTime: { median: false },
        elu: { median: true }
        // NOTE(review): the weights expectation below was already disabled
        // in the original; kept as-is.
        // weights: expect.objectContaining({
        //   0: expect.any(Number),
        //   [pool.info.maxSize - 1]: expect.any(Number)
        // })
      })
    )
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: false,
      median: true
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: false,
      median: true
    }
  })
  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(workerChoiceStrategy.opts).toStrictEqual({
      retries:
        pool.info.maxSize +
        Object.keys(workerChoiceStrategy.opts.weights).length,
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
  }
  expect(
    pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  ).toStrictEqual({
    runTime: {
      aggregate: true,
      average: true,
      median: false
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: true,
      median: false
    }
  })
  // Invalid options must be rejected.
  expect(() =>
    pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
  ).toThrow(
    new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  )
  expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
    new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
  ).toThrow(
    new Error(
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    )
  )
  await pool.destroy()
})
// Toggles the tasks queue through enableTasksQueue() and checks the
// resulting tasks queue options after each call.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
// Checks that setTasksQueueOptions() updates the pool options and each
// worker node's back pressure size, and that invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  // Options left out of the call are expected to fall back to the values
  // asserted below.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueueBackPressureSize).toBe(
      pool.opts.tasksQueueOptions.size
    )
  }
  // Each entry pairs an invalid argument with the error expected from
  // setTasksQueueOptions(); the entries are checked in order.
  const invalidTasksQueueOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, expectedError] of invalidTasksQueueOptionsCases) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
// Checks the `info` object of a fixed thread pool and of a dynamic cluster
// pool right after construction.
it('Verify that pool info is set', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: Math.floor(numberOfWorkers / 2),
    maxSize: numberOfWorkers,
    workerNodes: Math.floor(numberOfWorkers / 2),
    idleWorkerNodes: Math.floor(numberOfWorkers / 2),
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
// Every worker node of a fresh pool must expose zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    })
  }
  await pool.destroy()
})
// Every worker node of a fresh pool must own an empty Deque tasks queue,
// for both a fixed cluster pool and a dynamic thread pool.
it('Verify that pool worker tasks queue are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
})
// Checks the `info` object of every worker node for a fixed cluster pool and
// a dynamic thread pool right after construction.
it('Verify that pool worker info are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()
})
// Starting an already started pool throws; destroying an already destroyed
// pool rejects.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )
  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
// A pool built with `startWorkers: false` has no worker nodes and rejects
// execute() until start() is called.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  // Wait for one `ready` pool event before checking the emitted flag.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
// execute() must reject on invalid name/transferList arguments, on an
// unknown task function name, and once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  await expect(pool.execute(undefined, 0)).rejects.toThrow(
    new TypeError('name argument must be a string')
  )
  await expect(pool.execute(undefined, '')).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
    new TypeError('transferList argument must be an array')
  )
  // This rejection value is a plain string, not an Error instance.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
979
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the expected worker usage for the given executed/executing counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const pendingTasks = []
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    pendingTasks.push(pool.execute())
  }
  // While the tasks are in flight, each worker is executing maxMultiplier tasks.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same count has moved from executing to executed.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1047
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  // Builds the expected worker usage for the given executed counter (value or matcher).
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of a worker usage is empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const pendingTasks = []
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(usage)
  }
  // Switching the worker choice strategy must reset the executed counters.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
1129
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let readyEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyEventsCount += 1
    lastPoolInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyEventsCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1164
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let busyEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEventsCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = []
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered once the number of tasks submitted at once reaches the number of fixed pool workers.
  // Submitting numberOfWorkers * 2 tasks in a loop therefore triggers it numberOfWorkers + 1 times in total.
  expect(busyEventsCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1204
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let fullEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventsCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = []
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(fullEventsCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1243
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure on every check.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Record how many times the event fired and the last payload received.
  let backPressureEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureEventsCount += 1
    lastPoolInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const pendingTasks = []
  for (let i = 0; i < numberOfWorkers + 1; i++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(backPressureEventsCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
70a4f5ea 1290
85b2561d
JB
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' notifications emitted by the worker nodes.
  let tasksFinished = 0
  const onTaskFinished = () => {
    tasksFinished += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than workers so that some of them end up queued.
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  // Every task completed, and destroy() returned within the timeout plus a small margin.
  expect(tasksFinished).toBe(submittedTasks)
  expect(elapsedTime).toBeGreaterThanOrEqual(2000)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1319
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count the 'taskFinished' notifications emitted by the worker nodes.
  let tasksFinished = 0
  const onTaskFinished = () => {
    tasksFinished += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than workers so that some of them end up queued.
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  // No task finished, and destroy() returned within the timeout plus a CI-tolerant margin.
  expect(tasksFinished).toBe(0)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1347
f18fd12b
JB
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the 'poolifier:task' resource seen by the init hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promise resolutions happening inside the task resource scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // The hook is process-wide: always disable it, even when the task rejects,
    // so that it cannot keep running during the following tests.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1384
9eae3c69
JB
it('Verify that hasTaskFunction() is working', async () => {
  // Each entry: [task function name, expected hasTaskFunction() result].
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(name)).toBe(expected)
  }
  await dynamicThreadPool.destroy()
  // Same checks against a cluster based pool.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(name)).toBe(expected)
  }
  await fixedClusterPool.destroy()
})
1414
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Each entry: [name argument, fn argument, expected TypeError message].
  const invalidArguments = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidArguments) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected additions must not have altered the task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function is executable and echoes its input back.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1486
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Task function names expected before 'echo' is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' was not added through the pool, so the pool refuses to remove it.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1525
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names expected from the multiple task functions worker files, in order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await dynamicThreadPool.destroy()
  // Same check against a cluster based pool.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1553
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  // Builds the error expected when the worker refuses the 'default' operation.
  const operationFailure = reason =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${reason}'`
    )
  // Each entry: [name argument, failure reason reported by the worker].
  const invalidNames = [
    [0, 'TypeError: name parameter is not a string'],
    [
      DEFAULT_TASK_NAME,
      'Error: Cannot set the default task function reserved name as the default task function'
    ],
    [
      'unknown',
      'Error: Cannot set the default task function to a non-existing task function'
    ]
  ]
  for (const [name, reason] of invalidNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(operationFailure(reason))
  }
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // The new default is expected right after DEFAULT_TASK_NAME in the listing.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1607
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // One task on the default task function, then one per named task function.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the first named task function.
    const firstNamedTaskFunction = node.info.taskFunctionNames[1]
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(firstNamedTaskFunction)
    )
  }
  await pool.destroy()
})
52a23942
JB
1672
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // A valid worker node key resolves without a value.
  const validWorkerNodeKey = 0
  const killOutcome = pool.sendKillMessageToWorker(validWorkerNodeKey)
  await expect(killOutcome).resolves.toBeUndefined()
  // numberOfWorkers is used as an invalid worker node key and must be rejected.
  const invalidKeyError = new Error(
    `Invalid worker node key '${numberOfWorkers}'`
  )
  await expect(
    pool.sendKillMessageToWorker(numberOfWorkers)
  ).rejects.toStrictEqual(invalidKeyError)
  await pool.destroy()
})
adee6053
JB
1690
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // Operation asking a worker to add an 'empty' task function, sent as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addOperation)
  ).resolves.toBe(true)
  // The targeted worker node now advertises the added task function.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1710
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation asking the workers to add an 'empty' task function, sent as source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  // Every worker node now advertises the added task function.
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
3ec964d6 1733})