Test: Verify that worker pool tasks usage are reset at worker choice strategy change...
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904
JB
2const {
3 FixedClusterPool,
9e619829 4 DynamicThreadPool,
e843b904
JB
5 FixedThreadPool,
6 WorkerChoiceStrategies
7} = require('../../../lib/index')
e1ffb94f
JB
8
9describe('Abstract pool test suite', () => {
// Number of workers every pool of this suite is created with.
const numberOfWorkers = 1
// Error the "worker not found" tests expect once the workers tasks usage map
// has been emptied.
const workerNotFoundInTasksUsageMapError = new Error(
  'Worker could not be found in worker tasks usage map'
)
/**
 * Fixed thread pool stub that can empty its workers tasks usage map, letting
 * tests simulate a worker that is missing from that map.
 */
class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool {
  /** Clears `workersTasksUsage`, dropping the usage entry of every worker. */
  removeAllWorker () {
    this.workersTasksUsage.clear()
  }
}
e1ffb94f
JB
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`; used to
 * simulate a pool being created from a non main thread/process.
 */
class StubPoolWithIsMainMethod extends FixedThreadPool {
  /** @returns {boolean} Always `false`. */
  isMain () {
    return false
  }
}
3ec964d6 24
// Building a pool whose isMain() answers false must be refused.
it('Simulate pool creation from a non main thread/process', () => {
  expect(
    () =>
      new StubPoolWithIsMainMethod(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        {
          errorHandler: e => console.error(e)
        }
      )
  ).toThrowError(new Error('Cannot start a pool from a worker!'))
})
c510fea7
APA
37
// A missing worker file path and an empty one must both be rejected with the
// same error.
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    expectedError
  )
  expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
    expectedError
  )
})
49
// Constructing a pool without any argument must be rejected.
it('Verify that numberOfWorkers is checked', () => {
  expect(() => new FixedThreadPool()).toThrowError(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
57
// A negative worker count must be rejected.
it('Verify that a negative number of workers is checked', () => {
  expect(
    () =>
      new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  ).toThrowError(
    new Error('Cannot instantiate a pool with a negative number of workers')
  )
})
66
// A fractional worker count must be rejected.
it('Verify that a non integer number of workers is checked', () => {
  expect(
    () =>
      new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  ).toThrowError(
    new Error(
      'Cannot instantiate a pool with a non integer number of workers'
    )
  )
})
7c0ba920 77
// Checks the options of a pool built without options, then the options of a
// pool built with every option given explicitly.
it('Verify that pool options are checked', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    expect(pool.opts.enableEvents).toBe(true)
    expect(pool.emitter).toBeDefined()
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
    expect(pool.opts.messageHandler).toBeUndefined()
    expect(pool.opts.errorHandler).toBeUndefined()
    expect(pool.opts.onlineHandler).toBeUndefined()
    expect(pool.opts.exitHandler).toBeUndefined()
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
  const testHandler = () => console.log('test handler executed')
  pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED,
      enableEvents: false,
      messageHandler: testHandler,
      errorHandler: testHandler,
      onlineHandler: testHandler,
      exitHandler: testHandler
    }
  )
  try {
    expect(pool.opts.enableEvents).toBe(false)
    expect(pool.emitter).toBeUndefined()
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.LESS_RECENTLY_USED
    )
    expect(pool.opts.messageHandler).toStrictEqual(testHandler)
    expect(pool.opts.errorHandler).toStrictEqual(testHandler)
    expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
    expect(pool.opts.exitHandler).toStrictEqual(testHandler)
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
117
// increaseWorkerRunningTasks() must throw once the tasks usage map is empty.
it('Simulate worker not found during increaseWorkerRunningTasks', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm this is intentional.
  const pool = new StubPoolWithWorkerTasksUsageMapClear(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.increaseWorkerRunningTasks()).toThrowError(
      workerNotFoundInTasksUsageMapError
    )
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
130
// decreaseWorkerRunningTasks() must throw once the tasks usage map is empty.
it('Simulate worker not found during decreaseWorkerRunningTasks', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm this is intentional.
  const pool = new StubPoolWithWorkerTasksUsageMapClear(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.decreaseWorkerRunningTasks()).toThrowError(
      workerNotFoundInTasksUsageMapError
    )
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
146
// stepWorkerRunTasks() must throw once the tasks usage map is empty.
it('Simulate worker not found during stepWorkerRunTasks', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm this is intentional.
  const pool = new StubPoolWithWorkerTasksUsageMapClear(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.stepWorkerRunTasks()).toThrowError(
      workerNotFoundInTasksUsageMapError
    )
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
162
// With the pool built without an explicit worker choice strategy,
// updateWorkerTasksRunTime() must not throw even though the tasks usage map
// is empty.
it('Simulate worker not found during updateWorkerTasksRunTime with strategy not requiring it', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm this is intentional.
  const pool = new StubPoolWithWorkerTasksUsageMapClear(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.updateWorkerTasksRunTime()).not.toThrowError()
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
176
// With the FAIR_SHARE strategy, updateWorkerTasksRunTime() must throw once
// the tasks usage map is empty.
it('Simulate worker not found during updateWorkerTasksRunTime with strategy requiring it', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm this is intentional.
  const pool = new StubPoolWithWorkerTasksUsageMapClear(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.updateWorkerTasksRunTime()).toThrowError(
      workerNotFoundInTasksUsageMapError
    )
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
193
// Every entry of the workers tasks usage map must start with all counters at
// zero.
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.avgRunTime).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
208
// Checks the tasks usage counters right after submitting the tasks, then once
// every task has settled.
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const promises = []
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    // Nothing has been awaited yet: the submitted tasks are expected to be
    // counted as running, none as run.
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(numberOfWorkers * 2)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.avgRunTime).toBe(0)
    }
    await Promise.all(promises)
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(numberOfWorkers * 2)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
235
// For a fixed and then a dynamic thread pool: run tasks, switch the worker
// choice strategy and check that every tasks usage counter is back to zero.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const promises = []
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(numberOfWorkers * 2)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    }
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.avgRunTime).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
  pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    // Reuse the same array for the second batch of tasks.
    promises.length = 0
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(numberOfWorkers * 2)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    }
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    for (const tasksUsage of pool.workersTasksUsage.values()) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.avgRunTime).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
289
// Registers a listener on the pool emitter and counts how many times the
// 'busy' event fires while tasks are submitted.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = []
    let poolBusy = 0
    pool.emitter.on('busy', () => poolBusy++)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
3ec964d6 307})