Merge branch 'master' of github.com:poolifier/poolifier into waittime
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
e843b904 8 WorkerChoiceStrategies
cdace0e5 9} = require('../../../lib')
78099a15 10const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 11const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
12
13describe('Abstract pool test suite', () => {
14 const numberOfWorkers = 1
a8884ffd 15 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 16 removeAllWorker () {
d4aeae5a 17 this.workerNodes = []
c923ce56 18 this.promiseResponseMap.clear()
e1ffb94f 19 }
3ec964d6 20 }
a8884ffd 21 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
22 isMain () {
23 return false
24 }
3ec964d6 25 }
3ec964d6 26
3ec964d6 27 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
28 expect(
29 () =>
a8884ffd 30 new StubPoolWithIsMain(
7c0ba920 31 numberOfWorkers,
8d3782fa
JB
32 './tests/worker-files/thread/testWorker.js',
33 {
34 errorHandler: e => console.error(e)
35 }
36 )
d4aeae5a 37 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 38 })
c510fea7
APA
39
40 it('Verify that filePath is checked', () => {
292ad316
JB
41 const expectedError = new Error(
42 'Please specify a file with a worker implementation'
43 )
7c0ba920 44 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 45 expectedError
8d3782fa 46 )
7c0ba920 47 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 48 expectedError
8d3782fa
JB
49 )
50 })
51
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 54 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
55 )
56 })
57
58 it('Verify that a negative number of workers is checked', () => {
59 expect(
60 () =>
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
62 ).toThrowError(
473c717a
JB
63 new RangeError(
64 'Cannot instantiate a pool with a negative number of workers'
65 )
8d3782fa
JB
66 )
67 })
68
69 it('Verify that a non integer number of workers is checked', () => {
70 expect(
71 () =>
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
73 ).toThrowError(
473c717a 74 new TypeError(
0d80593b 75 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
76 )
77 )
c510fea7 78 })
7c0ba920 79
fd7ebd49 80 it('Verify that pool options are checked', async () => {
7c0ba920
JB
81 let pool = new FixedThreadPool(
82 numberOfWorkers,
83 './tests/worker-files/thread/testWorker.js'
84 )
8620fb25 85 expect(pool.opts.enableEvents).toBe(true)
7c0ba920 86 expect(pool.emitter).toBeDefined()
ff733df7 87 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 88 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
89 expect(pool.opts.workerChoiceStrategy).toBe(
90 WorkerChoiceStrategies.ROUND_ROBIN
91 )
da309861
JB
92 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
93 medRunTime: false
94 })
35cf1c03
JB
95 expect(pool.opts.messageHandler).toBeUndefined()
96 expect(pool.opts.errorHandler).toBeUndefined()
97 expect(pool.opts.onlineHandler).toBeUndefined()
98 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 99 await pool.destroy()
35cf1c03 100 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
101 pool = new FixedThreadPool(
102 numberOfWorkers,
103 './tests/worker-files/thread/testWorker.js',
104 {
e4543b14 105 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe
JB
106 workerChoiceStrategyOptions: {
107 medRunTime: true,
108 weights: { 0: 300 }
109 },
35cf1c03 110 enableEvents: false,
ff733df7 111 enableTasksQueue: true,
d4aeae5a 112 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
113 messageHandler: testHandler,
114 errorHandler: testHandler,
115 onlineHandler: testHandler,
116 exitHandler: testHandler
7c0ba920
JB
117 }
118 )
8620fb25 119 expect(pool.opts.enableEvents).toBe(false)
7c0ba920 120 expect(pool.emitter).toBeUndefined()
ff733df7 121 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 122 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 123 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 124 WorkerChoiceStrategies.LEAST_USED
e843b904 125 )
da309861 126 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
49be33fe
JB
127 medRunTime: true,
128 weights: { 0: 300 }
da309861 129 })
35cf1c03
JB
130 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
131 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
132 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
133 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 134 await pool.destroy()
7c0ba920
JB
135 })
136
a20f0ba5 137 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
138 expect(
139 () =>
140 new FixedThreadPool(
141 numberOfWorkers,
142 './tests/worker-files/thread/testWorker.js',
143 {
144 enableTasksQueue: true,
145 tasksQueueOptions: { concurrency: 0 }
146 }
147 )
148 ).toThrowError("Invalid worker tasks concurrency '0'")
149 expect(
150 () =>
151 new FixedThreadPool(
152 numberOfWorkers,
153 './tests/worker-files/thread/testWorker.js',
154 {
155 workerChoiceStrategy: 'invalidStrategy'
156 }
157 )
158 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
49be33fe
JB
159 expect(
160 () =>
161 new FixedThreadPool(
162 numberOfWorkers,
163 './tests/worker-files/thread/testWorker.js',
164 {
165 workerChoiceStrategyOptions: { weights: {} }
166 }
167 )
168 ).toThrowError(
169 'Invalid worker choice strategy options: must have a weight for each worker node'
170 )
d4aeae5a
JB
171 })
172
a20f0ba5
JB
173 it('Verify that worker choice strategy options can be set', async () => {
174 const pool = new FixedThreadPool(
175 numberOfWorkers,
176 './tests/worker-files/thread/testWorker.js',
177 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
178 )
179 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
180 medRunTime: false
181 })
182 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
183 .workerChoiceStrategies) {
184 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
185 }
186 expect(
187 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
188 ).toBe(true)
189 expect(
190 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
191 ).toBe(false)
192 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
193 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
194 medRunTime: true
195 })
196 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
197 .workerChoiceStrategies) {
198 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
199 }
200 expect(
201 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
202 ).toBe(false)
203 expect(
204 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
205 ).toBe(true)
206 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
207 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
208 medRunTime: false
209 })
210 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
211 .workerChoiceStrategies) {
212 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
213 }
214 expect(
215 pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
216 ).toBe(true)
217 expect(
218 pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
219 ).toBe(false)
220 await pool.destroy()
221 })
222
223 it('Verify that tasks queue can be enabled/disabled', async () => {
224 const pool = new FixedThreadPool(
225 numberOfWorkers,
226 './tests/worker-files/thread/testWorker.js'
227 )
228 expect(pool.opts.enableTasksQueue).toBe(false)
229 expect(pool.opts.tasksQueueOptions).toBeUndefined()
230 pool.enableTasksQueue(true)
231 expect(pool.opts.enableTasksQueue).toBe(true)
232 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
233 pool.enableTasksQueue(true, { concurrency: 2 })
234 expect(pool.opts.enableTasksQueue).toBe(true)
235 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
236 pool.enableTasksQueue(false)
237 expect(pool.opts.enableTasksQueue).toBe(false)
238 expect(pool.opts.tasksQueueOptions).toBeUndefined()
239 await pool.destroy()
240 })
241
242 it('Verify that tasks queue options can be set', async () => {
243 const pool = new FixedThreadPool(
244 numberOfWorkers,
245 './tests/worker-files/thread/testWorker.js',
246 { enableTasksQueue: true }
247 )
248 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
249 pool.setTasksQueueOptions({ concurrency: 2 })
250 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
251 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
252 "Invalid worker tasks concurrency '0'"
253 )
254 await pool.destroy()
255 })
256
9d16d33e 257 it('Simulate worker not found', async () => {
a8884ffd 258 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
259 numberOfWorkers,
260 './tests/worker-files/cluster/testWorker.js',
261 {
10fcfaf4
JB
262 errorHandler: e => console.error(e)
263 }
264 )
d4aeae5a 265 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
266 // Simulate worker not found.
267 pool.removeAllWorker()
d4aeae5a 268 expect(pool.workerNodes.length).toBe(0)
fd7ebd49 269 await pool.destroy()
bf9549ae
JB
270 })
271
fd7ebd49 272 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
273 const pool = new FixedClusterPool(
274 numberOfWorkers,
275 './tests/worker-files/cluster/testWorker.js'
276 )
f06e48d8
JB
277 for (const workerNode of pool.workerNodes) {
278 expect(workerNode.tasksUsage).toBeDefined()
279 expect(workerNode.tasksUsage.run).toBe(0)
280 expect(workerNode.tasksUsage.running).toBe(0)
281 expect(workerNode.tasksUsage.runTime).toBe(0)
282 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
283 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
284 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
285 expect(workerNode.tasksUsage.medRunTime).toBe(0)
286 expect(workerNode.tasksUsage.error).toBe(0)
287 }
288 await pool.destroy()
289 })
290
291 it('Verify that worker pool tasks queue are initialized', async () => {
292 const pool = new FixedClusterPool(
293 numberOfWorkers,
294 './tests/worker-files/cluster/testWorker.js'
295 )
296 for (const workerNode of pool.workerNodes) {
297 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 298 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 299 expect(workerNode.tasksQueue.size).toBe(0)
bf9549ae 300 }
fd7ebd49 301 await pool.destroy()
bf9549ae
JB
302 })
303
304 it('Verify that worker pool tasks usage are computed', async () => {
305 const pool = new FixedClusterPool(
306 numberOfWorkers,
307 './tests/worker-files/cluster/testWorker.js'
308 )
309 const promises = []
310 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 311 promises.push(pool.execute())
bf9549ae 312 }
f06e48d8
JB
313 for (const workerNode of pool.workerNodes) {
314 expect(workerNode.tasksUsage).toBeDefined()
315 expect(workerNode.tasksUsage.run).toBe(0)
316 expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
317 expect(workerNode.tasksUsage.runTime).toBe(0)
318 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
319 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
320 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
321 expect(workerNode.tasksUsage.medRunTime).toBe(0)
322 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae
JB
323 }
324 await Promise.all(promises)
f06e48d8
JB
325 for (const workerNode of pool.workerNodes) {
326 expect(workerNode.tasksUsage).toBeDefined()
327 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
328 expect(workerNode.tasksUsage.running).toBe(0)
329 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
330 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
331 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
332 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
333 expect(workerNode.tasksUsage.medRunTime).toBe(0)
334 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae 335 }
fd7ebd49 336 await pool.destroy()
bf9549ae
JB
337 })
338
ee11a4a2 339 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 340 const pool = new DynamicThreadPool(
9e619829
JB
341 numberOfWorkers,
342 numberOfWorkers,
343 './tests/worker-files/thread/testWorker.js'
344 )
7fd82a1c 345 const promises = []
9e619829
JB
346 for (let i = 0; i < numberOfWorkers * 2; i++) {
347 promises.push(pool.execute())
348 }
349 await Promise.all(promises)
f06e48d8
JB
350 for (const workerNode of pool.workerNodes) {
351 expect(workerNode.tasksUsage).toBeDefined()
352 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
353 expect(workerNode.tasksUsage.running).toBe(0)
354 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
355 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
356 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
357 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
358 expect(workerNode.tasksUsage.medRunTime).toBe(0)
359 expect(workerNode.tasksUsage.error).toBe(0)
9e619829
JB
360 }
361 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8
JB
362 for (const workerNode of pool.workerNodes) {
363 expect(workerNode.tasksUsage).toBeDefined()
364 expect(workerNode.tasksUsage.run).toBe(0)
365 expect(workerNode.tasksUsage.running).toBe(0)
366 expect(workerNode.tasksUsage.runTime).toBe(0)
367 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
368 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
369 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
370 expect(workerNode.tasksUsage.medRunTime).toBe(0)
371 expect(workerNode.tasksUsage.error).toBe(0)
ee11a4a2 372 }
fd7ebd49 373 await pool.destroy()
ee11a4a2
JB
374 })
375
164d950a
JB
376 it("Verify that pool event emitter 'full' event can register a callback", async () => {
377 const pool = new DynamicThreadPool(
378 numberOfWorkers,
379 numberOfWorkers,
380 './tests/worker-files/thread/testWorker.js'
381 )
382 const promises = []
383 let poolFull = 0
aee46736 384 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a
JB
385 for (let i = 0; i < numberOfWorkers * 2; i++) {
386 promises.push(pool.execute())
387 }
388 await Promise.all(promises)
594bfb84 389 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
164d950a
JB
390 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
391 expect(poolFull).toBe(numberOfWorkers + 1)
392 await pool.destroy()
393 })
394
cf597bc5 395 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
396 const pool = new FixedThreadPool(
397 numberOfWorkers,
398 './tests/worker-files/thread/testWorker.js'
399 )
400 const promises = []
401 let poolBusy = 0
aee46736 402 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 403 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 404 promises.push(pool.execute())
7c0ba920 405 }
cf597bc5 406 await Promise.all(promises)
14916bf9
JB
407 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
408 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
409 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 410 await pool.destroy()
7c0ba920 411 })
70a4f5ea
JB
412
413 it('Verify that multiple tasks worker is working', async () => {
414 const pool = new DynamicClusterPool(
415 numberOfWorkers,
416 numberOfWorkers * 2,
417 './tests/worker-files/cluster/testMultiTasksWorker.js'
418 )
419 const data = { n: 10 }
82888165
JB
420 const result0 = await pool.execute(data)
421 expect(result0).toBe(false)
70a4f5ea
JB
422 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
423 expect(result1).toBe(false)
424 const result2 = await pool.execute(data, 'factorial')
425 expect(result2).toBe(3628800)
426 const result3 = await pool.execute(data, 'fibonacci')
427 expect(result3).toBe(89)
428 })
3ec964d6 429})