test: fix test expectation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
a5ed75b7 173 './tests/worker-files/cluster/testWorker.js'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
26ce26ca 229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d
JB
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
232 retries:
233 pool.info.maxSize +
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 235 runTime: { median: false },
5df69fab 236 waitTime: { median: false },
00e1bdeb
JB
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
da309861 242 })
999ef664
JB
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
afd6e845
JB
245 expect(workerChoiceStrategy.opts).toStrictEqual({
246 retries:
247 pool.info.maxSize +
248 Object.keys(workerChoiceStrategy.opts.weights).length,
249 runTime: { median: false },
250 waitTime: { median: false },
251 elu: { median: false },
252 weights: expect.objectContaining({
253 0: expect.any(Number),
254 [pool.info.maxSize - 1]: expect.any(Number)
00e1bdeb 255 })
afd6e845 256 })
999ef664 257 }
fd7ebd49 258 await pool.destroy()
73bfd59d 259 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
260 pool = new FixedThreadPool(
261 numberOfWorkers,
b2fd3f4a 262 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 263 {
e4543b14 264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 265 workerChoiceStrategyOptions: {
932fc8be 266 runTime: { median: true },
fc027381 267 weights: { 0: 300, 1: 200 }
49be33fe 268 },
35cf1c03 269 enableEvents: false,
1f68cede 270 restartWorkerOnError: false,
ff733df7 271 enableTasksQueue: true,
d4aeae5a 272 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 onlineHandler: testHandler,
276 exitHandler: testHandler
7c0ba920
JB
277 }
278 )
7c0ba920 279 expect(pool.emitter).toBeUndefined()
47352846
JB
280 expect(pool.opts).toStrictEqual({
281 startWorkers: true,
282 enableEvents: false,
283 restartWorkerOnError: false,
284 enableTasksQueue: true,
285 tasksQueueOptions: {
286 concurrency: 2,
2324f8c9 287 size: Math.pow(numberOfWorkers, 2),
dbd73092 288 taskStealing: true,
32b141fd 289 tasksStealingOnBackPressure: true,
568d0075 290 tasksFinishedTimeout: 2000
47352846
JB
291 },
292 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
293 workerChoiceStrategyOptions: {
47352846 294 runTime: { median: true },
47352846
JB
295 weights: { 0: 300, 1: 200 }
296 },
297 onlineHandler: testHandler,
298 messageHandler: testHandler,
299 errorHandler: testHandler,
300 exitHandler: testHandler
8990357d
JB
301 })
302 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
449cd154
JB
303 retries:
304 pool.info.maxSize +
305 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
8990357d
JB
306 runTime: { median: true },
307 waitTime: { median: false },
308 elu: { median: false },
fc027381 309 weights: { 0: 300, 1: 200 }
da309861 310 })
999ef664
JB
311 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
312 .workerChoiceStrategies) {
313 expect(workerChoiceStrategy.opts).toStrictEqual({
449cd154
JB
314 retries:
315 pool.info.maxSize +
316 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
999ef664
JB
317 runTime: { median: true },
318 waitTime: { median: false },
319 elu: { median: false },
320 weights: { 0: 300, 1: 200 }
321 })
322 }
fd7ebd49 323 await pool.destroy()
7c0ba920
JB
324 })
325
fe291b64 326 it('Verify that pool options are validated', () => {
d4aeae5a
JB
327 expect(
328 () =>
329 new FixedThreadPool(
330 numberOfWorkers,
b2fd3f4a 331 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 332 {
f0d7f803 333 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
334 }
335 )
948faff7 336 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
337 expect(
338 () =>
339 new FixedThreadPool(
340 numberOfWorkers,
b2fd3f4a 341 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
342 {
343 workerChoiceStrategyOptions: { weights: {} }
344 }
345 )
948faff7 346 ).toThrow(
8735b4e5
JB
347 new Error(
348 'Invalid worker choice strategy options: must have a weight for each worker node'
349 )
49be33fe 350 )
f0d7f803
JB
351 expect(
352 () =>
353 new FixedThreadPool(
354 numberOfWorkers,
b2fd3f4a 355 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
356 {
357 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
358 }
359 )
948faff7 360 ).toThrow(
8735b4e5
JB
361 new Error(
362 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
363 )
f0d7f803 364 )
5c4c2dee
JB
365 expect(
366 () =>
367 new FixedThreadPool(
368 numberOfWorkers,
b2fd3f4a 369 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
370 {
371 enableTasksQueue: true,
372 tasksQueueOptions: 'invalidTasksQueueOptions'
373 }
374 )
948faff7 375 ).toThrow(
5c4c2dee
JB
376 new TypeError('Invalid tasks queue options: must be a plain object')
377 )
f0d7f803
JB
378 expect(
379 () =>
380 new FixedThreadPool(
381 numberOfWorkers,
b2fd3f4a 382 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
383 {
384 enableTasksQueue: true,
385 tasksQueueOptions: { concurrency: 0 }
386 }
387 )
948faff7 388 ).toThrow(
e695d66f 389 new RangeError(
20c6f652 390 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
391 )
392 )
f0d7f803
JB
393 expect(
394 () =>
395 new FixedThreadPool(
396 numberOfWorkers,
b2fd3f4a 397 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
398 {
399 enableTasksQueue: true,
5c4c2dee 400 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
401 }
402 )
948faff7 403 ).toThrow(
5c4c2dee
JB
404 new RangeError(
405 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
406 )
8735b4e5 407 )
f0d7f803
JB
408 expect(
409 () =>
410 new FixedThreadPool(
411 numberOfWorkers,
b2fd3f4a 412 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
413 {
414 enableTasksQueue: true,
415 tasksQueueOptions: { concurrency: 0.2 }
416 }
417 )
948faff7 418 ).toThrow(
20c6f652 419 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 420 )
5c4c2dee
JB
421 expect(
422 () =>
423 new FixedThreadPool(
424 numberOfWorkers,
b2fd3f4a 425 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
426 {
427 enableTasksQueue: true,
428 tasksQueueOptions: { size: 0 }
429 }
430 )
948faff7 431 ).toThrow(
5c4c2dee
JB
432 new RangeError(
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
434 )
435 )
436 expect(
437 () =>
438 new FixedThreadPool(
439 numberOfWorkers,
b2fd3f4a 440 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
441 {
442 enableTasksQueue: true,
443 tasksQueueOptions: { size: -1 }
444 }
445 )
948faff7 446 ).toThrow(
5c4c2dee
JB
447 new RangeError(
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
449 )
450 )
451 expect(
452 () =>
453 new FixedThreadPool(
454 numberOfWorkers,
b2fd3f4a 455 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
456 {
457 enableTasksQueue: true,
458 tasksQueueOptions: { size: 0.2 }
459 }
460 )
948faff7 461 ).toThrow(
5c4c2dee
JB
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
463 )
d4aeae5a
JB
464 })
465
2431bdb4 466 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
467 const pool = new FixedThreadPool(
468 numberOfWorkers,
b2fd3f4a 469 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
470 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
471 )
26ce26ca 472 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
8990357d 473 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
474 retries:
475 pool.info.maxSize +
476 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 477 runTime: { median: false },
5df69fab 478 waitTime: { median: false },
00e1bdeb
JB
479 elu: { median: false },
480 weights: expect.objectContaining({
481 0: expect.any(Number),
482 [pool.info.maxSize - 1]: expect.any(Number)
483 })
a20f0ba5
JB
484 })
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
55ddaf0f
JB
487 expect(workerChoiceStrategy.opts).toStrictEqual(
488 expect.objectContaining({
489 retries:
490 pool.info.maxSize +
491 Object.keys(workerChoiceStrategy.opts.weights).length,
492 runTime: { median: false },
493 waitTime: { median: false },
494 elu: { median: false }
00e1bdeb 495 })
55ddaf0f 496 )
a20f0ba5 497 }
87de9ff5
JB
498 expect(
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
500 ).toStrictEqual({
932fc8be
JB
501 runTime: {
502 aggregate: true,
503 average: true,
504 median: false
505 },
506 waitTime: {
507 aggregate: false,
508 average: false,
509 median: false
510 },
5df69fab 511 elu: {
9adcefab
JB
512 aggregate: true,
513 average: true,
5df69fab
JB
514 median: false
515 }
86bf340d 516 })
9adcefab
JB
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: true },
519 elu: { median: true }
520 })
a20f0ba5 521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 522 runTime: { median: true },
8990357d
JB
523 elu: { median: true }
524 })
525 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
526 retries:
527 pool.info.maxSize +
528 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
8990357d
JB
529 runTime: { median: true },
530 waitTime: { median: false },
00e1bdeb
JB
531 elu: { median: true },
532 weights: expect.objectContaining({
533 0: expect.any(Number),
534 [pool.info.maxSize - 1]: expect.any(Number)
535 })
a20f0ba5
JB
536 })
537 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
538 .workerChoiceStrategies) {
fc7633dd
JB
539 expect(workerChoiceStrategy.opts).toStrictEqual(
540 expect.objectContaining({
541 retries:
542 pool.info.maxSize +
543 Object.keys(workerChoiceStrategy.opts.weights).length,
544 runTime: { median: true },
545 waitTime: { median: false },
546 elu: { median: true }
00e1bdeb 547 })
fc7633dd 548 )
a20f0ba5 549 }
87de9ff5
JB
550 expect(
551 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
552 ).toStrictEqual({
932fc8be
JB
553 runTime: {
554 aggregate: true,
555 average: false,
556 median: true
557 },
558 waitTime: {
559 aggregate: false,
560 average: false,
561 median: false
562 },
5df69fab 563 elu: {
9adcefab 564 aggregate: true,
5df69fab 565 average: false,
9adcefab 566 median: true
5df69fab 567 }
86bf340d 568 })
9adcefab
JB
569 pool.setWorkerChoiceStrategyOptions({
570 runTime: { median: false },
571 elu: { median: false }
572 })
a20f0ba5 573 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 574 runTime: { median: false },
8990357d
JB
575 elu: { median: false }
576 })
577 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
578 retries:
579 pool.info.maxSize +
580 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
9adcefab 581 runTime: { median: false },
8990357d 582 waitTime: { median: false },
00e1bdeb
JB
583 elu: { median: false },
584 weights: expect.objectContaining({
585 0: expect.any(Number),
586 [pool.info.maxSize - 1]: expect.any(Number)
587 })
a20f0ba5
JB
588 })
589 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
590 .workerChoiceStrategies) {
69bba5e2
JB
591 expect(workerChoiceStrategy.opts).toStrictEqual(
592 expect.objectContaining({
593 retries:
594 pool.info.maxSize +
595 Object.keys(workerChoiceStrategy.opts.weights).length,
596 runTime: { median: false },
597 waitTime: { median: false },
598 elu: { median: false }
00e1bdeb 599 })
69bba5e2 600 )
a20f0ba5 601 }
87de9ff5
JB
602 expect(
603 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
604 ).toStrictEqual({
932fc8be
JB
605 runTime: {
606 aggregate: true,
607 average: true,
608 median: false
609 },
610 waitTime: {
611 aggregate: false,
612 average: false,
613 median: false
614 },
5df69fab 615 elu: {
9adcefab
JB
616 aggregate: true,
617 average: true,
5df69fab
JB
618 median: false
619 }
86bf340d 620 })
1f95d544
JB
621 expect(() =>
622 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 623 ).toThrow(
8735b4e5
JB
624 new TypeError(
625 'Invalid worker choice strategy options: must be a plain object'
626 )
1f95d544 627 )
948faff7 628 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
629 new Error(
630 'Invalid worker choice strategy options: must have a weight for each worker node'
631 )
1f95d544
JB
632 )
633 expect(() =>
634 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 635 ).toThrow(
8735b4e5
JB
636 new Error(
637 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
638 )
1f95d544 639 )
a20f0ba5
JB
640 await pool.destroy()
641 })
642
2431bdb4 643 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
644 const pool = new FixedThreadPool(
645 numberOfWorkers,
b2fd3f4a 646 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
647 )
648 expect(pool.opts.enableTasksQueue).toBe(false)
649 expect(pool.opts.tasksQueueOptions).toBeUndefined()
650 pool.enableTasksQueue(true)
651 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
652 expect(pool.opts.tasksQueueOptions).toStrictEqual({
653 concurrency: 1,
2324f8c9 654 size: Math.pow(numberOfWorkers, 2),
dbd73092 655 taskStealing: true,
32b141fd 656 tasksStealingOnBackPressure: true,
568d0075 657 tasksFinishedTimeout: 2000
20c6f652 658 })
a20f0ba5
JB
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 concurrency: 2,
2324f8c9 663 size: Math.pow(numberOfWorkers, 2),
dbd73092 664 taskStealing: true,
32b141fd 665 tasksStealingOnBackPressure: true,
568d0075 666 tasksFinishedTimeout: 2000
20c6f652 667 })
a20f0ba5
JB
668 pool.enableTasksQueue(false)
669 expect(pool.opts.enableTasksQueue).toBe(false)
670 expect(pool.opts.tasksQueueOptions).toBeUndefined()
671 await pool.destroy()
672 })
673
2431bdb4 674 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
675 const pool = new FixedThreadPool(
676 numberOfWorkers,
b2fd3f4a 677 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
678 { enableTasksQueue: true }
679 )
20c6f652
JB
680 expect(pool.opts.tasksQueueOptions).toStrictEqual({
681 concurrency: 1,
2324f8c9 682 size: Math.pow(numberOfWorkers, 2),
dbd73092 683 taskStealing: true,
32b141fd 684 tasksStealingOnBackPressure: true,
568d0075 685 tasksFinishedTimeout: 2000
20c6f652 686 })
d6ca1416 687 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
688 expect(workerNode.tasksQueueBackPressureSize).toBe(
689 pool.opts.tasksQueueOptions.size
690 )
d6ca1416
JB
691 }
692 pool.setTasksQueueOptions({
693 concurrency: 2,
2324f8c9 694 size: 2,
d6ca1416 695 taskStealing: false,
32b141fd 696 tasksStealingOnBackPressure: false,
568d0075 697 tasksFinishedTimeout: 3000
d6ca1416 698 })
20c6f652
JB
699 expect(pool.opts.tasksQueueOptions).toStrictEqual({
700 concurrency: 2,
2324f8c9 701 size: 2,
d6ca1416 702 taskStealing: false,
32b141fd 703 tasksStealingOnBackPressure: false,
568d0075 704 tasksFinishedTimeout: 3000
d6ca1416
JB
705 })
706 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
707 expect(workerNode.tasksQueueBackPressureSize).toBe(
708 pool.opts.tasksQueueOptions.size
709 )
d6ca1416
JB
710 }
711 pool.setTasksQueueOptions({
712 concurrency: 1,
713 taskStealing: true,
714 tasksStealingOnBackPressure: true
715 })
716 expect(pool.opts.tasksQueueOptions).toStrictEqual({
717 concurrency: 1,
2324f8c9 718 size: Math.pow(numberOfWorkers, 2),
dbd73092 719 taskStealing: true,
32b141fd 720 tasksStealingOnBackPressure: true,
568d0075 721 tasksFinishedTimeout: 2000
20c6f652 722 })
d6ca1416 723 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
724 expect(workerNode.tasksQueueBackPressureSize).toBe(
725 pool.opts.tasksQueueOptions.size
726 )
d6ca1416 727 }
948faff7 728 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
729 new TypeError('Invalid tasks queue options: must be a plain object')
730 )
948faff7 731 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 732 new RangeError(
20c6f652 733 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
734 )
735 )
948faff7 736 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 737 new RangeError(
20c6f652 738 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 739 )
a20f0ba5 740 )
948faff7 741 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
742 new TypeError('Invalid worker node tasks concurrency: must be an integer')
743 )
948faff7 744 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 745 new RangeError(
68dbcdc0 746 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
747 )
748 )
948faff7 749 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 750 new RangeError(
68dbcdc0 751 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
752 )
753 )
948faff7 754 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 755 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 756 )
a20f0ba5
JB
757 await pool.destroy()
758 })
759
6b27d407
JB
760 it('Verify that pool info is set', async () => {
761 let pool = new FixedThreadPool(
762 numberOfWorkers,
b2fd3f4a 763 './tests/worker-files/thread/testWorker.mjs'
6b27d407 764 )
2dca6cab
JB
765 expect(pool.info).toStrictEqual({
766 version,
767 type: PoolTypes.fixed,
768 worker: WorkerTypes.thread,
47352846 769 started: true,
2dca6cab
JB
770 ready: true,
771 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
772 minSize: numberOfWorkers,
773 maxSize: numberOfWorkers,
774 workerNodes: numberOfWorkers,
775 idleWorkerNodes: numberOfWorkers,
776 busyWorkerNodes: 0,
777 executedTasks: 0,
778 executingTasks: 0,
2dca6cab
JB
779 failedTasks: 0
780 })
6b27d407
JB
781 await pool.destroy()
782 pool = new DynamicClusterPool(
2431bdb4 783 Math.floor(numberOfWorkers / 2),
6b27d407 784 numberOfWorkers,
ecdfbdc0 785 './tests/worker-files/cluster/testWorker.js'
6b27d407 786 )
2dca6cab
JB
787 expect(pool.info).toStrictEqual({
788 version,
789 type: PoolTypes.dynamic,
790 worker: WorkerTypes.cluster,
47352846 791 started: true,
2dca6cab
JB
792 ready: true,
793 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
794 minSize: Math.floor(numberOfWorkers / 2),
795 maxSize: numberOfWorkers,
796 workerNodes: Math.floor(numberOfWorkers / 2),
797 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
798 busyWorkerNodes: 0,
799 executedTasks: 0,
800 executingTasks: 0,
2dca6cab
JB
801 failedTasks: 0
802 })
6b27d407
JB
803 await pool.destroy()
804 })
805
2431bdb4 806 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
807 const pool = new FixedClusterPool(
808 numberOfWorkers,
809 './tests/worker-files/cluster/testWorker.js'
810 )
f06e48d8 811 for (const workerNode of pool.workerNodes) {
47352846 812 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 813 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
814 tasks: {
815 executed: 0,
816 executing: 0,
817 queued: 0,
df593701 818 maxQueued: 0,
463226a4 819 sequentiallyStolen: 0,
68cbdc84 820 stolen: 0,
a4e07f72
JB
821 failed: 0
822 },
823 runTime: {
4ba4c7f9 824 history: new CircularArray()
a4e07f72
JB
825 },
826 waitTime: {
4ba4c7f9 827 history: new CircularArray()
a4e07f72 828 },
5df69fab
JB
829 elu: {
830 idle: {
4ba4c7f9 831 history: new CircularArray()
5df69fab
JB
832 },
833 active: {
4ba4c7f9 834 history: new CircularArray()
f7510105 835 }
5df69fab 836 }
86bf340d 837 })
f06e48d8
JB
838 }
839 await pool.destroy()
840 })
841
2431bdb4
JB
842 it('Verify that pool worker tasks queue are initialized', async () => {
843 let pool = new FixedClusterPool(
f06e48d8
JB
844 numberOfWorkers,
845 './tests/worker-files/cluster/testWorker.js'
846 )
847 for (const workerNode of pool.workerNodes) {
47352846 848 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 849 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 850 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 851 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 852 }
fd7ebd49 853 await pool.destroy()
2431bdb4
JB
854 pool = new DynamicThreadPool(
855 Math.floor(numberOfWorkers / 2),
856 numberOfWorkers,
b2fd3f4a 857 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
858 )
859 for (const workerNode of pool.workerNodes) {
47352846 860 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 861 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
862 expect(workerNode.tasksQueue.size).toBe(0)
863 expect(workerNode.tasksQueue.maxSize).toBe(0)
864 }
213cbac6 865 await pool.destroy()
2431bdb4
JB
866 })
867
868 it('Verify that pool worker info are initialized', async () => {
869 let pool = new FixedClusterPool(
870 numberOfWorkers,
871 './tests/worker-files/cluster/testWorker.js'
872 )
2dca6cab 873 for (const workerNode of pool.workerNodes) {
47352846 874 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
875 expect(workerNode.info).toStrictEqual({
876 id: expect.any(Number),
877 type: WorkerTypes.cluster,
878 dynamic: false,
879 ready: true
880 })
881 }
2431bdb4
JB
882 await pool.destroy()
883 pool = new DynamicThreadPool(
884 Math.floor(numberOfWorkers / 2),
885 numberOfWorkers,
b2fd3f4a 886 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 887 )
2dca6cab 888 for (const workerNode of pool.workerNodes) {
47352846 889 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
890 expect(workerNode.info).toStrictEqual({
891 id: expect.any(Number),
892 type: WorkerTypes.thread,
893 dynamic: false,
7884d183 894 ready: true
2dca6cab
JB
895 })
896 }
213cbac6 897 await pool.destroy()
bf9549ae
JB
898 })
899
711623b8
JB
900 it('Verify that pool statuses are checked at start or destroy', async () => {
901 const pool = new FixedThreadPool(
902 numberOfWorkers,
903 './tests/worker-files/thread/testWorker.mjs'
904 )
905 expect(pool.info.started).toBe(true)
906 expect(pool.info.ready).toBe(true)
907 expect(() => pool.start()).toThrow(
908 new Error('Cannot start an already started pool')
909 )
910 await pool.destroy()
911 expect(pool.info.started).toBe(false)
912 expect(pool.info.ready).toBe(false)
913 await expect(pool.destroy()).rejects.toThrow(
914 new Error('Cannot destroy an already destroyed pool')
915 )
916 })
917
47352846
JB
918 it('Verify that pool can be started after initialization', async () => {
919 const pool = new FixedClusterPool(
920 numberOfWorkers,
921 './tests/worker-files/cluster/testWorker.js',
922 {
923 startWorkers: false
924 }
925 )
926 expect(pool.info.started).toBe(false)
927 expect(pool.info.ready).toBe(false)
55082af9 928 expect(pool.readyEventEmitted).toBe(false)
47352846 929 expect(pool.workerNodes).toStrictEqual([])
948faff7 930 await expect(pool.execute()).rejects.toThrow(
47352846
JB
931 new Error('Cannot execute a task on not started pool')
932 )
933 pool.start()
934 expect(pool.info.started).toBe(true)
935 expect(pool.info.ready).toBe(true)
55082af9
JB
936 await waitPoolEvents(pool, PoolEvents.ready, 1)
937 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
938 expect(pool.workerNodes.length).toBe(numberOfWorkers)
939 for (const workerNode of pool.workerNodes) {
940 expect(workerNode).toBeInstanceOf(WorkerNode)
941 }
942 await pool.destroy()
943 })
944
9d2d0da1
JB
945 it('Verify that pool execute() arguments are checked', async () => {
946 const pool = new FixedClusterPool(
947 numberOfWorkers,
948 './tests/worker-files/cluster/testWorker.js'
949 )
948faff7 950 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
951 new TypeError('name argument must be a string')
952 )
948faff7 953 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
954 new TypeError('name argument must not be an empty string')
955 )
948faff7 956 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
957 new TypeError('transferList argument must be an array')
958 )
959 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
960 "Task function 'unknown' not found"
961 )
962 await pool.destroy()
948faff7 963 await expect(pool.execute()).rejects.toThrow(
47352846 964 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
965 )
966 })
967
2431bdb4 968 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
969 const pool = new FixedClusterPool(
970 numberOfWorkers,
971 './tests/worker-files/cluster/testWorker.js'
972 )
09c2d0d3 973 const promises = new Set()
fc027381
JB
974 const maxMultiplier = 2
975 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 976 promises.add(pool.execute())
bf9549ae 977 }
f06e48d8 978 for (const workerNode of pool.workerNodes) {
465b2940 979 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
980 tasks: {
981 executed: 0,
982 executing: maxMultiplier,
983 queued: 0,
df593701 984 maxQueued: 0,
463226a4 985 sequentiallyStolen: 0,
68cbdc84 986 stolen: 0,
a4e07f72
JB
987 failed: 0
988 },
989 runTime: {
a4e07f72
JB
990 history: expect.any(CircularArray)
991 },
992 waitTime: {
a4e07f72
JB
993 history: expect.any(CircularArray)
994 },
5df69fab
JB
995 elu: {
996 idle: {
5df69fab
JB
997 history: expect.any(CircularArray)
998 },
999 active: {
5df69fab 1000 history: expect.any(CircularArray)
f7510105 1001 }
5df69fab 1002 }
86bf340d 1003 })
bf9549ae
JB
1004 }
1005 await Promise.all(promises)
f06e48d8 1006 for (const workerNode of pool.workerNodes) {
465b2940 1007 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1008 tasks: {
1009 executed: maxMultiplier,
1010 executing: 0,
1011 queued: 0,
df593701 1012 maxQueued: 0,
463226a4 1013 sequentiallyStolen: 0,
68cbdc84 1014 stolen: 0,
a4e07f72
JB
1015 failed: 0
1016 },
1017 runTime: {
a4e07f72
JB
1018 history: expect.any(CircularArray)
1019 },
1020 waitTime: {
a4e07f72
JB
1021 history: expect.any(CircularArray)
1022 },
5df69fab
JB
1023 elu: {
1024 idle: {
5df69fab
JB
1025 history: expect.any(CircularArray)
1026 },
1027 active: {
5df69fab 1028 history: expect.any(CircularArray)
f7510105 1029 }
5df69fab 1030 }
86bf340d 1031 })
bf9549ae 1032 }
fd7ebd49 1033 await pool.destroy()
bf9549ae
JB
1034 })
1035
2431bdb4 1036 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1037 const pool = new DynamicThreadPool(
2431bdb4 1038 Math.floor(numberOfWorkers / 2),
8f4878b7 1039 numberOfWorkers,
b2fd3f4a 1040 './tests/worker-files/thread/testWorker.mjs'
9e619829 1041 )
09c2d0d3 1042 const promises = new Set()
ee9f5295
JB
1043 const maxMultiplier = 2
1044 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1045 promises.add(pool.execute())
9e619829
JB
1046 }
1047 await Promise.all(promises)
f06e48d8 1048 for (const workerNode of pool.workerNodes) {
465b2940 1049 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1050 tasks: {
1051 executed: expect.any(Number),
1052 executing: 0,
1053 queued: 0,
df593701 1054 maxQueued: 0,
463226a4 1055 sequentiallyStolen: 0,
68cbdc84 1056 stolen: 0,
a4e07f72
JB
1057 failed: 0
1058 },
1059 runTime: {
a4e07f72
JB
1060 history: expect.any(CircularArray)
1061 },
1062 waitTime: {
a4e07f72
JB
1063 history: expect.any(CircularArray)
1064 },
5df69fab
JB
1065 elu: {
1066 idle: {
5df69fab
JB
1067 history: expect.any(CircularArray)
1068 },
1069 active: {
5df69fab 1070 history: expect.any(CircularArray)
f7510105 1071 }
5df69fab 1072 }
86bf340d 1073 })
465b2940 1074 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1075 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1076 numberOfWorkers * maxMultiplier
1077 )
b97d82d8
JB
1078 expect(workerNode.usage.runTime.history.length).toBe(0)
1079 expect(workerNode.usage.waitTime.history.length).toBe(0)
1080 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1081 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1082 }
1083 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1084 for (const workerNode of pool.workerNodes) {
465b2940 1085 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1086 tasks: {
1087 executed: 0,
1088 executing: 0,
1089 queued: 0,
df593701 1090 maxQueued: 0,
463226a4 1091 sequentiallyStolen: 0,
68cbdc84 1092 stolen: 0,
a4e07f72
JB
1093 failed: 0
1094 },
1095 runTime: {
a4e07f72
JB
1096 history: expect.any(CircularArray)
1097 },
1098 waitTime: {
a4e07f72
JB
1099 history: expect.any(CircularArray)
1100 },
5df69fab
JB
1101 elu: {
1102 idle: {
5df69fab
JB
1103 history: expect.any(CircularArray)
1104 },
1105 active: {
5df69fab 1106 history: expect.any(CircularArray)
f7510105 1107 }
5df69fab 1108 }
86bf340d 1109 })
465b2940
JB
1110 expect(workerNode.usage.runTime.history.length).toBe(0)
1111 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1112 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1113 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1114 }
fd7ebd49 1115 await pool.destroy()
ee11a4a2
JB
1116 })
1117
a1763c54
JB
1118 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1119 const pool = new DynamicClusterPool(
2431bdb4 1120 Math.floor(numberOfWorkers / 2),
164d950a 1121 numberOfWorkers,
a1763c54 1122 './tests/worker-files/cluster/testWorker.js'
164d950a 1123 )
c726f66c 1124 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1125 let poolInfo
a1763c54 1126 let poolReady = 0
041dc05b 1127 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1128 ++poolReady
d46660cd
JB
1129 poolInfo = info
1130 })
a1763c54 1131 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1132 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1133 expect(poolReady).toBe(1)
d46660cd 1134 expect(poolInfo).toStrictEqual({
23ccf9d7 1135 version,
d46660cd 1136 type: PoolTypes.dynamic,
a1763c54 1137 worker: WorkerTypes.cluster,
47352846 1138 started: true,
a1763c54 1139 ready: true,
2431bdb4
JB
1140 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1141 minSize: expect.any(Number),
1142 maxSize: expect.any(Number),
1143 workerNodes: expect.any(Number),
1144 idleWorkerNodes: expect.any(Number),
1145 busyWorkerNodes: expect.any(Number),
1146 executedTasks: expect.any(Number),
1147 executingTasks: expect.any(Number),
2431bdb4
JB
1148 failedTasks: expect.any(Number)
1149 })
1150 await pool.destroy()
1151 })
1152
a1763c54
JB
1153 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1154 const pool = new FixedThreadPool(
2431bdb4 1155 numberOfWorkers,
b2fd3f4a 1156 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1157 )
c726f66c 1158 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1159 const promises = new Set()
1160 let poolBusy = 0
2431bdb4 1161 let poolInfo
041dc05b 1162 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1163 ++poolBusy
2431bdb4
JB
1164 poolInfo = info
1165 })
c726f66c 1166 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1167 for (let i = 0; i < numberOfWorkers * 2; i++) {
1168 promises.add(pool.execute())
1169 }
1170 await Promise.all(promises)
1171 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1172 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1173 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1174 expect(poolInfo).toStrictEqual({
1175 version,
a1763c54
JB
1176 type: PoolTypes.fixed,
1177 worker: WorkerTypes.thread,
47352846
JB
1178 started: true,
1179 ready: true,
2431bdb4 1180 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1181 minSize: expect.any(Number),
1182 maxSize: expect.any(Number),
1183 workerNodes: expect.any(Number),
1184 idleWorkerNodes: expect.any(Number),
1185 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1186 executedTasks: expect.any(Number),
1187 executingTasks: expect.any(Number),
a4e07f72 1188 failedTasks: expect.any(Number)
d46660cd 1189 })
164d950a
JB
1190 await pool.destroy()
1191 })
1192
a1763c54
JB
1193 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1194 const pool = new DynamicThreadPool(
1195 Math.floor(numberOfWorkers / 2),
7c0ba920 1196 numberOfWorkers,
b2fd3f4a 1197 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1198 )
c726f66c 1199 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1200 const promises = new Set()
a1763c54 1201 let poolFull = 0
d46660cd 1202 let poolInfo
041dc05b 1203 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1204 ++poolFull
d46660cd
JB
1205 poolInfo = info
1206 })
c726f66c 1207 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1208 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1209 promises.add(pool.execute())
7c0ba920 1210 }
cf597bc5 1211 await Promise.all(promises)
33e6bb4c 1212 expect(poolFull).toBe(1)
d46660cd 1213 expect(poolInfo).toStrictEqual({
23ccf9d7 1214 version,
a1763c54 1215 type: PoolTypes.dynamic,
d46660cd 1216 worker: WorkerTypes.thread,
47352846
JB
1217 started: true,
1218 ready: true,
2431bdb4 1219 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1220 minSize: expect.any(Number),
1221 maxSize: expect.any(Number),
1222 workerNodes: expect.any(Number),
1223 idleWorkerNodes: expect.any(Number),
1224 busyWorkerNodes: expect.any(Number),
1225 executedTasks: expect.any(Number),
1226 executingTasks: expect.any(Number),
1227 failedTasks: expect.any(Number)
1228 })
1229 await pool.destroy()
1230 })
1231
3e8611a8 1232 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1233 const pool = new FixedThreadPool(
8735b4e5 1234 numberOfWorkers,
b2fd3f4a 1235 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1236 {
1237 enableTasksQueue: true
1238 }
1239 )
a074ffee 1240 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1241 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1242 const promises = new Set()
1243 let poolBackPressure = 0
1244 let poolInfo
041dc05b 1245 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1246 ++poolBackPressure
1247 poolInfo = info
1248 })
c726f66c 1249 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1250 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1251 promises.add(pool.execute())
1252 }
1253 await Promise.all(promises)
033f1776 1254 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1255 expect(poolInfo).toStrictEqual({
1256 version,
3e8611a8 1257 type: PoolTypes.fixed,
8735b4e5 1258 worker: WorkerTypes.thread,
47352846
JB
1259 started: true,
1260 ready: true,
8735b4e5 1261 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1262 minSize: expect.any(Number),
1263 maxSize: expect.any(Number),
1264 workerNodes: expect.any(Number),
1265 idleWorkerNodes: expect.any(Number),
1266 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1267 executedTasks: expect.any(Number),
1268 executingTasks: expect.any(Number),
3e8611a8
JB
1269 maxQueuedTasks: expect.any(Number),
1270 queuedTasks: expect.any(Number),
1271 backPressure: true,
68cbdc84 1272 stolenTasks: expect.any(Number),
a4e07f72 1273 failedTasks: expect.any(Number)
d46660cd 1274 })
e458c82e 1275 expect(pool.hasBackPressure.callCount).toBe(5)
fd7ebd49 1276 await pool.destroy()
7c0ba920 1277 })
70a4f5ea 1278
85b2561d
JB
1279 it('Verify that destroy() waits for queued tasks to finish', async () => {
1280 const tasksFinishedTimeout = 2500
1281 const pool = new FixedThreadPool(
1282 numberOfWorkers,
1283 './tests/worker-files/thread/asyncWorker.mjs',
1284 {
1285 enableTasksQueue: true,
1286 tasksQueueOptions: { tasksFinishedTimeout }
1287 }
1288 )
1289 const maxMultiplier = 4
1290 let tasksFinished = 0
1291 for (const workerNode of pool.workerNodes) {
1292 workerNode.on('taskFinished', () => {
1293 ++tasksFinished
1294 })
1295 }
1296 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1297 pool.execute()
1298 }
1299 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1300 const startTime = performance.now()
1301 await pool.destroy()
1302 const elapsedTime = performance.now() - startTime
1303 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1304 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
6d41131f 1305 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
85b2561d
JB
1306 })
1307
1308 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1309 const tasksFinishedTimeout = 1000
1310 const pool = new FixedThreadPool(
1311 numberOfWorkers,
1312 './tests/worker-files/thread/asyncWorker.mjs',
1313 {
1314 enableTasksQueue: true,
1315 tasksQueueOptions: { tasksFinishedTimeout }
1316 }
1317 )
1318 const maxMultiplier = 4
1319 let tasksFinished = 0
1320 for (const workerNode of pool.workerNodes) {
1321 workerNode.on('taskFinished', () => {
1322 ++tasksFinished
1323 })
1324 }
1325 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1326 pool.execute()
1327 }
1328 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1329 const startTime = performance.now()
1330 await pool.destroy()
1331 const elapsedTime = performance.now() - startTime
1332 expect(tasksFinished).toBe(0)
c004ecec 1333 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
85b2561d
JB
1334 })
1335
f18fd12b
JB
1336 it('Verify that pool asynchronous resource track tasks execution', async () => {
1337 let taskAsyncId
1338 let initCalls = 0
1339 let beforeCalls = 0
1340 let afterCalls = 0
1341 let resolveCalls = 0
1342 const hook = createHook({
1343 init (asyncId, type) {
1344 if (type === 'poolifier:task') {
1345 initCalls++
1346 taskAsyncId = asyncId
1347 }
1348 },
1349 before (asyncId) {
1350 if (asyncId === taskAsyncId) beforeCalls++
1351 },
1352 after (asyncId) {
1353 if (asyncId === taskAsyncId) afterCalls++
1354 },
1355 promiseResolve () {
1356 if (executionAsyncId() === taskAsyncId) resolveCalls++
1357 }
1358 })
f18fd12b
JB
1359 const pool = new FixedThreadPool(
1360 numberOfWorkers,
1361 './tests/worker-files/thread/testWorker.mjs'
1362 )
8954c0a3 1363 hook.enable()
f18fd12b
JB
1364 await pool.execute()
1365 hook.disable()
1366 expect(initCalls).toBe(1)
1367 expect(beforeCalls).toBe(1)
1368 expect(afterCalls).toBe(1)
1369 expect(resolveCalls).toBe(1)
8954c0a3 1370 await pool.destroy()
f18fd12b
JB
1371 })
1372
9eae3c69
JB
1373 it('Verify that hasTaskFunction() is working', async () => {
1374 const dynamicThreadPool = new DynamicThreadPool(
1375 Math.floor(numberOfWorkers / 2),
1376 numberOfWorkers,
b2fd3f4a 1377 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1378 )
1379 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1380 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1381 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1382 true
1383 )
1384 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1385 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1386 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1387 await dynamicThreadPool.destroy()
1388 const fixedClusterPool = new FixedClusterPool(
1389 numberOfWorkers,
1390 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1391 )
1392 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1393 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1394 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1395 true
1396 )
1397 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1398 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1399 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1400 await fixedClusterPool.destroy()
1401 })
1402
1403 it('Verify that addTaskFunction() is working', async () => {
1404 const dynamicThreadPool = new DynamicThreadPool(
1405 Math.floor(numberOfWorkers / 2),
1406 numberOfWorkers,
b2fd3f4a 1407 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1408 )
1409 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1410 await expect(
1411 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1412 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1413 await expect(
1414 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1415 ).rejects.toThrow(
3feeab69
JB
1416 new TypeError('name argument must not be an empty string')
1417 )
948faff7
JB
1418 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1419 new TypeError('fn argument must be a function')
1420 )
1421 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1422 new TypeError('fn argument must be a function')
1423 )
9eae3c69
JB
1424 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1425 DEFAULT_TASK_NAME,
1426 'test'
1427 ])
1428 const echoTaskFunction = data => {
1429 return data
1430 }
1431 await expect(
1432 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1433 ).resolves.toBe(true)
1434 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1435 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1436 echoTaskFunction
1437 )
1438 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1439 DEFAULT_TASK_NAME,
1440 'test',
1441 'echo'
1442 ])
1443 const taskFunctionData = { test: 'test' }
1444 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1445 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1446 for (const workerNode of dynamicThreadPool.workerNodes) {
1447 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1448 tasks: {
1449 executed: expect.any(Number),
1450 executing: 0,
1451 queued: 0,
463226a4 1452 sequentiallyStolen: 0,
5ad42e34 1453 stolen: 0,
adee6053
JB
1454 failed: 0
1455 },
1456 runTime: {
1457 history: new CircularArray()
1458 },
1459 waitTime: {
1460 history: new CircularArray()
1461 },
1462 elu: {
1463 idle: {
1464 history: new CircularArray()
1465 },
1466 active: {
1467 history: new CircularArray()
1468 }
1469 }
1470 })
1471 }
9eae3c69
JB
1472 await dynamicThreadPool.destroy()
1473 })
1474
1475 it('Verify that removeTaskFunction() is working', async () => {
1476 const dynamicThreadPool = new DynamicThreadPool(
1477 Math.floor(numberOfWorkers / 2),
1478 numberOfWorkers,
b2fd3f4a 1479 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1480 )
1481 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1482 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1483 DEFAULT_TASK_NAME,
1484 'test'
1485 ])
948faff7 1486 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1487 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1488 )
1489 const echoTaskFunction = data => {
1490 return data
1491 }
1492 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1493 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1494 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1495 echoTaskFunction
1496 )
1497 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1498 DEFAULT_TASK_NAME,
1499 'test',
1500 'echo'
1501 ])
1502 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1503 true
1504 )
1505 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1506 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 DEFAULT_TASK_NAME,
1509 'test'
1510 ])
1511 await dynamicThreadPool.destroy()
1512 })
1513
30500265 1514 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1515 const dynamicThreadPool = new DynamicThreadPool(
1516 Math.floor(numberOfWorkers / 2),
1517 numberOfWorkers,
b2fd3f4a 1518 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1519 )
1520 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1521 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1522 DEFAULT_TASK_NAME,
90d7d101
JB
1523 'jsonIntegerSerialization',
1524 'factorial',
1525 'fibonacci'
1526 ])
9eae3c69 1527 await dynamicThreadPool.destroy()
90d7d101
JB
1528 const fixedClusterPool = new FixedClusterPool(
1529 numberOfWorkers,
1530 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1531 )
1532 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1533 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1534 DEFAULT_TASK_NAME,
90d7d101
JB
1535 'jsonIntegerSerialization',
1536 'factorial',
1537 'fibonacci'
1538 ])
0fe39c97 1539 await fixedClusterPool.destroy()
90d7d101
JB
1540 })
1541
9eae3c69 1542 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1543 const dynamicThreadPool = new DynamicThreadPool(
1544 Math.floor(numberOfWorkers / 2),
1545 numberOfWorkers,
b2fd3f4a 1546 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1547 )
1548 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1549 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1550 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1551 new Error(
711623b8 1552 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1553 )
1554 )
1555 await expect(
1556 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1557 ).rejects.toThrow(
b0b55f57 1558 new Error(
711623b8 1559 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1560 )
1561 )
1562 await expect(
1563 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1564 ).rejects.toThrow(
b0b55f57 1565 new Error(
711623b8 1566 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1567 )
1568 )
9eae3c69
JB
1569 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1570 DEFAULT_TASK_NAME,
1571 'jsonIntegerSerialization',
1572 'factorial',
1573 'fibonacci'
1574 ])
1575 await expect(
1576 dynamicThreadPool.setDefaultTaskFunction('factorial')
1577 ).resolves.toBe(true)
1578 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1579 DEFAULT_TASK_NAME,
1580 'factorial',
1581 'jsonIntegerSerialization',
1582 'fibonacci'
1583 ])
1584 await expect(
1585 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1586 ).resolves.toBe(true)
1587 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1588 DEFAULT_TASK_NAME,
1589 'fibonacci',
1590 'jsonIntegerSerialization',
1591 'factorial'
1592 ])
cda9ba34 1593 await dynamicThreadPool.destroy()
30500265
JB
1594 })
1595
90d7d101 1596 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1597 const pool = new DynamicClusterPool(
2431bdb4 1598 Math.floor(numberOfWorkers / 2),
70a4f5ea 1599 numberOfWorkers,
90d7d101 1600 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1601 )
1602 const data = { n: 10 }
82888165 1603 const result0 = await pool.execute(data)
30b963d4 1604 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1605 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1606 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1607 const result2 = await pool.execute(data, 'factorial')
1608 expect(result2).toBe(3628800)
1609 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1610 expect(result3).toBe(55)
5bb5be17
JB
1611 expect(pool.info.executingTasks).toBe(0)
1612 expect(pool.info.executedTasks).toBe(4)
b414b84c 1613 for (const workerNode of pool.workerNodes) {
66979634 1614 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1615 DEFAULT_TASK_NAME,
b414b84c
JB
1616 'jsonIntegerSerialization',
1617 'factorial',
1618 'fibonacci'
1619 ])
1620 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1621 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1622 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1623 tasks: {
1624 executed: expect.any(Number),
4ba4c7f9 1625 executing: 0,
5bb5be17 1626 failed: 0,
68cbdc84 1627 queued: 0,
463226a4 1628 sequentiallyStolen: 0,
68cbdc84 1629 stolen: 0
5bb5be17
JB
1630 },
1631 runTime: {
1632 history: expect.any(CircularArray)
1633 },
1634 waitTime: {
1635 history: expect.any(CircularArray)
1636 },
1637 elu: {
1638 idle: {
1639 history: expect.any(CircularArray)
1640 },
1641 active: {
1642 history: expect.any(CircularArray)
1643 }
1644 }
1645 })
1646 expect(
4ba4c7f9
JB
1647 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1648 ).toBeGreaterThan(0)
5bb5be17 1649 }
dfd7ec01
JB
1650 expect(
1651 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1652 ).toStrictEqual(
66979634
JB
1653 workerNode.getTaskFunctionWorkerUsage(
1654 workerNode.info.taskFunctionNames[1]
1655 )
dfd7ec01 1656 )
5bb5be17 1657 }
0fe39c97 1658 await pool.destroy()
70a4f5ea 1659 })
52a23942
JB
1660
1661 it('Verify sendKillMessageToWorker()', async () => {
1662 const pool = new DynamicClusterPool(
1663 Math.floor(numberOfWorkers / 2),
1664 numberOfWorkers,
1665 './tests/worker-files/cluster/testWorker.js'
1666 )
1667 const workerNodeKey = 0
1668 await expect(
adee6053 1669 pool.sendKillMessageToWorker(workerNodeKey)
52a23942 1670 ).resolves.toBeUndefined()
85b2561d
JB
1671 await expect(
1672 pool.sendKillMessageToWorker(numberOfWorkers)
1673 ).rejects.toStrictEqual(
1674 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1675 )
52a23942
JB
1676 await pool.destroy()
1677 })
adee6053
JB
1678
1679 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1680 const pool = new DynamicClusterPool(
1681 Math.floor(numberOfWorkers / 2),
1682 numberOfWorkers,
1683 './tests/worker-files/cluster/testWorker.js'
1684 )
1685 const workerNodeKey = 0
1686 await expect(
1687 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1688 taskFunctionOperation: 'add',
1689 taskFunctionName: 'empty',
1690 taskFunction: (() => {}).toString()
1691 })
1692 ).resolves.toBe(true)
1693 expect(
1694 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1695 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1696 await pool.destroy()
1697 })
1698
1699 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1700 const pool = new DynamicClusterPool(
1701 Math.floor(numberOfWorkers / 2),
1702 numberOfWorkers,
1703 './tests/worker-files/cluster/testWorker.js'
1704 )
adee6053
JB
1705 await expect(
1706 pool.sendTaskFunctionOperationToWorkers({
1707 taskFunctionOperation: 'add',
1708 taskFunctionName: 'empty',
1709 taskFunction: (() => {}).toString()
1710 })
1711 ).resolves.toBe(true)
1712 for (const workerNode of pool.workerNodes) {
1713 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1714 DEFAULT_TASK_NAME,
1715 'test',
1716 'empty'
1717 ])
1718 }
1719 await pool.destroy()
1720 })
3ec964d6 1721})