fix: fix task stealing related options handling at runtime
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
Commit | Line | Data
a449b585 1const { EventEmitter } = require('node:events')
a61a0724 2const { expect } = require('expect')
b1aae695 3const sinon = require('sinon')
e843b904 4const {
70a4f5ea 5 DynamicClusterPool,
9e619829 6 DynamicThreadPool,
aee46736 7 FixedClusterPool,
e843b904 8 FixedThreadPool,
aee46736 9 PoolEvents,
184855e6 10 PoolTypes,
3d6dd312 11 WorkerChoiceStrategies,
184855e6 12 WorkerTypes
cdace0e5 13} = require('../../../lib')
78099a15 14const { CircularArray } = require('../../../lib/circular-array')
574b351d 15const { Deque } = require('../../../lib/deque')
6cd5248f 16const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
23ccf9d7 17const { version } = require('../../../package.json')
2431bdb4 18const { waitPoolEvents } = require('../../test-utils')
47352846 19const { WorkerNode } = require('../../../lib/pools/worker-node')
e1ffb94f
JB
20
21describe('Abstract pool test suite', () => {
fc027381 22 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so that
 * instantiating it looks like a pool being created from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 28
dc021bcc
JB
// Undo every sinon stub/spy installed by a test before the next one runs.
afterEach(function restoreSinonFakes () {
  sinon.restore()
})
32
// The stub reports a non main context, so the constructor must refuse to run.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
c510fea7 49
bc61cfe6
JB
// Checks the `starting`/`started` flags of a freshly constructed pool.
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  const { starting, started } = pool
  expect(starting).toBe(false)
  expect(started).toBe(true)
  await pool.destroy()
})
59
// A missing, empty or non string worker file must be rejected, as must a
// path that does not point to an existing file.
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
80
// Constructing a pool with no arguments at all must fail on the size check.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
88
// A negative pool size is a range error.
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
99
// A fractional pool size is a type error.
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
7c0ba920 110
// Each entry pairs an invalid (min, max) sizing with the error it must raise.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrowError(expectedError)
  }
})
177
// Compares the resolved options of a pool built with defaults, then of a pool
// built with explicit options, against the expected values.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // The strategy context and every registered strategy must hold the same
  // worker choice strategy options.
  const expectStrategyOptions = (checkedPool, expectedOptions) => {
    expect(checkedPool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of checkedPool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool created without any option.
  const defaultStrategyOptions = {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitter)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    workerChoiceStrategyOptions: defaultStrategyOptions
  })
  expectStrategyOptions(defaultPool, defaultStrategyOptions)
  await defaultPool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: customStrategyOptions,
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  expectStrategyOptions(customPool, customStrategyOptions)
  await customPool.destroy()
})
277
// Each entry pairs an invalid options object with the error the constructor
// must throw for it.
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrowError(expectedError)
  }
})
451
// Toggles the runTime/elu median options at runtime and checks, after each
// change, the pool options, the strategy context, every strategy and the
// resulting task statistics requirements. Invalid options are then rejected.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // `medianEnabled` is the expected runTime/elu median flag; the expected
  // `average` requirement is its opposite, waitTime is never required.
  const expectStrategyState = medianEnabled => {
    const expectedOptions = {
      retries: 6,
      runTime: { median: medianEnabled },
      waitTime: { median: false },
      elu: { median: medianEnabled }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: true,
        average: !medianEnabled,
        median: medianEnabled
      }
    })
  }

  expectStrategyState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  const invalidStrategyOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidStrategyOptions) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(strategyOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
625
// Switches the tasks queue on and off at runtime and checks the resolved queue
// options plus the presence of the worker node queue callbacks.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Every worker node must either have both queue callbacks or none of them.
  const expectQueueCallbacks = installed => {
    for (const node of pool.workerNodes) {
      if (installed) {
        expect(node.onEmptyQueue).toBeInstanceOf(Function)
        expect(node.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(node.onEmptyQueue).toBeUndefined()
        expect(node.onBackPressure).toBeUndefined()
      }
    }
  }
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
    expectQueueCallbacks(false)
  }
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: 4,
      taskStealing: true,
      tasksStealingOnBackPressure: true
    })
    expectQueueCallbacks(true)
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
670
// Changes the tasks queue options at runtime, including turning task stealing
// off and back on, then checks that invalid options are rejected.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )

  // Checks the resolved queue options and that the worker node queue
  // callbacks are present exactly when task stealing is expected to be on.
  const expectQueueState = (concurrency, stealing) => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: 4,
      taskStealing: stealing,
      tasksStealingOnBackPressure: stealing
    })
    for (const node of pool.workerNodes) {
      if (stealing) {
        expect(node.onEmptyQueue).toBeInstanceOf(Function)
        expect(node.onBackPressure).toBeInstanceOf(Function)
      } else {
        expect(node.onEmptyQueue).toBeUndefined()
        expect(node.onBackPressure).toBeUndefined()
      }
    }
  }

  expectQueueState(1, true)
  pool.setTasksQueueOptions({
    concurrency: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false
  })
  expectQueueState(2, false)
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueState(1, true)

  const invalidQueueOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidQueueOptions) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
750
6b27d407
JB
// Checks the `info` snapshot of an idle fixed thread pool, then of an idle
// dynamic cluster pool.
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both idle pools.
  const idlePoolInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await dynamicPool.destroy()
})
796
// Every worker node of a fresh pool must report zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Built per node so that each comparison gets its own history instances.
  const buildPristineUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual(buildPristineUsage())
  }
  await pool.destroy()
})
831
2431bdb4
JB
// For a fixed cluster pool and then a dynamic thread pool, every worker node
// must start with an empty tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  const poolFactories = [
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testWorker.js'
      ),
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js'
      )
  ]
  // Pools are created, checked and destroyed one after the other.
  for (const createPool of poolFactories) {
    const pool = createPool()
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
    await pool.destroy()
  }
})
857
// For a fixed cluster pool and then a dynamic thread pool, every initial
// worker node must expose the expected `info` record.
it('Verify that pool worker info are initialized', async () => {
  const scenarios = [
    [
      () =>
        new FixedClusterPool(
          numberOfWorkers,
          './tests/worker-files/cluster/testWorker.js'
        ),
      WorkerTypes.cluster
    ],
    [
      () =>
        new DynamicThreadPool(
          Math.floor(numberOfWorkers / 2),
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.js'
        ),
      WorkerTypes.thread
    ]
  ]
  // Pools are created, checked and destroyed one after the other.
  for (const [createPool, expectedWorkerType] of scenarios) {
    const pool = createPool()
    for (const node of pool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: expectedWorkerType,
        dynamic: false,
        ready: true
      })
    }
    await pool.destroy()
  }
})
889
47352846
JB
// With `startWorkers: false` the pool stays empty and refuses tasks until
// `start()` is called explicitly.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start: no worker node, nothing can be executed.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrowError(notStartedError)

  // After start: the configured number of worker nodes exists.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
913
9d2d0da1
JB
// Invalid `name`/`transferList` arguments and unknown task functions must
// reject, and so must any call made once the pool has been destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )

  const nonStringName = pool.execute(undefined, 0)
  await expect(nonStringName).rejects.toThrowError(
    new TypeError('name argument must be a string')
  )
  const emptyName = pool.execute(undefined, '')
  await expect(emptyName).rejects.toThrowError(
    new TypeError('name argument must not be an empty string')
  )
  const nonArrayTransferList = pool.execute(undefined, undefined, {})
  await expect(nonArrayTransferList).rejects.toThrowError(
    new TypeError('transferList argument must be an array')
  )
  // This rejection value is asserted to be a plain string, not an Error.
  const unknownTaskFunction = pool.execute(undefined, 'unknown')
  await expect(unknownTaskFunction).rejects.toBe(
    "Task function 'unknown' not found"
  )

  await pool.destroy()
  const afterDestroy = pool.execute()
  await expect(afterDestroy).rejects.toThrowError(
    new Error('Cannot execute a task on not started pool')
  )
})
936
// Submits `maxMultiplier` tasks per worker and checks the per node counters
// while the tasks are in flight, then again once they have all settled.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2

  // Expected usage with the given executed/executing counters; the history
  // containers are only checked for their type.
  const buildExpectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(buildExpectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1002
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  // Builds the worker node usage shape expected for the given `executed`
  // value (or asymmetric matcher); every other task counter must be zero.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that all measurement histories of a worker node are empty.
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }
  const maxMultiplier = 2
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Submit numberOfWorkers * maxMultiplier tasks at once and wait for all of them.
  const submittedTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(submittedTasks)
  // Before the strategy change: each worker node has executed some tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(node)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed counter is back to zero.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1082
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  // Pool information expected in the payload of the 'ready' event.
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Record every payload received so both the count and the last one can be checked.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1115
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  // Pool information expected in the payload of the 'busy' event.
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Record every payload received so both the count and the last one can be checked.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, info => {
    busyInfos.push(info)
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1153
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // Pool information expected in the payload of the 'full' event.
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Record every payload received so both the count and the last one can be checked.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1190
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // Pool information expected in the payload of the 'backPressure' event;
  // it also carries the queue related fields since the tasks queue is enabled.
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Force the pool to always report back pressure.
  sinon.stub(pool, 'hasBackPressure').returns(true)
  // Record every payload received so both the count and the last one can be checked.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureInfos.push(info)
  })
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureInfos.length).toBe(1)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual(
    expectedInfo
  )
  expect(pool.hasBackPressure.called).toBe(true)
  await pool.destroy()
})
70a4f5ea 1235
it('Verify that listTaskFunctions() is working', async () => {
  // Task function names expected from both multiple task functions worker files.
  const expectedTaskFunctions = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  // Wait for the 'ready' event before listing the task functions.
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const threadTaskFunctions = dynamicThreadPool.listTaskFunctions()
  expect(threadTaskFunctions).toStrictEqual(expectedTaskFunctions)
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  const clusterTaskFunctions = fixedClusterPool.listTaskFunctions()
  expect(clusterTaskFunctions).toStrictEqual(expectedTaskFunctions)
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
1263
it('Verify that multiple task functions worker is working', async () => {
  // Per task function usage shape expected on each worker node once all tasks are done.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, sequentially.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(data, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctions).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    // Checked before any getTaskFunctionWorkerUsage() call below, as in the original ordering.
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctions()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedTaskFunctionUsage)
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task function usage must equal the usage of the second listed task function.
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctions[1])
    )
  }
  await pool.destroy()
})
52a23942
JB
1325
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const workerId = pool.workerNodes[workerNodeKey].info.id
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey, workerId)
  // The returned promise must resolve with no value.
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
3ec964d6 1341})