build(deps-dev): apply updates
[poolifier.git] / tests / pools / cluster / dynamic.test.mjs
CommitLineData
a074ffee 1import { expect } from 'expect'
8e8d9101
JB
2import {
3 DynamicClusterPool,
4 PoolEvents,
5 WorkerChoiceStrategies
6} from '../../../lib/index.cjs'
d35e5717 7import { TaskFunctions } from '../../test-types.cjs'
8e8d9101 8import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
506c2a14 9
a35560ba 10describe('Dynamic cluster pool test suite', () => {
e1ffb94f
JB
// Minimum and maximum number of workers handed to the pools built in this suite.
const min = 1
const max = 3
// Pool shared by the first test cases; the 'Shutdown test' case destroys it.
const pool = new DynamicClusterPool(
  min,
  max,
  './tests/worker-files/cluster/testWorker.cjs',
  { errorHandler: error => console.error(error) }
)
21
// Submits the fibonacci and factorial task functions to the shared pool and
// checks the value each execution resolves with.
it('Verify that the function is executed in a worker cluster', async () => {
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
32
// Floods the shared pool with twice `max` tasks, then checks that it grew
// without exceeding `max`, emitted a single busy event, and shrank back to
// `min` once the extra workers exited.
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
  const expectedExits = max - min
  let busyEventCount = 0
  // This listener is left registered on purpose: the 'Shutdown test' case
  // asserts that the busy event still has a listener.
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  // Tasks are deliberately not awaited so that they pile up on the pool.
  for (let submitted = 0; submitted < max * 2; submitted += 1) {
    pool.execute()
  }
  expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
  expect(pool.workerNodes.length).toBeGreaterThan(min)
  expect(busyEventCount).toBe(1)
  const exitEventCount = await waitWorkerEvents(pool, 'exit', expectedExits)
  expect(exitEventCount).toBe(expectedExits)
  expect(pool.workerNodes.length).toBe(min)
})
46
// Runs two identical rounds on the shared pool: submit twice `max` tasks,
// check that the pool grew above `min`, wait for the extra workers to exit,
// then check that the pool is back at `min`.
it('Verify scale worker up and down is working', async () => {
  const rounds = 2
  for (let round = 0; round < rounds; round += 1) {
    // Tasks are deliberately not awaited so that they pile up on the pool.
    for (let submitted = 0; submitted < max * 2; submitted += 1) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBeGreaterThan(min)
    await waitWorkerEvents(pool, 'exit', max - min)
    expect(pool.workerNodes.length).toBe(min)
  }
})
506c2a14 61
// Destroys the shared pool and checks its state afterwards: stopped, no
// listener left on the emitter, no worker node left, one destroy event
// emitted and `min` worker exit events observed.
it('Shutdown test', async () => {
  // The exit watcher is set up before destroy() is called.
  const workerExits = waitWorkerEvents(pool, 'exit', min)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])

  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })
  const listenedEvents = [PoolEvents.busy, PoolEvents.destroy]
  expect(pool.emitter.eventNames()).toStrictEqual(listenedEvents)

  await pool.destroy()
  const exitEventCount = await workerExits

  expect(pool.started).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(min)
  expect(destroyEventCount).toBe(1)
})
80
// Building a pool without a worker file path must throw.
it('Validation of inputs test', () => {
  const createPoolWithoutFilePath = () => new DynamicClusterPool(min)
  expect(createPoolWithoutFilePath).toThrow(
    'The worker file path must be specified'
  )
})
86
// Builds a pool without the options argument and checks that a task
// submitted without data resolves with `{ ok: 1 }`.
it('Should work even without opts in input', async () => {
  const pool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  } finally {
    // Clean up the resources even when an assertion above fails, so that no
    // worker process outlives the test.
    await pool.destroy()
  }
})
e826bd34 98
// Uses the "hard behavior" long running worker file: the pool is expected to
// grow to `max` under load and to be back at `min` after `max - min` worker
// exit events.
it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerHardBehavior.cjs',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.info('long executing worker is online'),
      exitHandler: () => console.info('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // Tasks are deliberately not awaited so that they pile up on the pool.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    await waitWorkerEvents(longRunningPool, 'exit', max - min)
    expect(longRunningPool.workerNodes.length).toBe(min)
    // The next worker node key of the current strategy must still index an
    // existing worker node after the pool has shrunk.
    expect(
      longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
        longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
      ).nextWorkerNodeKey
    ).toBeLessThan(longRunningPool.workerNodes.length)
  } finally {
    // Clean up the resources even when an assertion above fails, so that no
    // worker process outlives the test.
    await longRunningPool.destroy()
  }
})
125
// Uses the "soft behavior" long running worker file: the pool is expected to
// grow to `max` under load and to still be at `max` one second later.
it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerSoftBehavior.cjs',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.info('long executing worker is online'),
      exitHandler: () => console.info('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // Tasks are deliberately not awaited so that they pile up on the pool.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    await sleep(1000)
    // Here we expect the workerNodes to be at the max size since the task is still executing
    expect(longRunningPool.workerNodes.length).toBe(max)
  } finally {
    // Clean up the resources even when an assertion above fails, so that no
    // worker process outlives the test.
    await longRunningPool.destroy()
  }
})
8d3782fa
JB
148
// A minimum size of zero must be accepted by the pool constructor.
it('Verify that a pool with zero worker can be instantiated', async () => {
  const pool = new DynamicClusterPool(
    0,
    max,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    expect(pool).toBeInstanceOf(DynamicClusterPool)
  } finally {
    // Clean up the resources even when the assertion above fails.
    await pool.destroy()
  }
})
e44639e9 159
// For every worker choice strategy, builds a pool with a minimum size of zero
// and runs two rounds of tasks on it (the second round with the tasks queue
// enabled). Each round checks that the pool grows within its bounds while
// tasks run and is back at its minimum size after the empty event.
it('Verify that a pool with zero worker works', async () => {
  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
    const pool = new DynamicClusterPool(
      0,
      max,
      './tests/worker-files/cluster/testWorker.cjs',
      {
        workerChoiceStrategy
      }
    )
    try {
      expect(pool.starting).toBe(false)
      expect(pool.readyEventEmitted).toBe(false)
      for (let run = 0; run < 2; run++) {
        // Odd runs exercise the pool with the tasks queue enabled.
        if (run % 2 !== 0) {
          pool.enableTasksQueue(true)
        }
        const maxMultiplier = 4
        const promises = new Set()
        expect(pool.workerNodes.length).toBe(pool.info.minSize)
        for (let i = 0; i < max * maxMultiplier; i++) {
          promises.add(pool.execute())
        }
        await Promise.all(promises)
        expect(pool.readyEventEmitted).toBe(true)
        expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
        expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
        await waitPoolEvents(pool, PoolEvents.empty, 1)
        expect(pool.readyEventEmitted).toBe(false)
        expect(pool.workerNodes.length).toBe(pool.info.minSize)
      }
    } finally {
      // Clean up the resources even when an assertion above fails, so that a
      // failing strategy does not leave worker processes behind.
      await pool.destroy()
    }
  }
})
506c2a14 192})