test: refine expectation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
CommitLineData
a074ffee 1import { EventEmitterAsyncResource } from 'node:events'
2eb14889 2import { dirname, join } from 'node:path'
a074ffee 3import { readFileSync } from 'node:fs'
2eb14889 4import { fileURLToPath } from 'node:url'
f18fd12b 5import { createHook, executionAsyncId } from 'node:async_hooks'
a074ffee
JB
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
70a4f5ea 9 DynamicClusterPool,
9e619829 10 DynamicThreadPool,
aee46736 11 FixedClusterPool,
e843b904 12 FixedThreadPool,
aee46736 13 PoolEvents,
184855e6 14 PoolTypes,
3d6dd312 15 WorkerChoiceStrategies,
184855e6 16 WorkerTypes
a074ffee
JB
17} from '../../lib/index.js'
18import { CircularArray } from '../../lib/circular-array.js'
19import { Deque } from '../../lib/deque.js'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21import { waitPoolEvents } from '../test-utils.js'
22import { WorkerNode } from '../../lib/pools/worker-node.js'
e1ffb94f
JB
23
24describe('Abstract pool test suite', () => {
2eb14889
JB
25 const version = JSON.parse(
26 readFileSync(
a5e5599c 27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
2eb14889
JB
28 'utf8'
29 )
30 ).version
fc027381 31 const numberOfWorkers = 2
a8884ffd 32 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
33 isMain () {
34 return false
35 }
3ec964d6 36 }
3ec964d6 37
dc021bcc 38 afterEach(() => {
a074ffee 39 restore()
dc021bcc
JB
40 })
41
bcf85f7f
JB
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
8d3782fa
JB
52 expect(
53 () =>
a8884ffd 54 new StubPoolWithIsMain(
7c0ba920 55 numberOfWorkers,
b2fd3f4a 56 './tests/worker-files/thread/testWorker.mjs',
8d3782fa 57 {
041dc05b 58 errorHandler: e => console.error(e)
8d3782fa
JB
59 }
60 )
948faff7 61 ).toThrow(
e695d66f
JB
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
04f45163 65 )
3ec964d6 66 })
c510fea7 67
bc61cfe6
JB
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
b2fd3f4a 71 './tests/worker-files/thread/testWorker.mjs'
bc61cfe6 72 )
bc61cfe6 73 expect(pool.started).toBe(true)
711623b8
JB
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
bc61cfe6 76 await pool.destroy()
bc61cfe6
JB
77 })
78
c510fea7 79 it('Verify that filePath is checked', () => {
948faff7 80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
c3719753
JB
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
3d6dd312
JB
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
948faff7 88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
8003c026
JB
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
b2fd3f4a 96 './tests/worker-files/thread/testWorker.mjs'
8003c026 97 )
948faff7 98 ).toThrow(
e695d66f
JB
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
8d3782fa
JB
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
948faff7 109 ).toThrow(
473c717a
JB
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
8d3782fa
JB
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
b2fd3f4a 119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
948faff7 120 ).toThrow(
473c717a 121 new TypeError(
0d80593b 122 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
123 )
124 )
c510fea7 125 })
7c0ba920 126
26ce26ca
JB
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
216541b6 143 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
948faff7 151 ).toThrow(
a5ed75b7
JB
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
2761efb4
JB
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
b2fd3f4a 161 './tests/worker-files/thread/testWorker.mjs'
2761efb4 162 )
948faff7 163 ).toThrow(
2761efb4
JB
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
a5ed75b7 173 './tests/worker-files/cluster/testWorker.js'
2761efb4 174 )
948faff7 175 ).toThrow(
2761efb4
JB
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
2431bdb4
JB
180 expect(
181 () =>
b2fd3f4a
JB
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
948faff7 187 ).toThrow(
2431bdb4
JB
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
b2fd3f4a
JB
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
948faff7 199 ).toThrow(
2431bdb4 200 new RangeError(
213cbac6 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
2431bdb4
JB
202 )
203 )
21f710aa
JB
204 expect(
205 () =>
213cbac6
JB
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
948faff7 211 ).toThrow(
21f710aa 212 new RangeError(
213cbac6 213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
21f710aa
JB
214 )
215 )
2431bdb4
JB
216 })
217
fd7ebd49 218 it('Verify that pool options are checked', async () => {
7c0ba920
JB
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
b2fd3f4a 221 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 222 )
b5604034 223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
47352846
JB
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
26ce26ca 229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
8990357d
JB
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
232 retries:
233 pool.info.maxSize +
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 235 runTime: { median: false },
5df69fab 236 waitTime: { median: false },
00e1bdeb
JB
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
da309861 242 })
999ef664
JB
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
146eaa19
JB
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
247 retries:
248 pool.info.maxSize +
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
00e1bdeb 253 })
146eaa19 254 )
999ef664 255 }
fd7ebd49 256 await pool.destroy()
73bfd59d 257 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
258 pool = new FixedThreadPool(
259 numberOfWorkers,
b2fd3f4a 260 './tests/worker-files/thread/testWorker.mjs',
7c0ba920 261 {
e4543b14 262 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 263 workerChoiceStrategyOptions: {
932fc8be 264 runTime: { median: true },
fc027381 265 weights: { 0: 300, 1: 200 }
49be33fe 266 },
35cf1c03 267 enableEvents: false,
1f68cede 268 restartWorkerOnError: false,
ff733df7 269 enableTasksQueue: true,
d4aeae5a 270 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
271 messageHandler: testHandler,
272 errorHandler: testHandler,
273 onlineHandler: testHandler,
274 exitHandler: testHandler
7c0ba920
JB
275 }
276 )
7c0ba920 277 expect(pool.emitter).toBeUndefined()
47352846
JB
278 expect(pool.opts).toStrictEqual({
279 startWorkers: true,
280 enableEvents: false,
281 restartWorkerOnError: false,
282 enableTasksQueue: true,
283 tasksQueueOptions: {
284 concurrency: 2,
2324f8c9 285 size: Math.pow(numberOfWorkers, 2),
dbd73092 286 taskStealing: true,
32b141fd 287 tasksStealingOnBackPressure: true,
568d0075 288 tasksFinishedTimeout: 2000
47352846
JB
289 },
290 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
291 workerChoiceStrategyOptions: {
47352846 292 runTime: { median: true },
47352846
JB
293 weights: { 0: 300, 1: 200 }
294 },
295 onlineHandler: testHandler,
296 messageHandler: testHandler,
297 errorHandler: testHandler,
298 exitHandler: testHandler
8990357d
JB
299 })
300 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
449cd154
JB
301 retries:
302 pool.info.maxSize +
303 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
8990357d
JB
304 runTime: { median: true },
305 waitTime: { median: false },
306 elu: { median: false },
fc027381 307 weights: { 0: 300, 1: 200 }
da309861 308 })
999ef664
JB
309 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
310 .workerChoiceStrategies) {
311 expect(workerChoiceStrategy.opts).toStrictEqual({
449cd154
JB
312 retries:
313 pool.info.maxSize +
314 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
999ef664
JB
315 runTime: { median: true },
316 waitTime: { median: false },
317 elu: { median: false },
318 weights: { 0: 300, 1: 200 }
319 })
320 }
fd7ebd49 321 await pool.destroy()
7c0ba920
JB
322 })
323
fe291b64 324 it('Verify that pool options are validated', () => {
d4aeae5a
JB
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
b2fd3f4a 329 './tests/worker-files/thread/testWorker.mjs',
d4aeae5a 330 {
f0d7f803 331 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
332 }
333 )
948faff7 334 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
49be33fe
JB
335 expect(
336 () =>
337 new FixedThreadPool(
338 numberOfWorkers,
b2fd3f4a 339 './tests/worker-files/thread/testWorker.mjs',
49be33fe
JB
340 {
341 workerChoiceStrategyOptions: { weights: {} }
342 }
343 )
948faff7 344 ).toThrow(
8735b4e5
JB
345 new Error(
346 'Invalid worker choice strategy options: must have a weight for each worker node'
347 )
49be33fe 348 )
f0d7f803
JB
349 expect(
350 () =>
351 new FixedThreadPool(
352 numberOfWorkers,
b2fd3f4a 353 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
354 {
355 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
356 }
357 )
948faff7 358 ).toThrow(
8735b4e5
JB
359 new Error(
360 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
361 )
f0d7f803 362 )
5c4c2dee
JB
363 expect(
364 () =>
365 new FixedThreadPool(
366 numberOfWorkers,
b2fd3f4a 367 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
368 {
369 enableTasksQueue: true,
370 tasksQueueOptions: 'invalidTasksQueueOptions'
371 }
372 )
948faff7 373 ).toThrow(
5c4c2dee
JB
374 new TypeError('Invalid tasks queue options: must be a plain object')
375 )
f0d7f803
JB
376 expect(
377 () =>
378 new FixedThreadPool(
379 numberOfWorkers,
b2fd3f4a 380 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
381 {
382 enableTasksQueue: true,
383 tasksQueueOptions: { concurrency: 0 }
384 }
385 )
948faff7 386 ).toThrow(
e695d66f 387 new RangeError(
20c6f652 388 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
389 )
390 )
f0d7f803
JB
391 expect(
392 () =>
393 new FixedThreadPool(
394 numberOfWorkers,
b2fd3f4a 395 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
396 {
397 enableTasksQueue: true,
5c4c2dee 398 tasksQueueOptions: { concurrency: -1 }
f0d7f803
JB
399 }
400 )
948faff7 401 ).toThrow(
5c4c2dee
JB
402 new RangeError(
403 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
404 )
8735b4e5 405 )
f0d7f803
JB
406 expect(
407 () =>
408 new FixedThreadPool(
409 numberOfWorkers,
b2fd3f4a 410 './tests/worker-files/thread/testWorker.mjs',
f0d7f803
JB
411 {
412 enableTasksQueue: true,
413 tasksQueueOptions: { concurrency: 0.2 }
414 }
415 )
948faff7 416 ).toThrow(
20c6f652 417 new TypeError('Invalid worker node tasks concurrency: must be an integer')
8735b4e5 418 )
5c4c2dee
JB
419 expect(
420 () =>
421 new FixedThreadPool(
422 numberOfWorkers,
b2fd3f4a 423 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
424 {
425 enableTasksQueue: true,
426 tasksQueueOptions: { size: 0 }
427 }
428 )
948faff7 429 ).toThrow(
5c4c2dee
JB
430 new RangeError(
431 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
432 )
433 )
434 expect(
435 () =>
436 new FixedThreadPool(
437 numberOfWorkers,
b2fd3f4a 438 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
439 {
440 enableTasksQueue: true,
441 tasksQueueOptions: { size: -1 }
442 }
443 )
948faff7 444 ).toThrow(
5c4c2dee
JB
445 new RangeError(
446 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
447 )
448 )
449 expect(
450 () =>
451 new FixedThreadPool(
452 numberOfWorkers,
b2fd3f4a 453 './tests/worker-files/thread/testWorker.mjs',
5c4c2dee
JB
454 {
455 enableTasksQueue: true,
456 tasksQueueOptions: { size: 0.2 }
457 }
458 )
948faff7 459 ).toThrow(
5c4c2dee
JB
460 new TypeError('Invalid worker node tasks queue size: must be an integer')
461 )
d4aeae5a
JB
462 })
463
2431bdb4 464 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
465 const pool = new FixedThreadPool(
466 numberOfWorkers,
b2fd3f4a 467 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
468 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
469 )
26ce26ca 470 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
8990357d 471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
472 retries:
473 pool.info.maxSize +
474 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
932fc8be 475 runTime: { median: false },
5df69fab 476 waitTime: { median: false },
00e1bdeb
JB
477 elu: { median: false },
478 weights: expect.objectContaining({
479 0: expect.any(Number),
480 [pool.info.maxSize - 1]: expect.any(Number)
481 })
a20f0ba5
JB
482 })
483 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
484 .workerChoiceStrategies) {
55ddaf0f
JB
485 expect(workerChoiceStrategy.opts).toStrictEqual(
486 expect.objectContaining({
487 retries:
488 pool.info.maxSize +
489 Object.keys(workerChoiceStrategy.opts.weights).length,
490 runTime: { median: false },
491 waitTime: { median: false },
492 elu: { median: false }
00e1bdeb 493 })
55ddaf0f 494 )
a20f0ba5 495 }
87de9ff5
JB
496 expect(
497 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
498 ).toStrictEqual({
932fc8be
JB
499 runTime: {
500 aggregate: true,
501 average: true,
502 median: false
503 },
504 waitTime: {
505 aggregate: false,
506 average: false,
507 median: false
508 },
5df69fab 509 elu: {
9adcefab
JB
510 aggregate: true,
511 average: true,
5df69fab
JB
512 median: false
513 }
86bf340d 514 })
9adcefab
JB
515 pool.setWorkerChoiceStrategyOptions({
516 runTime: { median: true },
517 elu: { median: true }
518 })
a20f0ba5 519 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab 520 runTime: { median: true },
8990357d
JB
521 elu: { median: true }
522 })
523 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
524 retries:
525 pool.info.maxSize +
526 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
8990357d
JB
527 runTime: { median: true },
528 waitTime: { median: false },
00e1bdeb
JB
529 elu: { median: true },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
533 })
a20f0ba5
JB
534 })
535 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
536 .workerChoiceStrategies) {
fc7633dd
JB
537 expect(workerChoiceStrategy.opts).toStrictEqual(
538 expect.objectContaining({
539 retries:
540 pool.info.maxSize +
541 Object.keys(workerChoiceStrategy.opts.weights).length,
542 runTime: { median: true },
543 waitTime: { median: false },
544 elu: { median: true }
00e1bdeb 545 })
fc7633dd 546 )
a20f0ba5 547 }
87de9ff5
JB
548 expect(
549 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
550 ).toStrictEqual({
932fc8be
JB
551 runTime: {
552 aggregate: true,
553 average: false,
554 median: true
555 },
556 waitTime: {
557 aggregate: false,
558 average: false,
559 median: false
560 },
5df69fab 561 elu: {
9adcefab 562 aggregate: true,
5df69fab 563 average: false,
9adcefab 564 median: true
5df69fab 565 }
86bf340d 566 })
9adcefab
JB
567 pool.setWorkerChoiceStrategyOptions({
568 runTime: { median: false },
569 elu: { median: false }
570 })
a20f0ba5 571 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 572 runTime: { median: false },
8990357d
JB
573 elu: { median: false }
574 })
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
00e1bdeb
JB
576 retries:
577 pool.info.maxSize +
578 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
9adcefab 579 runTime: { median: false },
8990357d 580 waitTime: { median: false },
00e1bdeb
JB
581 elu: { median: false },
582 weights: expect.objectContaining({
583 0: expect.any(Number),
584 [pool.info.maxSize - 1]: expect.any(Number)
585 })
a20f0ba5
JB
586 })
587 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
588 .workerChoiceStrategies) {
69bba5e2
JB
589 expect(workerChoiceStrategy.opts).toStrictEqual(
590 expect.objectContaining({
591 retries:
592 pool.info.maxSize +
593 Object.keys(workerChoiceStrategy.opts.weights).length,
594 runTime: { median: false },
595 waitTime: { median: false },
596 elu: { median: false }
00e1bdeb 597 })
69bba5e2 598 )
a20f0ba5 599 }
87de9ff5
JB
600 expect(
601 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
602 ).toStrictEqual({
932fc8be
JB
603 runTime: {
604 aggregate: true,
605 average: true,
606 median: false
607 },
608 waitTime: {
609 aggregate: false,
610 average: false,
611 median: false
612 },
5df69fab 613 elu: {
9adcefab
JB
614 aggregate: true,
615 average: true,
5df69fab
JB
616 median: false
617 }
86bf340d 618 })
1f95d544
JB
619 expect(() =>
620 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
948faff7 621 ).toThrow(
8735b4e5
JB
622 new TypeError(
623 'Invalid worker choice strategy options: must be a plain object'
624 )
1f95d544 625 )
948faff7 626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
8735b4e5
JB
627 new Error(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
629 )
1f95d544
JB
630 )
631 expect(() =>
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
948faff7 633 ).toThrow(
8735b4e5
JB
634 new Error(
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
636 )
1f95d544 637 )
a20f0ba5
JB
638 await pool.destroy()
639 })
640
2431bdb4 641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
642 const pool = new FixedThreadPool(
643 numberOfWorkers,
b2fd3f4a 644 './tests/worker-files/thread/testWorker.mjs'
a20f0ba5
JB
645 )
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 concurrency: 1,
2324f8c9 652 size: Math.pow(numberOfWorkers, 2),
dbd73092 653 taskStealing: true,
32b141fd 654 tasksStealingOnBackPressure: true,
568d0075 655 tasksFinishedTimeout: 2000
20c6f652 656 })
a20f0ba5
JB
657 pool.enableTasksQueue(true, { concurrency: 2 })
658 expect(pool.opts.enableTasksQueue).toBe(true)
20c6f652
JB
659 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 concurrency: 2,
2324f8c9 661 size: Math.pow(numberOfWorkers, 2),
dbd73092 662 taskStealing: true,
32b141fd 663 tasksStealingOnBackPressure: true,
568d0075 664 tasksFinishedTimeout: 2000
20c6f652 665 })
a20f0ba5
JB
666 pool.enableTasksQueue(false)
667 expect(pool.opts.enableTasksQueue).toBe(false)
668 expect(pool.opts.tasksQueueOptions).toBeUndefined()
669 await pool.destroy()
670 })
671
2431bdb4 672 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
673 const pool = new FixedThreadPool(
674 numberOfWorkers,
b2fd3f4a 675 './tests/worker-files/thread/testWorker.mjs',
a20f0ba5
JB
676 { enableTasksQueue: true }
677 )
20c6f652
JB
678 expect(pool.opts.tasksQueueOptions).toStrictEqual({
679 concurrency: 1,
2324f8c9 680 size: Math.pow(numberOfWorkers, 2),
dbd73092 681 taskStealing: true,
32b141fd 682 tasksStealingOnBackPressure: true,
568d0075 683 tasksFinishedTimeout: 2000
20c6f652 684 })
d6ca1416 685 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
688 )
d6ca1416
JB
689 }
690 pool.setTasksQueueOptions({
691 concurrency: 2,
2324f8c9 692 size: 2,
d6ca1416 693 taskStealing: false,
32b141fd 694 tasksStealingOnBackPressure: false,
568d0075 695 tasksFinishedTimeout: 3000
d6ca1416 696 })
20c6f652
JB
697 expect(pool.opts.tasksQueueOptions).toStrictEqual({
698 concurrency: 2,
2324f8c9 699 size: 2,
d6ca1416 700 taskStealing: false,
32b141fd 701 tasksStealingOnBackPressure: false,
568d0075 702 tasksFinishedTimeout: 3000
d6ca1416
JB
703 })
704 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
707 )
d6ca1416
JB
708 }
709 pool.setTasksQueueOptions({
710 concurrency: 1,
711 taskStealing: true,
712 tasksStealingOnBackPressure: true
713 })
714 expect(pool.opts.tasksQueueOptions).toStrictEqual({
715 concurrency: 1,
2324f8c9 716 size: Math.pow(numberOfWorkers, 2),
dbd73092 717 taskStealing: true,
32b141fd 718 tasksStealingOnBackPressure: true,
568d0075 719 tasksFinishedTimeout: 2000
20c6f652 720 })
d6ca1416 721 for (const workerNode of pool.workerNodes) {
2324f8c9
JB
722 expect(workerNode.tasksQueueBackPressureSize).toBe(
723 pool.opts.tasksQueueOptions.size
724 )
d6ca1416 725 }
948faff7 726 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
8735b4e5
JB
727 new TypeError('Invalid tasks queue options: must be a plain object')
728 )
948faff7 729 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
e695d66f 730 new RangeError(
20c6f652 731 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
8735b4e5
JB
732 )
733 )
948faff7 734 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
e695d66f 735 new RangeError(
20c6f652 736 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
8735b4e5 737 )
a20f0ba5 738 )
948faff7 739 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
20c6f652
JB
740 new TypeError('Invalid worker node tasks concurrency: must be an integer')
741 )
948faff7 742 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
20c6f652 743 new RangeError(
68dbcdc0 744 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
20c6f652
JB
745 )
746 )
948faff7 747 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
20c6f652 748 new RangeError(
68dbcdc0 749 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
20c6f652
JB
750 )
751 )
948faff7 752 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
68dbcdc0 753 new TypeError('Invalid worker node tasks queue size: must be an integer')
f0d7f803 754 )
a20f0ba5
JB
755 await pool.destroy()
756 })
757
6b27d407
JB
758 it('Verify that pool info is set', async () => {
759 let pool = new FixedThreadPool(
760 numberOfWorkers,
b2fd3f4a 761 './tests/worker-files/thread/testWorker.mjs'
6b27d407 762 )
2dca6cab
JB
763 expect(pool.info).toStrictEqual({
764 version,
765 type: PoolTypes.fixed,
766 worker: WorkerTypes.thread,
47352846 767 started: true,
2dca6cab
JB
768 ready: true,
769 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
770 minSize: numberOfWorkers,
771 maxSize: numberOfWorkers,
772 workerNodes: numberOfWorkers,
773 idleWorkerNodes: numberOfWorkers,
774 busyWorkerNodes: 0,
775 executedTasks: 0,
776 executingTasks: 0,
2dca6cab
JB
777 failedTasks: 0
778 })
6b27d407
JB
779 await pool.destroy()
780 pool = new DynamicClusterPool(
2431bdb4 781 Math.floor(numberOfWorkers / 2),
6b27d407 782 numberOfWorkers,
ecdfbdc0 783 './tests/worker-files/cluster/testWorker.js'
6b27d407 784 )
2dca6cab
JB
785 expect(pool.info).toStrictEqual({
786 version,
787 type: PoolTypes.dynamic,
788 worker: WorkerTypes.cluster,
47352846 789 started: true,
2dca6cab
JB
790 ready: true,
791 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
792 minSize: Math.floor(numberOfWorkers / 2),
793 maxSize: numberOfWorkers,
794 workerNodes: Math.floor(numberOfWorkers / 2),
795 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
796 busyWorkerNodes: 0,
797 executedTasks: 0,
798 executingTasks: 0,
2dca6cab
JB
799 failedTasks: 0
800 })
6b27d407
JB
801 await pool.destroy()
802 })
803
2431bdb4 804 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
805 const pool = new FixedClusterPool(
806 numberOfWorkers,
807 './tests/worker-files/cluster/testWorker.js'
808 )
f06e48d8 809 for (const workerNode of pool.workerNodes) {
47352846 810 expect(workerNode).toBeInstanceOf(WorkerNode)
465b2940 811 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
812 tasks: {
813 executed: 0,
814 executing: 0,
815 queued: 0,
df593701 816 maxQueued: 0,
463226a4 817 sequentiallyStolen: 0,
68cbdc84 818 stolen: 0,
a4e07f72
JB
819 failed: 0
820 },
821 runTime: {
4ba4c7f9 822 history: new CircularArray()
a4e07f72
JB
823 },
824 waitTime: {
4ba4c7f9 825 history: new CircularArray()
a4e07f72 826 },
5df69fab
JB
827 elu: {
828 idle: {
4ba4c7f9 829 history: new CircularArray()
5df69fab
JB
830 },
831 active: {
4ba4c7f9 832 history: new CircularArray()
f7510105 833 }
5df69fab 834 }
86bf340d 835 })
f06e48d8
JB
836 }
837 await pool.destroy()
838 })
839
2431bdb4
JB
840 it('Verify that pool worker tasks queue are initialized', async () => {
841 let pool = new FixedClusterPool(
f06e48d8
JB
842 numberOfWorkers,
843 './tests/worker-files/cluster/testWorker.js'
844 )
845 for (const workerNode of pool.workerNodes) {
47352846 846 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 847 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
4d8bf9e4 848 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 849 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 850 }
fd7ebd49 851 await pool.destroy()
2431bdb4
JB
852 pool = new DynamicThreadPool(
853 Math.floor(numberOfWorkers / 2),
854 numberOfWorkers,
b2fd3f4a 855 './tests/worker-files/thread/testWorker.mjs'
2431bdb4
JB
856 )
857 for (const workerNode of pool.workerNodes) {
47352846 858 expect(workerNode).toBeInstanceOf(WorkerNode)
574b351d 859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
2431bdb4
JB
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
862 }
213cbac6 863 await pool.destroy()
2431bdb4
JB
864 })
865
866 it('Verify that pool worker info are initialized', async () => {
867 let pool = new FixedClusterPool(
868 numberOfWorkers,
869 './tests/worker-files/cluster/testWorker.js'
870 )
2dca6cab 871 for (const workerNode of pool.workerNodes) {
47352846 872 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
873 expect(workerNode.info).toStrictEqual({
874 id: expect.any(Number),
875 type: WorkerTypes.cluster,
876 dynamic: false,
877 ready: true
878 })
879 }
2431bdb4
JB
880 await pool.destroy()
881 pool = new DynamicThreadPool(
882 Math.floor(numberOfWorkers / 2),
883 numberOfWorkers,
b2fd3f4a 884 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 885 )
2dca6cab 886 for (const workerNode of pool.workerNodes) {
47352846 887 expect(workerNode).toBeInstanceOf(WorkerNode)
2dca6cab
JB
888 expect(workerNode.info).toStrictEqual({
889 id: expect.any(Number),
890 type: WorkerTypes.thread,
891 dynamic: false,
7884d183 892 ready: true
2dca6cab
JB
893 })
894 }
213cbac6 895 await pool.destroy()
bf9549ae
JB
896 })
897
711623b8
JB
898 it('Verify that pool statuses are checked at start or destroy', async () => {
899 const pool = new FixedThreadPool(
900 numberOfWorkers,
901 './tests/worker-files/thread/testWorker.mjs'
902 )
903 expect(pool.info.started).toBe(true)
904 expect(pool.info.ready).toBe(true)
905 expect(() => pool.start()).toThrow(
906 new Error('Cannot start an already started pool')
907 )
908 await pool.destroy()
909 expect(pool.info.started).toBe(false)
910 expect(pool.info.ready).toBe(false)
911 await expect(pool.destroy()).rejects.toThrow(
912 new Error('Cannot destroy an already destroyed pool')
913 )
914 })
915
47352846
JB
916 it('Verify that pool can be started after initialization', async () => {
917 const pool = new FixedClusterPool(
918 numberOfWorkers,
919 './tests/worker-files/cluster/testWorker.js',
920 {
921 startWorkers: false
922 }
923 )
924 expect(pool.info.started).toBe(false)
925 expect(pool.info.ready).toBe(false)
55082af9 926 expect(pool.readyEventEmitted).toBe(false)
47352846 927 expect(pool.workerNodes).toStrictEqual([])
948faff7 928 await expect(pool.execute()).rejects.toThrow(
47352846
JB
929 new Error('Cannot execute a task on not started pool')
930 )
931 pool.start()
932 expect(pool.info.started).toBe(true)
933 expect(pool.info.ready).toBe(true)
55082af9
JB
934 await waitPoolEvents(pool, PoolEvents.ready, 1)
935 expect(pool.readyEventEmitted).toBe(true)
47352846
JB
936 expect(pool.workerNodes.length).toBe(numberOfWorkers)
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode).toBeInstanceOf(WorkerNode)
939 }
940 await pool.destroy()
941 })
942
9d2d0da1
JB
943 it('Verify that pool execute() arguments are checked', async () => {
944 const pool = new FixedClusterPool(
945 numberOfWorkers,
946 './tests/worker-files/cluster/testWorker.js'
947 )
948faff7 948 await expect(pool.execute(undefined, 0)).rejects.toThrow(
9d2d0da1
JB
949 new TypeError('name argument must be a string')
950 )
948faff7 951 await expect(pool.execute(undefined, '')).rejects.toThrow(
9d2d0da1
JB
952 new TypeError('name argument must not be an empty string')
953 )
948faff7 954 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
9d2d0da1
JB
955 new TypeError('transferList argument must be an array')
956 )
957 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
958 "Task function 'unknown' not found"
959 )
960 await pool.destroy()
948faff7 961 await expect(pool.execute()).rejects.toThrow(
47352846 962 new Error('Cannot execute a task on not started pool')
9d2d0da1
JB
963 )
964 })
965
2431bdb4 966 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
967 const pool = new FixedClusterPool(
968 numberOfWorkers,
969 './tests/worker-files/cluster/testWorker.js'
970 )
09c2d0d3 971 const promises = new Set()
fc027381
JB
972 const maxMultiplier = 2
973 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 974 promises.add(pool.execute())
bf9549ae 975 }
f06e48d8 976 for (const workerNode of pool.workerNodes) {
465b2940 977 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
978 tasks: {
979 executed: 0,
980 executing: maxMultiplier,
981 queued: 0,
df593701 982 maxQueued: 0,
463226a4 983 sequentiallyStolen: 0,
68cbdc84 984 stolen: 0,
a4e07f72
JB
985 failed: 0
986 },
987 runTime: {
a4e07f72
JB
988 history: expect.any(CircularArray)
989 },
990 waitTime: {
a4e07f72
JB
991 history: expect.any(CircularArray)
992 },
5df69fab
JB
993 elu: {
994 idle: {
5df69fab
JB
995 history: expect.any(CircularArray)
996 },
997 active: {
5df69fab 998 history: expect.any(CircularArray)
f7510105 999 }
5df69fab 1000 }
86bf340d 1001 })
bf9549ae
JB
1002 }
1003 await Promise.all(promises)
f06e48d8 1004 for (const workerNode of pool.workerNodes) {
465b2940 1005 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1006 tasks: {
1007 executed: maxMultiplier,
1008 executing: 0,
1009 queued: 0,
df593701 1010 maxQueued: 0,
463226a4 1011 sequentiallyStolen: 0,
68cbdc84 1012 stolen: 0,
a4e07f72
JB
1013 failed: 0
1014 },
1015 runTime: {
a4e07f72
JB
1016 history: expect.any(CircularArray)
1017 },
1018 waitTime: {
a4e07f72
JB
1019 history: expect.any(CircularArray)
1020 },
5df69fab
JB
1021 elu: {
1022 idle: {
5df69fab
JB
1023 history: expect.any(CircularArray)
1024 },
1025 active: {
5df69fab 1026 history: expect.any(CircularArray)
f7510105 1027 }
5df69fab 1028 }
86bf340d 1029 })
bf9549ae 1030 }
fd7ebd49 1031 await pool.destroy()
bf9549ae
JB
1032 })
1033
2431bdb4 1034 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 1035 const pool = new DynamicThreadPool(
2431bdb4 1036 Math.floor(numberOfWorkers / 2),
8f4878b7 1037 numberOfWorkers,
b2fd3f4a 1038 './tests/worker-files/thread/testWorker.mjs'
9e619829 1039 )
09c2d0d3 1040 const promises = new Set()
ee9f5295
JB
1041 const maxMultiplier = 2
1042 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 1043 promises.add(pool.execute())
9e619829
JB
1044 }
1045 await Promise.all(promises)
f06e48d8 1046 for (const workerNode of pool.workerNodes) {
465b2940 1047 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1048 tasks: {
1049 executed: expect.any(Number),
1050 executing: 0,
1051 queued: 0,
df593701 1052 maxQueued: 0,
463226a4 1053 sequentiallyStolen: 0,
68cbdc84 1054 stolen: 0,
a4e07f72
JB
1055 failed: 0
1056 },
1057 runTime: {
a4e07f72
JB
1058 history: expect.any(CircularArray)
1059 },
1060 waitTime: {
a4e07f72
JB
1061 history: expect.any(CircularArray)
1062 },
5df69fab
JB
1063 elu: {
1064 idle: {
5df69fab
JB
1065 history: expect.any(CircularArray)
1066 },
1067 active: {
5df69fab 1068 history: expect.any(CircularArray)
f7510105 1069 }
5df69fab 1070 }
86bf340d 1071 })
465b2940 1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1075 )
b97d82d8
JB
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
1080 }
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 1082 for (const workerNode of pool.workerNodes) {
465b2940 1083 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
1084 tasks: {
1085 executed: 0,
1086 executing: 0,
1087 queued: 0,
df593701 1088 maxQueued: 0,
463226a4 1089 sequentiallyStolen: 0,
68cbdc84 1090 stolen: 0,
a4e07f72
JB
1091 failed: 0
1092 },
1093 runTime: {
a4e07f72
JB
1094 history: expect.any(CircularArray)
1095 },
1096 waitTime: {
a4e07f72
JB
1097 history: expect.any(CircularArray)
1098 },
5df69fab
JB
1099 elu: {
1100 idle: {
5df69fab
JB
1101 history: expect.any(CircularArray)
1102 },
1103 active: {
5df69fab 1104 history: expect.any(CircularArray)
f7510105 1105 }
5df69fab 1106 }
86bf340d 1107 })
465b2940
JB
1108 expect(workerNode.usage.runTime.history.length).toBe(0)
1109 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
1110 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1111 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 1112 }
fd7ebd49 1113 await pool.destroy()
ee11a4a2
JB
1114 })
1115
a1763c54
JB
1116 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1117 const pool = new DynamicClusterPool(
2431bdb4 1118 Math.floor(numberOfWorkers / 2),
164d950a 1119 numberOfWorkers,
a1763c54 1120 './tests/worker-files/cluster/testWorker.js'
164d950a 1121 )
c726f66c 1122 expect(pool.emitter.eventNames()).toStrictEqual([])
d46660cd 1123 let poolInfo
a1763c54 1124 let poolReady = 0
041dc05b 1125 pool.emitter.on(PoolEvents.ready, info => {
a1763c54 1126 ++poolReady
d46660cd
JB
1127 poolInfo = info
1128 })
a1763c54 1129 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 1130 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
a1763c54 1131 expect(poolReady).toBe(1)
d46660cd 1132 expect(poolInfo).toStrictEqual({
23ccf9d7 1133 version,
d46660cd 1134 type: PoolTypes.dynamic,
a1763c54 1135 worker: WorkerTypes.cluster,
47352846 1136 started: true,
a1763c54 1137 ready: true,
2431bdb4
JB
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
2431bdb4
JB
1146 failedTasks: expect.any(Number)
1147 })
1148 await pool.destroy()
1149 })
1150
a1763c54
JB
1151 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1152 const pool = new FixedThreadPool(
2431bdb4 1153 numberOfWorkers,
b2fd3f4a 1154 './tests/worker-files/thread/testWorker.mjs'
2431bdb4 1155 )
c726f66c 1156 expect(pool.emitter.eventNames()).toStrictEqual([])
a1763c54
JB
1157 const promises = new Set()
1158 let poolBusy = 0
2431bdb4 1159 let poolInfo
041dc05b 1160 pool.emitter.on(PoolEvents.busy, info => {
a1763c54 1161 ++poolBusy
2431bdb4
JB
1162 poolInfo = info
1163 })
c726f66c 1164 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
a1763c54
JB
1165 for (let i = 0; i < numberOfWorkers * 2; i++) {
1166 promises.add(pool.execute())
1167 }
1168 await Promise.all(promises)
1169 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1170 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1171 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
1172 expect(poolInfo).toStrictEqual({
1173 version,
a1763c54
JB
1174 type: PoolTypes.fixed,
1175 worker: WorkerTypes.thread,
47352846
JB
1176 started: true,
1177 ready: true,
2431bdb4 1178 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1179 minSize: expect.any(Number),
1180 maxSize: expect.any(Number),
1181 workerNodes: expect.any(Number),
1182 idleWorkerNodes: expect.any(Number),
1183 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1184 executedTasks: expect.any(Number),
1185 executingTasks: expect.any(Number),
a4e07f72 1186 failedTasks: expect.any(Number)
d46660cd 1187 })
164d950a
JB
1188 await pool.destroy()
1189 })
1190
a1763c54
JB
1191 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1192 const pool = new DynamicThreadPool(
1193 Math.floor(numberOfWorkers / 2),
7c0ba920 1194 numberOfWorkers,
b2fd3f4a 1195 './tests/worker-files/thread/testWorker.mjs'
7c0ba920 1196 )
c726f66c 1197 expect(pool.emitter.eventNames()).toStrictEqual([])
09c2d0d3 1198 const promises = new Set()
a1763c54 1199 let poolFull = 0
d46660cd 1200 let poolInfo
041dc05b 1201 pool.emitter.on(PoolEvents.full, info => {
a1763c54 1202 ++poolFull
d46660cd
JB
1203 poolInfo = info
1204 })
c726f66c 1205 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
7c0ba920 1206 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 1207 promises.add(pool.execute())
7c0ba920 1208 }
cf597bc5 1209 await Promise.all(promises)
33e6bb4c 1210 expect(poolFull).toBe(1)
d46660cd 1211 expect(poolInfo).toStrictEqual({
23ccf9d7 1212 version,
a1763c54 1213 type: PoolTypes.dynamic,
d46660cd 1214 worker: WorkerTypes.thread,
47352846
JB
1215 started: true,
1216 ready: true,
2431bdb4 1217 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
1218 minSize: expect.any(Number),
1219 maxSize: expect.any(Number),
1220 workerNodes: expect.any(Number),
1221 idleWorkerNodes: expect.any(Number),
1222 busyWorkerNodes: expect.any(Number),
1223 executedTasks: expect.any(Number),
1224 executingTasks: expect.any(Number),
1225 failedTasks: expect.any(Number)
1226 })
1227 await pool.destroy()
1228 })
1229
3e8611a8 1230 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 1231 const pool = new FixedThreadPool(
8735b4e5 1232 numberOfWorkers,
b2fd3f4a 1233 './tests/worker-files/thread/testWorker.mjs',
8735b4e5
JB
1234 {
1235 enableTasksQueue: true
1236 }
1237 )
a074ffee 1238 stub(pool, 'hasBackPressure').returns(true)
c726f66c 1239 expect(pool.emitter.eventNames()).toStrictEqual([])
8735b4e5
JB
1240 const promises = new Set()
1241 let poolBackPressure = 0
1242 let poolInfo
041dc05b 1243 pool.emitter.on(PoolEvents.backPressure, info => {
8735b4e5
JB
1244 ++poolBackPressure
1245 poolInfo = info
1246 })
c726f66c 1247 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
033f1776 1248 for (let i = 0; i < numberOfWorkers + 1; i++) {
8735b4e5
JB
1249 promises.add(pool.execute())
1250 }
1251 await Promise.all(promises)
033f1776 1252 expect(poolBackPressure).toBe(1)
8735b4e5
JB
1253 expect(poolInfo).toStrictEqual({
1254 version,
3e8611a8 1255 type: PoolTypes.fixed,
8735b4e5 1256 worker: WorkerTypes.thread,
47352846
JB
1257 started: true,
1258 ready: true,
8735b4e5 1259 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
1260 minSize: expect.any(Number),
1261 maxSize: expect.any(Number),
1262 workerNodes: expect.any(Number),
1263 idleWorkerNodes: expect.any(Number),
1264 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
1265 executedTasks: expect.any(Number),
1266 executingTasks: expect.any(Number),
3e8611a8
JB
1267 maxQueuedTasks: expect.any(Number),
1268 queuedTasks: expect.any(Number),
1269 backPressure: true,
68cbdc84 1270 stolenTasks: expect.any(Number),
a4e07f72 1271 failedTasks: expect.any(Number)
d46660cd 1272 })
e458c82e 1273 expect(pool.hasBackPressure.callCount).toBe(5)
fd7ebd49 1274 await pool.destroy()
7c0ba920 1275 })
70a4f5ea 1276
85b2561d
JB
1277 it('Verify that destroy() waits for queued tasks to finish', async () => {
1278 const tasksFinishedTimeout = 2500
1279 const pool = new FixedThreadPool(
1280 numberOfWorkers,
1281 './tests/worker-files/thread/asyncWorker.mjs',
1282 {
1283 enableTasksQueue: true,
1284 tasksQueueOptions: { tasksFinishedTimeout }
1285 }
1286 )
1287 const maxMultiplier = 4
1288 let tasksFinished = 0
1289 for (const workerNode of pool.workerNodes) {
1290 workerNode.on('taskFinished', () => {
1291 ++tasksFinished
1292 })
1293 }
1294 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1295 pool.execute()
1296 }
1297 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1298 const startTime = performance.now()
1299 await pool.destroy()
1300 const elapsedTime = performance.now() - startTime
1301 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1302 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
6d41131f 1303 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
85b2561d
JB
1304 })
1305
1306 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1307 const tasksFinishedTimeout = 1000
1308 const pool = new FixedThreadPool(
1309 numberOfWorkers,
1310 './tests/worker-files/thread/asyncWorker.mjs',
1311 {
1312 enableTasksQueue: true,
1313 tasksQueueOptions: { tasksFinishedTimeout }
1314 }
1315 )
1316 const maxMultiplier = 4
1317 let tasksFinished = 0
1318 for (const workerNode of pool.workerNodes) {
1319 workerNode.on('taskFinished', () => {
1320 ++tasksFinished
1321 })
1322 }
1323 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1324 pool.execute()
1325 }
1326 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1327 const startTime = performance.now()
1328 await pool.destroy()
1329 const elapsedTime = performance.now() - startTime
1330 expect(tasksFinished).toBe(0)
2885534c 1331 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
85b2561d
JB
1332 })
1333
f18fd12b
JB
1334 it('Verify that pool asynchronous resource track tasks execution', async () => {
1335 let taskAsyncId
1336 let initCalls = 0
1337 let beforeCalls = 0
1338 let afterCalls = 0
1339 let resolveCalls = 0
1340 const hook = createHook({
1341 init (asyncId, type) {
1342 if (type === 'poolifier:task') {
1343 initCalls++
1344 taskAsyncId = asyncId
1345 }
1346 },
1347 before (asyncId) {
1348 if (asyncId === taskAsyncId) beforeCalls++
1349 },
1350 after (asyncId) {
1351 if (asyncId === taskAsyncId) afterCalls++
1352 },
1353 promiseResolve () {
1354 if (executionAsyncId() === taskAsyncId) resolveCalls++
1355 }
1356 })
f18fd12b
JB
1357 const pool = new FixedThreadPool(
1358 numberOfWorkers,
1359 './tests/worker-files/thread/testWorker.mjs'
1360 )
8954c0a3 1361 hook.enable()
f18fd12b
JB
1362 await pool.execute()
1363 hook.disable()
1364 expect(initCalls).toBe(1)
1365 expect(beforeCalls).toBe(1)
1366 expect(afterCalls).toBe(1)
1367 expect(resolveCalls).toBe(1)
8954c0a3 1368 await pool.destroy()
f18fd12b
JB
1369 })
1370
9eae3c69
JB
1371 it('Verify that hasTaskFunction() is working', async () => {
1372 const dynamicThreadPool = new DynamicThreadPool(
1373 Math.floor(numberOfWorkers / 2),
1374 numberOfWorkers,
b2fd3f4a 1375 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
9eae3c69
JB
1376 )
1377 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1378 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1379 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1380 true
1381 )
1382 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1383 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1384 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1385 await dynamicThreadPool.destroy()
1386 const fixedClusterPool = new FixedClusterPool(
1387 numberOfWorkers,
1388 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1389 )
1390 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1391 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1392 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1393 true
1394 )
1395 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1396 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1397 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1398 await fixedClusterPool.destroy()
1399 })
1400
1401 it('Verify that addTaskFunction() is working', async () => {
1402 const dynamicThreadPool = new DynamicThreadPool(
1403 Math.floor(numberOfWorkers / 2),
1404 numberOfWorkers,
b2fd3f4a 1405 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1406 )
1407 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
3feeab69
JB
1408 await expect(
1409 dynamicThreadPool.addTaskFunction(0, () => {})
948faff7 1410 ).rejects.toThrow(new TypeError('name argument must be a string'))
3feeab69
JB
1411 await expect(
1412 dynamicThreadPool.addTaskFunction('', () => {})
948faff7 1413 ).rejects.toThrow(
3feeab69
JB
1414 new TypeError('name argument must not be an empty string')
1415 )
948faff7
JB
1416 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1417 new TypeError('fn argument must be a function')
1418 )
1419 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1420 new TypeError('fn argument must be a function')
1421 )
9eae3c69
JB
1422 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1423 DEFAULT_TASK_NAME,
1424 'test'
1425 ])
1426 const echoTaskFunction = data => {
1427 return data
1428 }
1429 await expect(
1430 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 ).resolves.toBe(true)
1432 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1433 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1434 echoTaskFunction
1435 )
1436 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1437 DEFAULT_TASK_NAME,
1438 'test',
1439 'echo'
1440 ])
1441 const taskFunctionData = { test: 'test' }
1442 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1443 expect(echoResult).toStrictEqual(taskFunctionData)
adee6053
JB
1444 for (const workerNode of dynamicThreadPool.workerNodes) {
1445 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1446 tasks: {
1447 executed: expect.any(Number),
1448 executing: 0,
1449 queued: 0,
463226a4 1450 sequentiallyStolen: 0,
5ad42e34 1451 stolen: 0,
adee6053
JB
1452 failed: 0
1453 },
1454 runTime: {
1455 history: new CircularArray()
1456 },
1457 waitTime: {
1458 history: new CircularArray()
1459 },
1460 elu: {
1461 idle: {
1462 history: new CircularArray()
1463 },
1464 active: {
1465 history: new CircularArray()
1466 }
1467 }
1468 })
1469 }
9eae3c69
JB
1470 await dynamicThreadPool.destroy()
1471 })
1472
1473 it('Verify that removeTaskFunction() is working', async () => {
1474 const dynamicThreadPool = new DynamicThreadPool(
1475 Math.floor(numberOfWorkers / 2),
1476 numberOfWorkers,
b2fd3f4a 1477 './tests/worker-files/thread/testWorker.mjs'
9eae3c69
JB
1478 )
1479 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1480 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1481 DEFAULT_TASK_NAME,
1482 'test'
1483 ])
948faff7 1484 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
16248b23 1485 new Error('Cannot remove a task function not handled on the pool side')
9eae3c69
JB
1486 )
1487 const echoTaskFunction = data => {
1488 return data
1489 }
1490 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1491 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1492 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1493 echoTaskFunction
1494 )
1495 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1496 DEFAULT_TASK_NAME,
1497 'test',
1498 'echo'
1499 ])
1500 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1501 true
1502 )
1503 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1504 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1505 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1506 DEFAULT_TASK_NAME,
1507 'test'
1508 ])
1509 await dynamicThreadPool.destroy()
1510 })
1511
30500265 1512 it('Verify that listTaskFunctionNames() is working', async () => {
90d7d101
JB
1513 const dynamicThreadPool = new DynamicThreadPool(
1514 Math.floor(numberOfWorkers / 2),
1515 numberOfWorkers,
b2fd3f4a 1516 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
90d7d101
JB
1517 )
1518 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
66979634 1519 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1520 DEFAULT_TASK_NAME,
90d7d101
JB
1521 'jsonIntegerSerialization',
1522 'factorial',
1523 'fibonacci'
1524 ])
9eae3c69 1525 await dynamicThreadPool.destroy()
90d7d101
JB
1526 const fixedClusterPool = new FixedClusterPool(
1527 numberOfWorkers,
1528 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1529 )
1530 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
66979634 1531 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
6cd5248f 1532 DEFAULT_TASK_NAME,
90d7d101
JB
1533 'jsonIntegerSerialization',
1534 'factorial',
1535 'fibonacci'
1536 ])
0fe39c97 1537 await fixedClusterPool.destroy()
90d7d101
JB
1538 })
1539
9eae3c69 1540 it('Verify that setDefaultTaskFunction() is working', async () => {
30500265
JB
1541 const dynamicThreadPool = new DynamicThreadPool(
1542 Math.floor(numberOfWorkers / 2),
1543 numberOfWorkers,
b2fd3f4a 1544 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
30500265
JB
1545 )
1546 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
711623b8 1547 const workerId = dynamicThreadPool.workerNodes[0].info.id
948faff7 1548 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
b0b55f57 1549 new Error(
711623b8 1550 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
b0b55f57
JB
1551 )
1552 )
1553 await expect(
1554 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
948faff7 1555 ).rejects.toThrow(
b0b55f57 1556 new Error(
711623b8 1557 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
b0b55f57
JB
1558 )
1559 )
1560 await expect(
1561 dynamicThreadPool.setDefaultTaskFunction('unknown')
948faff7 1562 ).rejects.toThrow(
b0b55f57 1563 new Error(
711623b8 1564 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
b0b55f57
JB
1565 )
1566 )
9eae3c69
JB
1567 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1568 DEFAULT_TASK_NAME,
1569 'jsonIntegerSerialization',
1570 'factorial',
1571 'fibonacci'
1572 ])
1573 await expect(
1574 dynamicThreadPool.setDefaultTaskFunction('factorial')
1575 ).resolves.toBe(true)
1576 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1577 DEFAULT_TASK_NAME,
1578 'factorial',
1579 'jsonIntegerSerialization',
1580 'fibonacci'
1581 ])
1582 await expect(
1583 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1584 ).resolves.toBe(true)
1585 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1586 DEFAULT_TASK_NAME,
1587 'fibonacci',
1588 'jsonIntegerSerialization',
1589 'factorial'
1590 ])
cda9ba34 1591 await dynamicThreadPool.destroy()
30500265
JB
1592 })
1593
90d7d101 1594 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 1595 const pool = new DynamicClusterPool(
2431bdb4 1596 Math.floor(numberOfWorkers / 2),
70a4f5ea 1597 numberOfWorkers,
90d7d101 1598 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
1599 )
1600 const data = { n: 10 }
82888165 1601 const result0 = await pool.execute(data)
30b963d4 1602 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 1603 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 1604 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
1605 const result2 = await pool.execute(data, 'factorial')
1606 expect(result2).toBe(3628800)
1607 const result3 = await pool.execute(data, 'fibonacci')
024daf59 1608 expect(result3).toBe(55)
5bb5be17
JB
1609 expect(pool.info.executingTasks).toBe(0)
1610 expect(pool.info.executedTasks).toBe(4)
b414b84c 1611 for (const workerNode of pool.workerNodes) {
66979634 1612 expect(workerNode.info.taskFunctionNames).toStrictEqual([
6cd5248f 1613 DEFAULT_TASK_NAME,
b414b84c
JB
1614 'jsonIntegerSerialization',
1615 'factorial',
1616 'fibonacci'
1617 ])
1618 expect(workerNode.taskFunctionsUsage.size).toBe(3)
66979634 1619 for (const name of pool.listTaskFunctionNames()) {
5bb5be17
JB
1620 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1621 tasks: {
1622 executed: expect.any(Number),
4ba4c7f9 1623 executing: 0,
5bb5be17 1624 failed: 0,
68cbdc84 1625 queued: 0,
463226a4 1626 sequentiallyStolen: 0,
68cbdc84 1627 stolen: 0
5bb5be17
JB
1628 },
1629 runTime: {
1630 history: expect.any(CircularArray)
1631 },
1632 waitTime: {
1633 history: expect.any(CircularArray)
1634 },
1635 elu: {
1636 idle: {
1637 history: expect.any(CircularArray)
1638 },
1639 active: {
1640 history: expect.any(CircularArray)
1641 }
1642 }
1643 })
1644 expect(
4ba4c7f9
JB
1645 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1646 ).toBeGreaterThan(0)
5bb5be17 1647 }
dfd7ec01
JB
1648 expect(
1649 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1650 ).toStrictEqual(
66979634
JB
1651 workerNode.getTaskFunctionWorkerUsage(
1652 workerNode.info.taskFunctionNames[1]
1653 )
dfd7ec01 1654 )
5bb5be17 1655 }
0fe39c97 1656 await pool.destroy()
70a4f5ea 1657 })
52a23942
JB
1658
1659 it('Verify sendKillMessageToWorker()', async () => {
1660 const pool = new DynamicClusterPool(
1661 Math.floor(numberOfWorkers / 2),
1662 numberOfWorkers,
1663 './tests/worker-files/cluster/testWorker.js'
1664 )
1665 const workerNodeKey = 0
1666 await expect(
adee6053 1667 pool.sendKillMessageToWorker(workerNodeKey)
52a23942 1668 ).resolves.toBeUndefined()
85b2561d
JB
1669 await expect(
1670 pool.sendKillMessageToWorker(numberOfWorkers)
1671 ).rejects.toStrictEqual(
1672 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1673 )
52a23942
JB
1674 await pool.destroy()
1675 })
adee6053
JB
1676
1677 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1678 const pool = new DynamicClusterPool(
1679 Math.floor(numberOfWorkers / 2),
1680 numberOfWorkers,
1681 './tests/worker-files/cluster/testWorker.js'
1682 )
1683 const workerNodeKey = 0
1684 await expect(
1685 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1686 taskFunctionOperation: 'add',
1687 taskFunctionName: 'empty',
1688 taskFunction: (() => {}).toString()
1689 })
1690 ).resolves.toBe(true)
1691 expect(
1692 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1693 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1694 await pool.destroy()
1695 })
1696
1697 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1698 const pool = new DynamicClusterPool(
1699 Math.floor(numberOfWorkers / 2),
1700 numberOfWorkers,
1701 './tests/worker-files/cluster/testWorker.js'
1702 )
adee6053
JB
1703 await expect(
1704 pool.sendTaskFunctionOperationToWorkers({
1705 taskFunctionOperation: 'add',
1706 taskFunctionName: 'empty',
1707 taskFunction: (() => {}).toString()
1708 })
1709 ).resolves.toBe(true)
1710 for (const workerNode of pool.workerNodes) {
1711 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1712 DEFAULT_TASK_NAME,
1713 'test',
1714 'empty'
1715 ])
1716 }
1717 await pool.destroy()
1718 })
3ec964d6 1719})