refactor: prepare worker choice strategies code for worker readiness
[poolifier.git] / tests / pools / cluster / dynamic.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { DynamicClusterPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
bac873bd 4const { sleep, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
a35560ba 6describe('Dynamic cluster pool test suite', () => {
e1ffb94f
JB
// Lower and upper bounds on the number of workers used by the pools of this suite.
const min = 1
const max = 3
// Options of the shared pool: worker errors are printed with console.error.
const sharedPoolOptions = {
  errorHandler: error => {
    console.error(error)
  }
}
// Pool shared by the tests below until the shutdown test destroys it.
const pool = new DynamicClusterPool(
  min,
  max,
  './tests/worker-files/cluster/testWorker.js',
  sharedPoolOptions
)
17
it('Verify that the function is executed in a worker cluster', async () => {
  // Each task function is executed through the shared pool and its result is checked.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
28
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
  // Counts how many times the pool emits its `busy` event.
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })

  // The tasks are submitted without being awaited.
  const taskCount = max * 2
  for (let submitted = 0; submitted < taskCount; submitted++) {
    pool.execute()
  }

  expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
  expect(pool.workerNodes.length).toBeGreaterThan(min)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(busyEventCount).toBe(max + 1)

  // Wait for the workers created on top of `min` to exit.
  const expectedExitEvents = max - min
  const exitEventCount = await waitWorkerEvents(pool, 'exit', expectedExitEvents)
  expect(exitEventCount).toBe(expectedExitEvents)
})
43
it('Verify scale worker up and down is working', async () => {
  expect(pool.workerNodes.length).toBe(min)
  // Two identical rounds: submit tasks, check that the pool grew above `min`,
  // wait for the `exit` events, then check that the pool is back to `min`.
  const rounds = 2
  for (let round = 0; round < rounds; round++) {
    for (let submitted = 0; submitted < max * 2; submitted++) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBeGreaterThan(min)
    await waitWorkerEvents(pool, 'exit', max - min)
    expect(pool.workerNodes.length).toBe(min)
  }
})
506c2a14 59
it('Shutdown test', async () => {
  // The exit-events promise is created before the pool is destroyed and awaited afterwards.
  const pendingExitEvents = waitWorkerEvents(pool, 'exit', min)
  await pool.destroy()
  expect(await pendingExitEvents).toBe(min)
})
66
8d3782fa
JB
it('Validation of inputs test', () => {
  // Only `min` is given: constructing the pool without a worker file must throw.
  const createPoolWithoutWorkerFile = () => new DynamicClusterPool(min)
  // `toThrow` is the canonical matcher name; the `toThrowError` alias is not available in every `expect` release.
  expect(createPoolWithoutWorkerFile).toThrow(
    'Please specify a file with a worker implementation'
  )
})
72
it('Should work even without opts in input', async () => {
  // The pool is built without the optional options argument.
  const pool1 = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const result = await pool1.execute()
    expect(result).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, even when an assertion fails
    await pool1.destroy()
  }
})
e826bd34 84
it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerHardBehavior.js',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.log('long executing worker is online'),
      exitHandler: () => console.log('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // The tasks are submitted without being awaited.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    // Once `max - min` workers have exited, the pool must be back to `min` worker nodes.
    await waitWorkerEvents(longRunningPool, 'exit', max - min)
    expect(longRunningPool.workerNodes.length).toBe(min)
    // The next worker node key of the current strategy must still index an existing worker node.
    expect(
      longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
      ).nextWorkerNodeKey
    ).toBeLessThan(longRunningPool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when an assertion fails
    await longRunningPool.destroy()
  }
})
111
it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerSoftBehavior.js',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.log('long executing worker is online'),
      exitHandler: () => console.log('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // The tasks are submitted without being awaited.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    await sleep(1000)
    // Here we expect the workerNodes to be at the max size since the task is still executing
    expect(longRunningPool.workerNodes.length).toBe(max)
  } finally {
    // We need to clean up the resources after our test, even when an assertion fails
    await longRunningPool.destroy()
  }
})
8d3782fa
JB
134
it('Verify that a pool with zero worker can be instantiated', async () => {
  // Named distinctly so that it does not shadow the suite-level `pool`.
  const zeroWorkerPool = new DynamicClusterPool(
    0,
    max,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    expect(zeroWorkerPool).toBeInstanceOf(DynamicClusterPool)
  } finally {
    // We need to clean up the resources after our test, even when an assertion fails
    await zeroWorkerPool.destroy()
  }
})
506c2a14 145})