fix: fix round handling at worker removal in IWRR
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
e843b904 8 WorkerChoiceStrategies
cdace0e5 9} = require('../../../lib')
78099a15 10const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 11const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
12
13describe('Abstract pool test suite', () => {
14 const numberOfWorkers = 1
/**
 * Fixed thread pool stub exposing a way to drop every known worker node,
 * used to simulate a pool whose workers can no longer be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears the promise response map and replaces the worker nodes list
   * with an empty array.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so that
 * the pool behaves as if it was created outside the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const createdFromMain = false
    return createdFromMain
  }
}
3ec964d6 26
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports `isMain() === false`, so building the pool must throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
39
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty file path must both be rejected.
  const invalidArgumentLists = [[numberOfWorkers], [numberOfWorkers, '']]
  for (const constructorArgs of invalidArgumentLists) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrowError(
      expectedError
    )
  }
})
51
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no argument at all must be refused.
  const createPoolWithoutArguments = () => new FixedThreadPool()
  expect(createPoolWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
57
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
68
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
7c0ba920 79
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: no options argument is given.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pool: every option is given explicitly.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(
    numberOfWorkers,
    workerFile,
    customOptions
  )
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
132
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Builds a thunk creating a pool with the given options.
  const poolFactory = poolOptions => () =>
    new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)

  expect(
    poolFactory({
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 0 }
    })
  ).toThrowError("Invalid worker tasks concurrency '0'")
  expect(
    poolFactory({ workerChoiceStrategy: 'invalidStrategy' })
  ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
})
156
a20f0ba5
JB
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  const strategyContext = () => pool.workerChoiceStrategyContext

  // Checks the pool options, every registered strategy and the required
  // statistics against the expected `medRunTime` flag. The test expects
  // `avgRunTime` to be required exactly when `medRunTime` is not.
  const expectMedRunTime = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      medRunTime
    })
    for (const [, strategy] of strategyContext().workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({ medRunTime })
    }
    expect(strategyContext().getRequiredStatistics().avgRunTime).toBe(
      !medRunTime
    )
    expect(strategyContext().getRequiredStatistics().medRunTime).toBe(
      medRunTime
    )
  }

  expectMedRunTime(false)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectMedRunTime(true)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectMedRunTime(false)
  await pool.destroy()
})
206
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the queue flag and, when a concurrency is given, the queue
  // options; without a concurrency the options must be undefined.
  const expectQueueState = (enabled, concurrency) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (concurrency === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
    }
  }

  expectQueueState(false)
  pool.enableTasksQueue(true)
  expectQueueState(true, 1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueState(true, 2)
  pool.enableTasksQueue(false)
  expectQueueState(false)
  await pool.destroy()
})
225
it('Verify that tasks queue options can be set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    enableTasksQueue: true
  })
  // Enabling the queue without options yields a concurrency of 1.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // A concurrency of 0 is rejected.
  const setZeroConcurrency = () =>
    pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
240
it('Simulate worker not found', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file - confirm whether this mismatch is intentional.
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    workerFile,
    poolOptions
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found: the pool forgets all of its worker nodes.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
255
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task has been submitted yet: every statistic must be at zero.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksUsage).toBeDefined()
    for (const counter of ['run', 'running', 'runTime']) {
      expect(workerNode.tasksUsage[counter]).toBe(0)
    }
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    for (const counter of ['avgRunTime', 'medRunTime', 'error']) {
      expect(workerNode.tasksUsage[counter]).toBe(0)
    }
  })
  await pool.destroy()
})
274
it('Verify that worker pool tasks queue are initialized', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  // Each worker node must own an empty Queue instance.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
  })
  await pool.destroy()
})
287
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const promises = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )

  // Before the tasks settle: all of them are counted as running.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(0)
    expect(workerNode.tasksUsage.running).toBe(submittedTasks)
    expect(workerNode.tasksUsage.runTime).toBe(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBe(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  })

  await Promise.all(promises)

  // After the tasks settle: they are counted as run, none is running.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(submittedTasks)
    expect(workerNode.tasksUsage.running).toBe(0)
    expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  })
  await pool.destroy()
})
322
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    workerFile
  )
  const submittedTasks = numberOfWorkers * 2
  const promises = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(promises)

  // All submitted tasks have been run and accounted for.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(submittedTasks)
    expect(workerNode.tasksUsage.running).toBe(0)
    expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  })

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // Changing the strategy must bring every statistic back to zero.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksUsage).toBeDefined()
    for (const counter of ['run', 'running', 'runTime']) {
      expect(workerNode.tasksUsage[counter]).toBe(0)
    }
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    for (const counter of ['avgRunTime', 'medRunTime', 'error']) {
      expect(workerNode.tasksUsage[counter]).toBe(0)
    }
  })
  await pool.destroy()
})
359
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    workerFile
  )
  let poolFull = 0
  pool.emitter.on(PoolEvents.full, () => {
    poolFull += 1
  })
  const promises = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(promises)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(poolFull).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
378
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => {
    poolBusy += 1
  })
  const promises = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
70a4f5ea
JB
396
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task function name given.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Task function selected by name through the second argument.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Destroy the pool like every other test in this suite does, so that the
  // pool created here is not left running once the test is over.
  await pool.destroy()
})
3ec964d6 413})