Merge branch 'master' into multiple-functions
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
9e619829 3 DynamicThreadPool,
aee46736 4 FixedClusterPool,
e843b904 5 FixedThreadPool,
aee46736 6 PoolEvents,
e843b904 7 WorkerChoiceStrategies
cdace0e5 8} = require('../../../lib')
78099a15 9const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 10const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
11
12describe('Abstract pool test suite', () => {
13 const numberOfWorkers = 1
3032893a 14 const workerNotFoundInPoolError = new Error(
f06e48d8 15 'Worker could not be found in the pool worker nodes'
e1ffb94f 16 )
a8884ffd 17 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 18 removeAllWorker () {
d4aeae5a 19 this.workerNodes = []
c923ce56 20 this.promiseResponseMap.clear()
e1ffb94f 21 }
3ec964d6 22 }
a8884ffd 23 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
24 isMain () {
25 return false
26 }
3ec964d6 27 }
3ec964d6 28
3ec964d6 29 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
30 expect(
31 () =>
a8884ffd 32 new StubPoolWithIsMain(
7c0ba920 33 numberOfWorkers,
8d3782fa
JB
34 './tests/worker-files/thread/testWorker.js',
35 {
36 errorHandler: e => console.error(e)
37 }
38 )
d4aeae5a 39 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 40 })
c510fea7
APA
41
42 it('Verify that filePath is checked', () => {
292ad316
JB
43 const expectedError = new Error(
44 'Please specify a file with a worker implementation'
45 )
7c0ba920 46 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 47 expectedError
8d3782fa 48 )
7c0ba920 49 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 50 expectedError
8d3782fa
JB
51 )
52 })
53
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 56 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
57 )
58 })
59
60 it('Verify that a negative number of workers is checked', () => {
61 expect(
62 () =>
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 ).toThrowError(
473c717a
JB
65 new RangeError(
66 'Cannot instantiate a pool with a negative number of workers'
67 )
8d3782fa
JB
68 )
69 })
70
71 it('Verify that a non integer number of workers is checked', () => {
72 expect(
73 () =>
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 ).toThrowError(
473c717a 76 new TypeError(
8d3782fa
JB
77 'Cannot instantiate a pool with a non integer number of workers'
78 )
79 )
c510fea7 80 })
7c0ba920 81
fd7ebd49 82 it('Verify that pool options are checked', async () => {
7c0ba920
JB
83 let pool = new FixedThreadPool(
84 numberOfWorkers,
85 './tests/worker-files/thread/testWorker.js'
86 )
8620fb25 87 expect(pool.opts.enableEvents).toBe(true)
7c0ba920 88 expect(pool.emitter).toBeDefined()
ff733df7 89 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 90 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
91 expect(pool.opts.workerChoiceStrategy).toBe(
92 WorkerChoiceStrategies.ROUND_ROBIN
93 )
da309861
JB
94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
95 medRunTime: false
96 })
35cf1c03
JB
97 expect(pool.opts.messageHandler).toBeUndefined()
98 expect(pool.opts.errorHandler).toBeUndefined()
99 expect(pool.opts.onlineHandler).toBeUndefined()
100 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 101 await pool.destroy()
35cf1c03 102 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
103 pool = new FixedThreadPool(
104 numberOfWorkers,
105 './tests/worker-files/thread/testWorker.js',
106 {
737c6d97 107 workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
da309861 108 workerChoiceStrategyOptions: { medRunTime: true },
35cf1c03 109 enableEvents: false,
ff733df7 110 enableTasksQueue: true,
d4aeae5a 111 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
112 messageHandler: testHandler,
113 errorHandler: testHandler,
114 onlineHandler: testHandler,
115 exitHandler: testHandler
7c0ba920
JB
116 }
117 )
8620fb25 118 expect(pool.opts.enableEvents).toBe(false)
7c0ba920 119 expect(pool.emitter).toBeUndefined()
ff733df7 120 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 121 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 122 expect(pool.opts.workerChoiceStrategy).toBe(
737c6d97 123 WorkerChoiceStrategies.LESS_USED
e843b904 124 )
da309861
JB
125 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
126 medRunTime: true
127 })
35cf1c03
JB
128 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
129 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
130 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
131 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 132 await pool.destroy()
7c0ba920
JB
133 })
134
a20f0ba5 135 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
136 expect(
137 () =>
138 new FixedThreadPool(
139 numberOfWorkers,
140 './tests/worker-files/thread/testWorker.js',
141 {
142 enableTasksQueue: true,
143 tasksQueueOptions: { concurrency: 0 }
144 }
145 )
146 ).toThrowError("Invalid worker tasks concurrency '0'")
147 expect(
148 () =>
149 new FixedThreadPool(
150 numberOfWorkers,
151 './tests/worker-files/thread/testWorker.js',
152 {
153 workerChoiceStrategy: 'invalidStrategy'
154 }
155 )
156 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
157 })
158
a20f0ba5
JB
159 it('Verify that worker choice strategy options can be set', async () => {
160 const pool = new FixedThreadPool(
161 numberOfWorkers,
162 './tests/worker-files/thread/testWorker.js',
163 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
164 )
165 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
166 medRunTime: false
167 })
168 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
169 .workerChoiceStrategies) {
170 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
171 }
172 expect(
173 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
174 ).toBe(true)
175 expect(
176 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
177 ).toBe(false)
178 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
179 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
180 medRunTime: true
181 })
182 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
183 .workerChoiceStrategies) {
184 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
185 }
186 expect(
187 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
188 ).toBe(false)
189 expect(
190 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
191 ).toBe(true)
192 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
193 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
194 medRunTime: false
195 })
196 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
197 .workerChoiceStrategies) {
198 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
199 }
200 expect(
201 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
202 ).toBe(true)
203 expect(
204 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
205 ).toBe(false)
206 await pool.destroy()
207 })
208
209 it('Verify that tasks queue can be enabled/disabled', async () => {
210 const pool = new FixedThreadPool(
211 numberOfWorkers,
212 './tests/worker-files/thread/testWorker.js'
213 )
214 expect(pool.opts.enableTasksQueue).toBe(false)
215 expect(pool.opts.tasksQueueOptions).toBeUndefined()
216 pool.enableTasksQueue(true)
217 expect(pool.opts.enableTasksQueue).toBe(true)
218 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
219 pool.enableTasksQueue(true, { concurrency: 2 })
220 expect(pool.opts.enableTasksQueue).toBe(true)
221 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
222 pool.enableTasksQueue(false)
223 expect(pool.opts.enableTasksQueue).toBe(false)
224 expect(pool.opts.tasksQueueOptions).toBeUndefined()
225 await pool.destroy()
226 })
227
228 it('Verify that tasks queue options can be set', async () => {
229 const pool = new FixedThreadPool(
230 numberOfWorkers,
231 './tests/worker-files/thread/testWorker.js',
232 { enableTasksQueue: true }
233 )
234 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
235 pool.setTasksQueueOptions({ concurrency: 2 })
236 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
237 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
238 "Invalid worker tasks concurrency '0'"
239 )
240 await pool.destroy()
241 })
242
d4aeae5a 243 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
a8884ffd 244 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
245 numberOfWorkers,
246 './tests/worker-files/cluster/testWorker.js',
247 {
10fcfaf4
JB
248 errorHandler: e => console.error(e)
249 }
250 )
d4aeae5a 251 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
252 // Simulate worker not found.
253 pool.removeAllWorker()
d4aeae5a 254 expect(pool.workerNodes.length).toBe(0)
3032893a
JB
255 expect(() => pool.getWorkerTasksUsage()).toThrowError(
256 workerNotFoundInPoolError
bf9549ae 257 )
fd7ebd49 258 await pool.destroy()
bf9549ae
JB
259 })
260
fd7ebd49 261 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
262 const pool = new FixedClusterPool(
263 numberOfWorkers,
264 './tests/worker-files/cluster/testWorker.js'
265 )
f06e48d8
JB
266 for (const workerNode of pool.workerNodes) {
267 expect(workerNode.tasksUsage).toBeDefined()
268 expect(workerNode.tasksUsage.run).toBe(0)
269 expect(workerNode.tasksUsage.running).toBe(0)
270 expect(workerNode.tasksUsage.runTime).toBe(0)
271 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
272 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
273 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
274 expect(workerNode.tasksUsage.medRunTime).toBe(0)
275 expect(workerNode.tasksUsage.error).toBe(0)
276 }
277 await pool.destroy()
278 })
279
280 it('Verify that worker pool tasks queue are initialized', async () => {
281 const pool = new FixedClusterPool(
282 numberOfWorkers,
283 './tests/worker-files/cluster/testWorker.js'
284 )
285 for (const workerNode of pool.workerNodes) {
286 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 287 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 288 expect(workerNode.tasksQueue.size).toBe(0)
bf9549ae 289 }
fd7ebd49 290 await pool.destroy()
bf9549ae
JB
291 })
292
293 it('Verify that worker pool tasks usage are computed', async () => {
294 const pool = new FixedClusterPool(
295 numberOfWorkers,
296 './tests/worker-files/cluster/testWorker.js'
297 )
298 const promises = []
299 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 300 promises.push(pool.execute())
bf9549ae 301 }
f06e48d8
JB
302 for (const workerNode of pool.workerNodes) {
303 expect(workerNode.tasksUsage).toBeDefined()
304 expect(workerNode.tasksUsage.run).toBe(0)
305 expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
306 expect(workerNode.tasksUsage.runTime).toBe(0)
307 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
308 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
309 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
310 expect(workerNode.tasksUsage.medRunTime).toBe(0)
311 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae
JB
312 }
313 await Promise.all(promises)
f06e48d8
JB
314 for (const workerNode of pool.workerNodes) {
315 expect(workerNode.tasksUsage).toBeDefined()
316 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
317 expect(workerNode.tasksUsage.running).toBe(0)
318 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
319 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
320 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
321 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
322 expect(workerNode.tasksUsage.medRunTime).toBe(0)
323 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae 324 }
fd7ebd49 325 await pool.destroy()
bf9549ae
JB
326 })
327
ee11a4a2 328 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 329 const pool = new DynamicThreadPool(
9e619829
JB
330 numberOfWorkers,
331 numberOfWorkers,
332 './tests/worker-files/thread/testWorker.js'
333 )
7fd82a1c 334 const promises = []
9e619829
JB
335 for (let i = 0; i < numberOfWorkers * 2; i++) {
336 promises.push(pool.execute())
337 }
338 await Promise.all(promises)
f06e48d8
JB
339 for (const workerNode of pool.workerNodes) {
340 expect(workerNode.tasksUsage).toBeDefined()
341 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
342 expect(workerNode.tasksUsage.running).toBe(0)
343 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
344 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
345 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
346 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
347 expect(workerNode.tasksUsage.medRunTime).toBe(0)
348 expect(workerNode.tasksUsage.error).toBe(0)
9e619829
JB
349 }
350 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8
JB
351 for (const workerNode of pool.workerNodes) {
352 expect(workerNode.tasksUsage).toBeDefined()
353 expect(workerNode.tasksUsage.run).toBe(0)
354 expect(workerNode.tasksUsage.running).toBe(0)
355 expect(workerNode.tasksUsage.runTime).toBe(0)
356 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
357 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
358 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
359 expect(workerNode.tasksUsage.medRunTime).toBe(0)
360 expect(workerNode.tasksUsage.error).toBe(0)
ee11a4a2 361 }
fd7ebd49 362 await pool.destroy()
ee11a4a2
JB
363 })
364
164d950a
JB
365 it("Verify that pool event emitter 'full' event can register a callback", async () => {
366 const pool = new DynamicThreadPool(
367 numberOfWorkers,
368 numberOfWorkers,
369 './tests/worker-files/thread/testWorker.js'
370 )
371 const promises = []
372 let poolFull = 0
aee46736 373 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a
JB
374 for (let i = 0; i < numberOfWorkers * 2; i++) {
375 promises.push(pool.execute())
376 }
377 await Promise.all(promises)
594bfb84 378 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
164d950a
JB
379 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
380 expect(poolFull).toBe(numberOfWorkers + 1)
381 await pool.destroy()
382 })
383
cf597bc5 384 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
385 const pool = new FixedThreadPool(
386 numberOfWorkers,
387 './tests/worker-files/thread/testWorker.js'
388 )
389 const promises = []
390 let poolBusy = 0
aee46736 391 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 392 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 393 promises.push(pool.execute())
7c0ba920 394 }
cf597bc5 395 await Promise.all(promises)
14916bf9
JB
396 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
397 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
398 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 399 await pool.destroy()
7c0ba920 400 })
3ec964d6 401})