build(deps-dev): apply updates
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
e843b904 8 WorkerChoiceStrategies
cdace0e5 9} = require('../../../lib')
78099a15 10const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 11const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
12
13describe('Abstract pool test suite', () => {
14 const numberOfWorkers = 1
3032893a 15 const workerNotFoundInPoolError = new Error(
f06e48d8 16 'Worker could not be found in the pool worker nodes'
e1ffb94f 17 )
a8884ffd 18 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 19 removeAllWorker () {
d4aeae5a 20 this.workerNodes = []
c923ce56 21 this.promiseResponseMap.clear()
e1ffb94f 22 }
3ec964d6 23 }
a8884ffd 24 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
25 isMain () {
26 return false
27 }
3ec964d6 28 }
3ec964d6 29
3ec964d6 30 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
31 expect(
32 () =>
a8884ffd 33 new StubPoolWithIsMain(
7c0ba920 34 numberOfWorkers,
8d3782fa
JB
35 './tests/worker-files/thread/testWorker.js',
36 {
37 errorHandler: e => console.error(e)
38 }
39 )
d4aeae5a 40 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 41 })
c510fea7
APA
42
43 it('Verify that filePath is checked', () => {
292ad316
JB
44 const expectedError = new Error(
45 'Please specify a file with a worker implementation'
46 )
7c0ba920 47 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 48 expectedError
8d3782fa 49 )
7c0ba920 50 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 51 expectedError
8d3782fa
JB
52 )
53 })
54
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 57 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
58 )
59 })
60
61 it('Verify that a negative number of workers is checked', () => {
62 expect(
63 () =>
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
65 ).toThrowError(
473c717a
JB
66 new RangeError(
67 'Cannot instantiate a pool with a negative number of workers'
68 )
8d3782fa
JB
69 )
70 })
71
72 it('Verify that a non integer number of workers is checked', () => {
73 expect(
74 () =>
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
76 ).toThrowError(
473c717a 77 new TypeError(
8d3782fa
JB
78 'Cannot instantiate a pool with a non integer number of workers'
79 )
80 )
c510fea7 81 })
7c0ba920 82
fd7ebd49 83 it('Verify that pool options are checked', async () => {
7c0ba920
JB
84 let pool = new FixedThreadPool(
85 numberOfWorkers,
86 './tests/worker-files/thread/testWorker.js'
87 )
8620fb25 88 expect(pool.opts.enableEvents).toBe(true)
7c0ba920 89 expect(pool.emitter).toBeDefined()
ff733df7 90 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 91 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
92 expect(pool.opts.workerChoiceStrategy).toBe(
93 WorkerChoiceStrategies.ROUND_ROBIN
94 )
da309861
JB
95 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
96 medRunTime: false
97 })
35cf1c03
JB
98 expect(pool.opts.messageHandler).toBeUndefined()
99 expect(pool.opts.errorHandler).toBeUndefined()
100 expect(pool.opts.onlineHandler).toBeUndefined()
101 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 102 await pool.destroy()
35cf1c03 103 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
104 pool = new FixedThreadPool(
105 numberOfWorkers,
106 './tests/worker-files/thread/testWorker.js',
107 {
737c6d97 108 workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
da309861 109 workerChoiceStrategyOptions: { medRunTime: true },
35cf1c03 110 enableEvents: false,
ff733df7 111 enableTasksQueue: true,
d4aeae5a 112 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
113 messageHandler: testHandler,
114 errorHandler: testHandler,
115 onlineHandler: testHandler,
116 exitHandler: testHandler
7c0ba920
JB
117 }
118 )
8620fb25 119 expect(pool.opts.enableEvents).toBe(false)
7c0ba920 120 expect(pool.emitter).toBeUndefined()
ff733df7 121 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 122 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 123 expect(pool.opts.workerChoiceStrategy).toBe(
737c6d97 124 WorkerChoiceStrategies.LESS_USED
e843b904 125 )
da309861
JB
126 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
127 medRunTime: true
128 })
35cf1c03
JB
129 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
130 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
131 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
132 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 133 await pool.destroy()
7c0ba920
JB
134 })
135
a20f0ba5 136 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
137 expect(
138 () =>
139 new FixedThreadPool(
140 numberOfWorkers,
141 './tests/worker-files/thread/testWorker.js',
142 {
143 enableTasksQueue: true,
144 tasksQueueOptions: { concurrency: 0 }
145 }
146 )
147 ).toThrowError("Invalid worker tasks concurrency '0'")
148 expect(
149 () =>
150 new FixedThreadPool(
151 numberOfWorkers,
152 './tests/worker-files/thread/testWorker.js',
153 {
154 workerChoiceStrategy: 'invalidStrategy'
155 }
156 )
157 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
158 })
159
a20f0ba5
JB
160 it('Verify that worker choice strategy options can be set', async () => {
161 const pool = new FixedThreadPool(
162 numberOfWorkers,
163 './tests/worker-files/thread/testWorker.js',
164 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
165 )
166 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
167 medRunTime: false
168 })
169 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
170 .workerChoiceStrategies) {
171 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
172 }
173 expect(
174 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
175 ).toBe(true)
176 expect(
177 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
178 ).toBe(false)
179 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
180 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
181 medRunTime: true
182 })
183 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
184 .workerChoiceStrategies) {
185 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
186 }
187 expect(
188 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
189 ).toBe(false)
190 expect(
191 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
192 ).toBe(true)
193 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
194 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
195 medRunTime: false
196 })
197 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
198 .workerChoiceStrategies) {
199 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
200 }
201 expect(
202 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
203 ).toBe(true)
204 expect(
205 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
206 ).toBe(false)
207 await pool.destroy()
208 })
209
210 it('Verify that tasks queue can be enabled/disabled', async () => {
211 const pool = new FixedThreadPool(
212 numberOfWorkers,
213 './tests/worker-files/thread/testWorker.js'
214 )
215 expect(pool.opts.enableTasksQueue).toBe(false)
216 expect(pool.opts.tasksQueueOptions).toBeUndefined()
217 pool.enableTasksQueue(true)
218 expect(pool.opts.enableTasksQueue).toBe(true)
219 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
220 pool.enableTasksQueue(true, { concurrency: 2 })
221 expect(pool.opts.enableTasksQueue).toBe(true)
222 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
223 pool.enableTasksQueue(false)
224 expect(pool.opts.enableTasksQueue).toBe(false)
225 expect(pool.opts.tasksQueueOptions).toBeUndefined()
226 await pool.destroy()
227 })
228
229 it('Verify that tasks queue options can be set', async () => {
230 const pool = new FixedThreadPool(
231 numberOfWorkers,
232 './tests/worker-files/thread/testWorker.js',
233 { enableTasksQueue: true }
234 )
235 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
236 pool.setTasksQueueOptions({ concurrency: 2 })
237 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
238 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
239 "Invalid worker tasks concurrency '0'"
240 )
241 await pool.destroy()
242 })
243
d4aeae5a 244 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
a8884ffd 245 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
246 numberOfWorkers,
247 './tests/worker-files/cluster/testWorker.js',
248 {
10fcfaf4
JB
249 errorHandler: e => console.error(e)
250 }
251 )
d4aeae5a 252 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
253 // Simulate worker not found.
254 pool.removeAllWorker()
d4aeae5a 255 expect(pool.workerNodes.length).toBe(0)
3032893a
JB
256 expect(() => pool.getWorkerTasksUsage()).toThrowError(
257 workerNotFoundInPoolError
bf9549ae 258 )
fd7ebd49 259 await pool.destroy()
bf9549ae
JB
260 })
261
fd7ebd49 262 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
263 const pool = new FixedClusterPool(
264 numberOfWorkers,
265 './tests/worker-files/cluster/testWorker.js'
266 )
f06e48d8
JB
267 for (const workerNode of pool.workerNodes) {
268 expect(workerNode.tasksUsage).toBeDefined()
269 expect(workerNode.tasksUsage.run).toBe(0)
270 expect(workerNode.tasksUsage.running).toBe(0)
271 expect(workerNode.tasksUsage.runTime).toBe(0)
272 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
273 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
274 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
275 expect(workerNode.tasksUsage.medRunTime).toBe(0)
276 expect(workerNode.tasksUsage.error).toBe(0)
277 }
278 await pool.destroy()
279 })
280
281 it('Verify that worker pool tasks queue are initialized', async () => {
282 const pool = new FixedClusterPool(
283 numberOfWorkers,
284 './tests/worker-files/cluster/testWorker.js'
285 )
286 for (const workerNode of pool.workerNodes) {
287 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 288 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 289 expect(workerNode.tasksQueue.size).toBe(0)
bf9549ae 290 }
fd7ebd49 291 await pool.destroy()
bf9549ae
JB
292 })
293
294 it('Verify that worker pool tasks usage are computed', async () => {
295 const pool = new FixedClusterPool(
296 numberOfWorkers,
297 './tests/worker-files/cluster/testWorker.js'
298 )
299 const promises = []
300 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 301 promises.push(pool.execute())
bf9549ae 302 }
f06e48d8
JB
303 for (const workerNode of pool.workerNodes) {
304 expect(workerNode.tasksUsage).toBeDefined()
305 expect(workerNode.tasksUsage.run).toBe(0)
306 expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
307 expect(workerNode.tasksUsage.runTime).toBe(0)
308 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
309 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
310 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
311 expect(workerNode.tasksUsage.medRunTime).toBe(0)
312 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae
JB
313 }
314 await Promise.all(promises)
f06e48d8
JB
315 for (const workerNode of pool.workerNodes) {
316 expect(workerNode.tasksUsage).toBeDefined()
317 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
318 expect(workerNode.tasksUsage.running).toBe(0)
319 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
320 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
321 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
322 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
323 expect(workerNode.tasksUsage.medRunTime).toBe(0)
324 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae 325 }
fd7ebd49 326 await pool.destroy()
bf9549ae
JB
327 })
328
ee11a4a2 329 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 330 const pool = new DynamicThreadPool(
9e619829
JB
331 numberOfWorkers,
332 numberOfWorkers,
333 './tests/worker-files/thread/testWorker.js'
334 )
7fd82a1c 335 const promises = []
9e619829
JB
336 for (let i = 0; i < numberOfWorkers * 2; i++) {
337 promises.push(pool.execute())
338 }
339 await Promise.all(promises)
f06e48d8
JB
340 for (const workerNode of pool.workerNodes) {
341 expect(workerNode.tasksUsage).toBeDefined()
342 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
343 expect(workerNode.tasksUsage.running).toBe(0)
344 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
345 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
346 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
347 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
348 expect(workerNode.tasksUsage.medRunTime).toBe(0)
349 expect(workerNode.tasksUsage.error).toBe(0)
9e619829
JB
350 }
351 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8
JB
352 for (const workerNode of pool.workerNodes) {
353 expect(workerNode.tasksUsage).toBeDefined()
354 expect(workerNode.tasksUsage.run).toBe(0)
355 expect(workerNode.tasksUsage.running).toBe(0)
356 expect(workerNode.tasksUsage.runTime).toBe(0)
357 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
358 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
359 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
360 expect(workerNode.tasksUsage.medRunTime).toBe(0)
361 expect(workerNode.tasksUsage.error).toBe(0)
ee11a4a2 362 }
fd7ebd49 363 await pool.destroy()
ee11a4a2
JB
364 })
365
164d950a
JB
366 it("Verify that pool event emitter 'full' event can register a callback", async () => {
367 const pool = new DynamicThreadPool(
368 numberOfWorkers,
369 numberOfWorkers,
370 './tests/worker-files/thread/testWorker.js'
371 )
372 const promises = []
373 let poolFull = 0
aee46736 374 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a
JB
375 for (let i = 0; i < numberOfWorkers * 2; i++) {
376 promises.push(pool.execute())
377 }
378 await Promise.all(promises)
594bfb84 379 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
164d950a
JB
380 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
381 expect(poolFull).toBe(numberOfWorkers + 1)
382 await pool.destroy()
383 })
384
cf597bc5 385 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
386 const pool = new FixedThreadPool(
387 numberOfWorkers,
388 './tests/worker-files/thread/testWorker.js'
389 )
390 const promises = []
391 let poolBusy = 0
aee46736 392 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 393 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 394 promises.push(pool.execute())
7c0ba920 395 }
cf597bc5 396 await Promise.all(promises)
14916bf9
JB
397 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
398 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
399 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 400 await pool.destroy()
7c0ba920 401 })
70a4f5ea
JB
402
403 it('Verify that multiple tasks worker is working', async () => {
404 const pool = new DynamicClusterPool(
405 numberOfWorkers,
406 numberOfWorkers * 2,
407 './tests/worker-files/cluster/testMultiTasksWorker.js'
408 )
409 const data = { n: 10 }
82888165
JB
410 const result0 = await pool.execute(data)
411 expect(result0).toBe(false)
70a4f5ea
JB
412 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
413 expect(result1).toBe(false)
414 const result2 = await pool.execute(data, 'factorial')
415 expect(result2).toBe(3628800)
416 const result3 = await pool.execute(data, 'fibonacci')
417 expect(result3).toBe(89)
418 })
3ec964d6 419})