Merge branch 'fix-dynamic-pool' of github.com:jerome-benoit/poolifier into fix-dynami...
[poolifier.git] / tests / pools / cluster / dynamic.test.mjs
CommitLineData
a074ffee 1import { expect } from 'expect'
d35e5717
JB
2import { DynamicClusterPool, PoolEvents } from '../../../lib/index.cjs'
3import { TaskFunctions } from '../../test-types.cjs'
4import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
506c2a14 5
a35560ba 6describe('Dynamic cluster pool test suite', () => {
e1ffb94f
JB
// Lower and upper bounds on the number of workers, shared by every test below.
const min = 1
const max = 3
// Pool shared by the first tests of the suite; 'Shutdown test' destroys it.
const pool = new DynamicClusterPool(
  min,
  max,
  './tests/worker-files/cluster/testWorker.cjs',
  {
    // Surface worker errors on the console.
    errorHandler: error => {
      console.error(error)
    }
  }
)
17
// Runs two task functions on the shared pool, one after the other, and checks
// the value each execution resolves to.
it('Verify that the function is executed in a worker cluster', async () => {
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
28
// Submits max * 2 tasks at once, then checks the pool size bounds, the number
// of busy events and that the pool is back at `min` workers after the exits.
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
  let busyEventCount = 0
  // This listener stays registered on the shared pool after the test ends.
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  // Tasks are intentionally not awaited: they are all submitted in one go.
  const taskCount = max * 2
  for (let submitted = 0; submitted < taskCount; submitted++) {
    pool.execute()
  }
  const sizeAfterSubmission = pool.workerNodes.length
  expect(sizeAfterSubmission).toBeLessThanOrEqual(max)
  expect(sizeAfterSubmission).toBeGreaterThan(min)
  expect(busyEventCount).toBe(1)
  const expectedExits = max - min
  const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', expectedExits)
  expect(numberOfExitEvents).toBe(expectedExits)
  expect(pool.workerNodes.length).toBe(min)
})
42
// Performs the same scale-up / scale-down round twice on the shared pool.
it('Verify scale worker up and down is working', async () => {
  const taskCount = max * 2
  const expectedExits = max - min
  for (let round = 0; round < 2; round++) {
    // Tasks are intentionally not awaited: they are all submitted in one go.
    for (let submitted = 0; submitted < taskCount; submitted++) {
      pool.execute()
    }
    expect(pool.workerNodes.length).toBeGreaterThan(min)
    // Wait for max - min worker 'exit' events before checking the size again.
    await waitWorkerEvents(pool, 'exit', expectedExits)
    expect(pool.workerNodes.length).toBe(min)
  }
})
506c2a14 57
// Destroys the shared pool and checks its state, its emitter listeners and the
// number of worker 'exit' and pool 'destroy' events.
it('Shutdown test', async () => {
  // The exit-event wait is started before destroy() and awaited after it.
  const exitPromise = waitWorkerEvents(pool, 'exit', min)
  // At this point only the busy listener registered by an earlier test exists.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])

  await pool.destroy()
  const numberOfExitEvents = await exitPromise

  expect(pool.started).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(numberOfExitEvents).toBe(min)
  expect(destroyEventCount).toBe(1)
})
76
// Constructing a pool without a worker file path must throw.
it('Validation of inputs test', () => {
  const createPoolWithoutWorkerFile = () => new DynamicClusterPool(min)
  expect(createPoolWithoutWorkerFile).toThrow(
    'The worker file path must be specified'
  )
})
82
// Creates a pool without the options argument and checks that a task with no
// input resolves to { ok: 1 }.
it('Should work even without opts in input', async () => {
  const pool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, even when the
    // execution rejects or the expectation fails
    await pool.destroy()
  }
})
e826bd34 94
// Uses the "hard behavior" long running worker file: after max * 2 tasks are
// submitted the pool is expected to be at `max` workers, and back at `min`
// once max - min workers have exited.
it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerHardBehavior.cjs',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.info('long executing worker is online'),
      exitHandler: () => console.info('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // Tasks are intentionally not awaited: they are all submitted in one go.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    await waitWorkerEvents(longRunningPool, 'exit', max - min)
    expect(longRunningPool.workerNodes.length).toBe(min)
    // The next worker node key of the active strategy must still point inside
    // the (now smaller) list of worker nodes.
    const { workerChoiceStrategyContext } = longRunningPool
    expect(
      workerChoiceStrategyContext.workerChoiceStrategies.get(
        workerChoiceStrategyContext.workerChoiceStrategy
      ).nextWorkerNodeKey
    ).toBeLessThan(longRunningPool.workerNodes.length)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation fails
    await longRunningPool.destroy()
  }
})
121
// Uses the "soft behavior" long running worker file: after max * 2 tasks are
// submitted the pool is expected to be at `max` workers and to still be at
// `max` one second later.
it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
  const longRunningPool = new DynamicClusterPool(
    min,
    max,
    './tests/worker-files/cluster/longRunningWorkerSoftBehavior.cjs',
    {
      errorHandler: e => console.error(e),
      onlineHandler: () => console.info('long executing worker is online'),
      exitHandler: () => console.info('long executing worker exited')
    }
  )
  try {
    expect(longRunningPool.workerNodes.length).toBe(min)
    // Tasks are intentionally not awaited: they are all submitted in one go.
    for (let i = 0; i < max * 2; i++) {
      longRunningPool.execute()
    }
    expect(longRunningPool.workerNodes.length).toBe(max)
    await sleep(1000)
    // Here we expect the workerNodes to be at the max size since the task is still executing
    expect(longRunningPool.workerNodes.length).toBe(max)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation fails
    await longRunningPool.destroy()
  }
})
8d3782fa
JB
144
// A minimum size of 0 must be accepted by the constructor.
it('Verify that a pool with zero worker can be instantiated', async () => {
  const workerFilePath = './tests/worker-files/cluster/testWorker.cjs'
  const zeroMinPool = new DynamicClusterPool(0, max, workerFilePath)
  expect(zeroMinPool).toBeInstanceOf(DynamicClusterPool)
  // Release the pool resources once the check is done
  await zeroMinPool.destroy()
})
e44639e9
JB
155
// Currently skipped. Starting from a pool with a minimum size of 0, runs four
// rounds of max * 10000 tasks and checks that the pool is at `max` workers once
// the tasks are done and back at its minimum size after `max` workers exited.
it.skip('Verify that a pool with zero worker works', async () => {
  // A cluster pool needs a cluster worker file, like every other test in this
  // suite (the thread worker file was used here before).
  const pool = new DynamicClusterPool(
    0,
    max,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    expect(pool.starting).toBe(false)
    expect(pool.workerNodes.length).toBe(pool.info.minSize)
    const maxMultiplier = 10000
    for (let run = 0; run < 4; run++) {
      const promises = new Set()
      for (let i = 0; i < max * maxMultiplier; i++) {
        promises.add(pool.execute())
      }
      await Promise.all(promises)
      expect(pool.workerNodes.length).toBe(max)
      await waitWorkerEvents(pool, 'exit', max)
      expect(pool.workerNodes.length).toBe(pool.info.minSize)
    }
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation fails
    await pool.destroy()
  }
})
506c2a14 179})