8b6301c932aaea4f05972e12085c1509deec93d7
[poolifier.git] / tests / pools / cluster / dynamic.test.mjs
1 import { expect } from 'expect'
2
3 import {
4 DynamicClusterPool,
5 PoolEvents,
6 WorkerChoiceStrategies
7 } from '../../../lib/index.cjs'
8 import { TaskFunctions } from '../../test-types.cjs'
9 import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
10
11 describe('Dynamic cluster pool test suite', () => {
12 const min = 1
13 const max = 3
14 const pool = new DynamicClusterPool(
15 min,
16 max,
17 './tests/worker-files/cluster/testWorker.cjs',
18 {
19 errorHandler: e => console.error(e)
20 }
21 )
22
23 it('Verify that the function is executed in a worker cluster', async () => {
24 let result = await pool.execute({
25 function: TaskFunctions.fibonacci
26 })
27 expect(result).toBe(75025)
28 result = await pool.execute({
29 function: TaskFunctions.factorial
30 })
31 expect(result).toBe(9.33262154439441e157)
32 })
33
34 it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
35 let poolBusy = 0
36 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
37 for (let i = 0; i < max * 2; i++) {
38 pool.execute()
39 }
40 expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
41 expect(pool.workerNodes.length).toBeGreaterThan(min)
42 expect(poolBusy).toBe(1)
43 const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
44 expect(numberOfExitEvents).toBe(max - min)
45 expect(pool.workerNodes.length).toBe(min)
46 })
47
48 it('Verify scale worker up and down is working', async () => {
49 for (let i = 0; i < max * 2; i++) {
50 pool.execute()
51 }
52 expect(pool.workerNodes.length).toBeGreaterThan(min)
53 await waitWorkerEvents(pool, 'exit', max - min)
54 expect(pool.workerNodes.length).toBe(min)
55 for (let i = 0; i < max * 2; i++) {
56 pool.execute()
57 }
58 expect(pool.workerNodes.length).toBeGreaterThan(min)
59 await waitWorkerEvents(pool, 'exit', max - min)
60 expect(pool.workerNodes.length).toBe(min)
61 })
62
63 it('Shutdown test', async () => {
64 const exitPromise = waitWorkerEvents(pool, 'exit', min)
65 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
66 let poolDestroy = 0
67 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
68 expect(pool.emitter.eventNames()).toStrictEqual([
69 PoolEvents.busy,
70 PoolEvents.destroy
71 ])
72 await pool.destroy()
73 const numberOfExitEvents = await exitPromise
74 expect(pool.started).toBe(false)
75 expect(pool.emitter.eventNames()).toStrictEqual(['busy', 'destroy'])
76 expect(pool.readyEventEmitted).toBe(false)
77 expect(pool.workerNodes.length).toBe(0)
78 expect(numberOfExitEvents).toBe(min)
79 expect(poolDestroy).toBe(1)
80 })
81
82 it('Validation of inputs test', () => {
83 expect(() => new DynamicClusterPool(min)).toThrow(
84 'The worker file path must be specified'
85 )
86 })
87
88 it('Should work even without opts in input', async () => {
89 const pool = new DynamicClusterPool(
90 min,
91 max,
92 './tests/worker-files/cluster/testWorker.cjs'
93 )
94 const result = await pool.execute()
95 expect(result).toStrictEqual({ ok: 1 })
96 // We need to clean up the resources after our test
97 await pool.destroy()
98 })
99
100 it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
101 const longRunningPool = new DynamicClusterPool(
102 min,
103 max,
104 './tests/worker-files/cluster/longRunningWorkerHardBehavior.cjs',
105 {
106 errorHandler: e => console.error(e),
107 onlineHandler: () => console.info('long executing worker is online'),
108 exitHandler: () => console.info('long executing worker exited')
109 }
110 )
111 expect(longRunningPool.workerNodes.length).toBe(min)
112 for (let i = 0; i < max * 2; i++) {
113 longRunningPool.execute()
114 }
115 expect(longRunningPool.workerNodes.length).toBe(max)
116 await waitWorkerEvents(longRunningPool, 'exit', max - min)
117 expect(longRunningPool.workerNodes.length).toBe(min)
118 expect(
119 longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
120 longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
121 ).nextWorkerNodeKey
122 ).toBeLessThan(longRunningPool.workerNodes.length)
123 // We need to clean up the resources after our test
124 await longRunningPool.destroy()
125 })
126
127 it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
128 const longRunningPool = new DynamicClusterPool(
129 min,
130 max,
131 './tests/worker-files/cluster/longRunningWorkerSoftBehavior.cjs',
132 {
133 errorHandler: e => console.error(e),
134 onlineHandler: () => console.info('long executing worker is online'),
135 exitHandler: () => console.info('long executing worker exited')
136 }
137 )
138 expect(longRunningPool.workerNodes.length).toBe(min)
139 for (let i = 0; i < max * 2; i++) {
140 longRunningPool.execute()
141 }
142 expect(longRunningPool.workerNodes.length).toBe(max)
143 await sleep(1000)
144 // Here we expect the workerNodes to be at the max size since the task is still executing
145 expect(longRunningPool.workerNodes.length).toBe(max)
146 // We need to clean up the resources after our test
147 await longRunningPool.destroy()
148 })
149
150 it('Verify that a pool with zero worker can be instantiated', async () => {
151 const pool = new DynamicClusterPool(
152 0,
153 max,
154 './tests/worker-files/cluster/testWorker.cjs'
155 )
156 expect(pool).toBeInstanceOf(DynamicClusterPool)
157 // We need to clean up the resources after our test
158 await pool.destroy()
159 })
160
161 it('Verify that a pool with zero worker works', async () => {
162 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
163 const pool = new DynamicClusterPool(
164 0,
165 max,
166 './tests/worker-files/cluster/testWorker.cjs',
167 {
168 workerChoiceStrategy
169 }
170 )
171 expect(pool.starting).toBe(false)
172 expect(pool.readyEventEmitted).toBe(false)
173 for (let run = 0; run < 2; run++) {
174 run % 2 !== 0 && pool.enableTasksQueue(true)
175 const maxMultiplier = 4
176 const promises = new Set()
177 expect(pool.workerNodes.length).toBe(pool.info.minSize)
178 for (let i = 0; i < max * maxMultiplier; i++) {
179 promises.add(pool.execute())
180 }
181 await Promise.all(promises)
182 expect(pool.readyEventEmitted).toBe(true)
183 expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
184 expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
185 await waitPoolEvents(pool, PoolEvents.empty, 1)
186 expect(pool.readyEventEmitted).toBe(false)
187 expect(pool.workerNodes.length).toBe(pool.info.minSize)
188 }
189 // We need to clean up the resources after our test
190 await pool.destroy()
191 }
192 })
193 })