feat: add pool runtime setters
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
9e619829 3 DynamicThreadPool,
aee46736 4 FixedClusterPool,
e843b904 5 FixedThreadPool,
aee46736 6 PoolEvents,
e843b904
JB
7 WorkerChoiceStrategies
8} = require('../../../lib/index')
78099a15 9const { CircularArray } = require('../../../lib/circular-array')
e1ffb94f
JB
10
11describe('Abstract pool test suite', () => {
12 const numberOfWorkers = 1
3032893a 13 const workerNotFoundInPoolError = new Error(
f06e48d8 14 'Worker could not be found in the pool worker nodes'
e1ffb94f 15 )
a8884ffd 16 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 17 removeAllWorker () {
d4aeae5a 18 this.workerNodes = []
c923ce56 19 this.promiseResponseMap.clear()
e1ffb94f 20 }
3ec964d6 21 }
a8884ffd 22 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
23 isMain () {
24 return false
25 }
3ec964d6 26 }
3ec964d6 27
3ec964d6 28 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
29 expect(
30 () =>
a8884ffd 31 new StubPoolWithIsMain(
7c0ba920 32 numberOfWorkers,
8d3782fa
JB
33 './tests/worker-files/thread/testWorker.js',
34 {
35 errorHandler: e => console.error(e)
36 }
37 )
d4aeae5a 38 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 39 })
c510fea7
APA
40
41 it('Verify that filePath is checked', () => {
292ad316
JB
42 const expectedError = new Error(
43 'Please specify a file with a worker implementation'
44 )
7c0ba920 45 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 46 expectedError
8d3782fa 47 )
7c0ba920 48 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 49 expectedError
8d3782fa
JB
50 )
51 })
52
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 55 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
56 )
57 })
58
59 it('Verify that a negative number of workers is checked', () => {
60 expect(
61 () =>
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
63 ).toThrowError(
473c717a
JB
64 new RangeError(
65 'Cannot instantiate a pool with a negative number of workers'
66 )
8d3782fa
JB
67 )
68 })
69
70 it('Verify that a non integer number of workers is checked', () => {
71 expect(
72 () =>
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
74 ).toThrowError(
473c717a 75 new TypeError(
8d3782fa
JB
76 'Cannot instantiate a pool with a non integer number of workers'
77 )
78 )
c510fea7 79 })
7c0ba920 80
fd7ebd49 81 it('Verify that pool options are checked', async () => {
7c0ba920
JB
82 let pool = new FixedThreadPool(
83 numberOfWorkers,
84 './tests/worker-files/thread/testWorker.js'
85 )
8620fb25 86 expect(pool.opts.enableEvents).toBe(true)
7c0ba920 87 expect(pool.emitter).toBeDefined()
ff733df7 88 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 89 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
90 expect(pool.opts.workerChoiceStrategy).toBe(
91 WorkerChoiceStrategies.ROUND_ROBIN
92 )
da309861
JB
93 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
94 medRunTime: false
95 })
35cf1c03
JB
96 expect(pool.opts.messageHandler).toBeUndefined()
97 expect(pool.opts.errorHandler).toBeUndefined()
98 expect(pool.opts.onlineHandler).toBeUndefined()
99 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 100 await pool.destroy()
35cf1c03 101 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
102 pool = new FixedThreadPool(
103 numberOfWorkers,
104 './tests/worker-files/thread/testWorker.js',
105 {
737c6d97 106 workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
da309861 107 workerChoiceStrategyOptions: { medRunTime: true },
35cf1c03 108 enableEvents: false,
ff733df7 109 enableTasksQueue: true,
d4aeae5a 110 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
111 messageHandler: testHandler,
112 errorHandler: testHandler,
113 onlineHandler: testHandler,
114 exitHandler: testHandler
7c0ba920
JB
115 }
116 )
8620fb25 117 expect(pool.opts.enableEvents).toBe(false)
7c0ba920 118 expect(pool.emitter).toBeUndefined()
ff733df7 119 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 120 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 121 expect(pool.opts.workerChoiceStrategy).toBe(
737c6d97 122 WorkerChoiceStrategies.LESS_USED
e843b904 123 )
da309861
JB
124 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
125 medRunTime: true
126 })
35cf1c03
JB
127 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
128 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
129 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
130 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 131 await pool.destroy()
7c0ba920
JB
132 })
133
a20f0ba5 134 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
135 expect(
136 () =>
137 new FixedThreadPool(
138 numberOfWorkers,
139 './tests/worker-files/thread/testWorker.js',
140 {
141 enableTasksQueue: true,
142 tasksQueueOptions: { concurrency: 0 }
143 }
144 )
145 ).toThrowError("Invalid worker tasks concurrency '0'")
146 expect(
147 () =>
148 new FixedThreadPool(
149 numberOfWorkers,
150 './tests/worker-files/thread/testWorker.js',
151 {
152 workerChoiceStrategy: 'invalidStrategy'
153 }
154 )
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
156 })
157
a20f0ba5
JB
158 it('Verify that worker choice strategy options can be set', async () => {
159 const pool = new FixedThreadPool(
160 numberOfWorkers,
161 './tests/worker-files/thread/testWorker.js',
162 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
163 )
164 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
165 medRunTime: false
166 })
167 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
168 .workerChoiceStrategies) {
169 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
170 }
171 expect(
172 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
173 ).toBe(true)
174 expect(
175 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
176 ).toBe(false)
177 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
178 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
179 medRunTime: true
180 })
181 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
182 .workerChoiceStrategies) {
183 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
184 }
185 expect(
186 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
187 ).toBe(false)
188 expect(
189 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
190 ).toBe(true)
191 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
192 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
193 medRunTime: false
194 })
195 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
196 .workerChoiceStrategies) {
197 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
198 }
199 expect(
200 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
201 ).toBe(true)
202 expect(
203 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
204 ).toBe(false)
205 await pool.destroy()
206 })
207
208 it('Verify that tasks queue can be enabled/disabled', async () => {
209 const pool = new FixedThreadPool(
210 numberOfWorkers,
211 './tests/worker-files/thread/testWorker.js'
212 )
213 expect(pool.opts.enableTasksQueue).toBe(false)
214 expect(pool.opts.tasksQueueOptions).toBeUndefined()
215 pool.enableTasksQueue(true)
216 expect(pool.opts.enableTasksQueue).toBe(true)
217 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
218 pool.enableTasksQueue(true, { concurrency: 2 })
219 expect(pool.opts.enableTasksQueue).toBe(true)
220 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
221 pool.enableTasksQueue(false)
222 expect(pool.opts.enableTasksQueue).toBe(false)
223 expect(pool.opts.tasksQueueOptions).toBeUndefined()
224 await pool.destroy()
225 })
226
227 it('Verify that tasks queue options can be set', async () => {
228 const pool = new FixedThreadPool(
229 numberOfWorkers,
230 './tests/worker-files/thread/testWorker.js',
231 { enableTasksQueue: true }
232 )
233 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
234 pool.setTasksQueueOptions({ concurrency: 2 })
235 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
236 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
237 "Invalid worker tasks concurrency '0'"
238 )
239 await pool.destroy()
240 })
241
d4aeae5a 242 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
a8884ffd 243 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
244 numberOfWorkers,
245 './tests/worker-files/cluster/testWorker.js',
246 {
10fcfaf4
JB
247 errorHandler: e => console.error(e)
248 }
249 )
d4aeae5a 250 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
251 // Simulate worker not found.
252 pool.removeAllWorker()
d4aeae5a 253 expect(pool.workerNodes.length).toBe(0)
3032893a
JB
254 expect(() => pool.getWorkerTasksUsage()).toThrowError(
255 workerNotFoundInPoolError
bf9549ae 256 )
fd7ebd49 257 await pool.destroy()
bf9549ae
JB
258 })
259
fd7ebd49 260 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
261 const pool = new FixedClusterPool(
262 numberOfWorkers,
263 './tests/worker-files/cluster/testWorker.js'
264 )
f06e48d8
JB
265 for (const workerNode of pool.workerNodes) {
266 expect(workerNode.tasksUsage).toBeDefined()
267 expect(workerNode.tasksUsage.run).toBe(0)
268 expect(workerNode.tasksUsage.running).toBe(0)
269 expect(workerNode.tasksUsage.runTime).toBe(0)
270 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
271 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
272 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
273 expect(workerNode.tasksUsage.medRunTime).toBe(0)
274 expect(workerNode.tasksUsage.error).toBe(0)
275 }
276 await pool.destroy()
277 })
278
279 it('Verify that worker pool tasks queue are initialized', async () => {
280 const pool = new FixedClusterPool(
281 numberOfWorkers,
282 './tests/worker-files/cluster/testWorker.js'
283 )
284 for (const workerNode of pool.workerNodes) {
285 expect(workerNode.tasksQueue).toBeDefined()
286 expect(workerNode.tasksQueue).toBeInstanceOf(Array)
287 expect(workerNode.tasksQueue.length).toBe(0)
bf9549ae 288 }
fd7ebd49 289 await pool.destroy()
bf9549ae
JB
290 })
291
292 it('Verify that worker pool tasks usage are computed', async () => {
293 const pool = new FixedClusterPool(
294 numberOfWorkers,
295 './tests/worker-files/cluster/testWorker.js'
296 )
297 const promises = []
298 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 299 promises.push(pool.execute())
bf9549ae 300 }
f06e48d8
JB
301 for (const workerNode of pool.workerNodes) {
302 expect(workerNode.tasksUsage).toBeDefined()
303 expect(workerNode.tasksUsage.run).toBe(0)
304 expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
305 expect(workerNode.tasksUsage.runTime).toBe(0)
306 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
307 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
308 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
309 expect(workerNode.tasksUsage.medRunTime).toBe(0)
310 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae
JB
311 }
312 await Promise.all(promises)
f06e48d8
JB
313 for (const workerNode of pool.workerNodes) {
314 expect(workerNode.tasksUsage).toBeDefined()
315 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
316 expect(workerNode.tasksUsage.running).toBe(0)
317 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
318 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
319 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
320 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
321 expect(workerNode.tasksUsage.medRunTime).toBe(0)
322 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae 323 }
fd7ebd49 324 await pool.destroy()
bf9549ae
JB
325 })
326
ee11a4a2 327 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 328 const pool = new DynamicThreadPool(
9e619829
JB
329 numberOfWorkers,
330 numberOfWorkers,
331 './tests/worker-files/thread/testWorker.js'
332 )
7fd82a1c 333 const promises = []
9e619829
JB
334 for (let i = 0; i < numberOfWorkers * 2; i++) {
335 promises.push(pool.execute())
336 }
337 await Promise.all(promises)
f06e48d8
JB
338 for (const workerNode of pool.workerNodes) {
339 expect(workerNode.tasksUsage).toBeDefined()
340 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
341 expect(workerNode.tasksUsage.running).toBe(0)
342 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
343 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
344 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
345 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
346 expect(workerNode.tasksUsage.medRunTime).toBe(0)
347 expect(workerNode.tasksUsage.error).toBe(0)
9e619829
JB
348 }
349 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8
JB
350 for (const workerNode of pool.workerNodes) {
351 expect(workerNode.tasksUsage).toBeDefined()
352 expect(workerNode.tasksUsage.run).toBe(0)
353 expect(workerNode.tasksUsage.running).toBe(0)
354 expect(workerNode.tasksUsage.runTime).toBe(0)
355 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
356 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
357 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
358 expect(workerNode.tasksUsage.medRunTime).toBe(0)
359 expect(workerNode.tasksUsage.error).toBe(0)
ee11a4a2 360 }
fd7ebd49 361 await pool.destroy()
ee11a4a2
JB
362 })
363
164d950a
JB
364 it("Verify that pool event emitter 'full' event can register a callback", async () => {
365 const pool = new DynamicThreadPool(
366 numberOfWorkers,
367 numberOfWorkers,
368 './tests/worker-files/thread/testWorker.js'
369 )
370 const promises = []
371 let poolFull = 0
aee46736 372 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a
JB
373 for (let i = 0; i < numberOfWorkers * 2; i++) {
374 promises.push(pool.execute())
375 }
376 await Promise.all(promises)
594bfb84 377 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
164d950a
JB
378 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
379 expect(poolFull).toBe(numberOfWorkers + 1)
380 await pool.destroy()
381 })
382
cf597bc5 383 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
384 const pool = new FixedThreadPool(
385 numberOfWorkers,
386 './tests/worker-files/thread/testWorker.js'
387 )
388 const promises = []
389 let poolBusy = 0
aee46736 390 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 391 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 392 promises.push(pool.execute())
7c0ba920 393 }
cf597bc5 394 await Promise.all(promises)
14916bf9
JB
395 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
396 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
397 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 398 await pool.destroy()
7c0ba920 399 })
3ec964d6 400})