build(deps-dev): apply updates
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
e843b904 8 WorkerChoiceStrategies
cdace0e5 9} = require('../../../lib')
78099a15 10const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 11const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
12
13describe('Abstract pool test suite', () => {
fc027381 14 const numberOfWorkers = 2
a8884ffd 15 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 16 removeAllWorker () {
d4aeae5a 17 this.workerNodes = []
c923ce56 18 this.promiseResponseMap.clear()
e1ffb94f 19 }
3ec964d6 20 }
a8884ffd 21 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
22 isMain () {
23 return false
24 }
3ec964d6 25 }
3ec964d6 26
3ec964d6 27 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
28 expect(
29 () =>
a8884ffd 30 new StubPoolWithIsMain(
7c0ba920 31 numberOfWorkers,
8d3782fa
JB
32 './tests/worker-files/thread/testWorker.js',
33 {
34 errorHandler: e => console.error(e)
35 }
36 )
d4aeae5a 37 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 38 })
c510fea7
APA
39
40 it('Verify that filePath is checked', () => {
292ad316
JB
41 const expectedError = new Error(
42 'Please specify a file with a worker implementation'
43 )
7c0ba920 44 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 45 expectedError
8d3782fa 46 )
7c0ba920 47 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 48 expectedError
8d3782fa
JB
49 )
50 })
51
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 54 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
55 )
56 })
57
58 it('Verify that a negative number of workers is checked', () => {
59 expect(
60 () =>
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
62 ).toThrowError(
473c717a
JB
63 new RangeError(
64 'Cannot instantiate a pool with a negative number of workers'
65 )
8d3782fa
JB
66 )
67 })
68
69 it('Verify that a non integer number of workers is checked', () => {
70 expect(
71 () =>
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
73 ).toThrowError(
473c717a 74 new TypeError(
0d80593b 75 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
76 )
77 )
c510fea7 78 })
7c0ba920 79
fd7ebd49 80 it('Verify that pool options are checked', async () => {
7c0ba920
JB
81 let pool = new FixedThreadPool(
82 numberOfWorkers,
83 './tests/worker-files/thread/testWorker.js'
84 )
7c0ba920 85 expect(pool.emitter).toBeDefined()
1f68cede
JB
86 expect(pool.opts.enableEvents).toBe(true)
87 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 88 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 89 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
90 expect(pool.opts.workerChoiceStrategy).toBe(
91 WorkerChoiceStrategies.ROUND_ROBIN
92 )
da309861 93 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
86bf340d
JB
94 medRunTime: false,
95 medWaitTime: false
da309861 96 })
35cf1c03
JB
97 expect(pool.opts.messageHandler).toBeUndefined()
98 expect(pool.opts.errorHandler).toBeUndefined()
99 expect(pool.opts.onlineHandler).toBeUndefined()
100 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 101 await pool.destroy()
35cf1c03 102 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
103 pool = new FixedThreadPool(
104 numberOfWorkers,
105 './tests/worker-files/thread/testWorker.js',
106 {
e4543b14 107 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe
JB
108 workerChoiceStrategyOptions: {
109 medRunTime: true,
fc027381 110 weights: { 0: 300, 1: 200 }
49be33fe 111 },
35cf1c03 112 enableEvents: false,
1f68cede 113 restartWorkerOnError: false,
ff733df7 114 enableTasksQueue: true,
d4aeae5a 115 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
116 messageHandler: testHandler,
117 errorHandler: testHandler,
118 onlineHandler: testHandler,
119 exitHandler: testHandler
7c0ba920
JB
120 }
121 )
7c0ba920 122 expect(pool.emitter).toBeUndefined()
1f68cede
JB
123 expect(pool.opts.enableEvents).toBe(false)
124 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 125 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 126 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 127 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 128 WorkerChoiceStrategies.LEAST_USED
e843b904 129 )
da309861 130 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
49be33fe 131 medRunTime: true,
fc027381 132 weights: { 0: 300, 1: 200 }
da309861 133 })
35cf1c03
JB
134 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
135 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
136 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
137 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 138 await pool.destroy()
7c0ba920
JB
139 })
140
a20f0ba5 141 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
142 expect(
143 () =>
144 new FixedThreadPool(
145 numberOfWorkers,
146 './tests/worker-files/thread/testWorker.js',
147 {
148 enableTasksQueue: true,
149 tasksQueueOptions: { concurrency: 0 }
150 }
151 )
152 ).toThrowError("Invalid worker tasks concurrency '0'")
153 expect(
154 () =>
155 new FixedThreadPool(
156 numberOfWorkers,
157 './tests/worker-files/thread/testWorker.js',
158 {
159 workerChoiceStrategy: 'invalidStrategy'
160 }
161 )
162 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
49be33fe
JB
163 expect(
164 () =>
165 new FixedThreadPool(
166 numberOfWorkers,
167 './tests/worker-files/thread/testWorker.js',
168 {
169 workerChoiceStrategyOptions: { weights: {} }
170 }
171 )
172 ).toThrowError(
173 'Invalid worker choice strategy options: must have a weight for each worker node'
174 )
d4aeae5a
JB
175 })
176
a20f0ba5
JB
177 it('Verify that worker choice strategy options can be set', async () => {
178 const pool = new FixedThreadPool(
179 numberOfWorkers,
180 './tests/worker-files/thread/testWorker.js',
181 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
182 )
183 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
86bf340d
JB
184 medRunTime: false,
185 medWaitTime: false
a20f0ba5
JB
186 })
187 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
188 .workerChoiceStrategies) {
86bf340d
JB
189 expect(workerChoiceStrategy.opts).toStrictEqual({
190 medRunTime: false,
191 medWaitTime: false
192 })
a20f0ba5
JB
193 }
194 expect(
86bf340d
JB
195 pool.workerChoiceStrategyContext.getRequiredStatistics()
196 ).toStrictEqual({
197 runTime: true,
198 avgRunTime: true,
199 medRunTime: false,
200 waitTime: false,
201 avgWaitTime: false,
202 medWaitTime: false
203 })
a20f0ba5
JB
204 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
205 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
206 medRunTime: true
207 })
208 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
209 .workerChoiceStrategies) {
210 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
211 }
212 expect(
86bf340d
JB
213 pool.workerChoiceStrategyContext.getRequiredStatistics()
214 ).toStrictEqual({
215 runTime: true,
216 avgRunTime: false,
217 medRunTime: true,
218 waitTime: false,
219 avgWaitTime: false,
220 medWaitTime: false
221 })
a20f0ba5
JB
222 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
223 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
224 medRunTime: false
225 })
226 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
227 .workerChoiceStrategies) {
228 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
229 }
230 expect(
86bf340d
JB
231 pool.workerChoiceStrategyContext.getRequiredStatistics()
232 ).toStrictEqual({
233 runTime: true,
234 avgRunTime: true,
235 medRunTime: false,
236 waitTime: false,
237 avgWaitTime: false,
238 medWaitTime: false
239 })
a20f0ba5
JB
240 await pool.destroy()
241 })
242
243 it('Verify that tasks queue can be enabled/disabled', async () => {
244 const pool = new FixedThreadPool(
245 numberOfWorkers,
246 './tests/worker-files/thread/testWorker.js'
247 )
248 expect(pool.opts.enableTasksQueue).toBe(false)
249 expect(pool.opts.tasksQueueOptions).toBeUndefined()
250 pool.enableTasksQueue(true)
251 expect(pool.opts.enableTasksQueue).toBe(true)
252 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
253 pool.enableTasksQueue(true, { concurrency: 2 })
254 expect(pool.opts.enableTasksQueue).toBe(true)
255 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
256 pool.enableTasksQueue(false)
257 expect(pool.opts.enableTasksQueue).toBe(false)
258 expect(pool.opts.tasksQueueOptions).toBeUndefined()
259 await pool.destroy()
260 })
261
262 it('Verify that tasks queue options can be set', async () => {
263 const pool = new FixedThreadPool(
264 numberOfWorkers,
265 './tests/worker-files/thread/testWorker.js',
266 { enableTasksQueue: true }
267 )
268 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
269 pool.setTasksQueueOptions({ concurrency: 2 })
270 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
271 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
272 "Invalid worker tasks concurrency '0'"
273 )
274 await pool.destroy()
275 })
276
9d16d33e 277 it('Simulate worker not found', async () => {
a8884ffd 278 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
279 numberOfWorkers,
280 './tests/worker-files/cluster/testWorker.js',
281 {
10fcfaf4
JB
282 errorHandler: e => console.error(e)
283 }
284 )
d4aeae5a 285 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
286 // Simulate worker not found.
287 pool.removeAllWorker()
d4aeae5a 288 expect(pool.workerNodes.length).toBe(0)
fd7ebd49 289 await pool.destroy()
bf9549ae
JB
290 })
291
fd7ebd49 292 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
293 const pool = new FixedClusterPool(
294 numberOfWorkers,
295 './tests/worker-files/cluster/testWorker.js'
296 )
f06e48d8 297 for (const workerNode of pool.workerNodes) {
86bf340d
JB
298 expect(workerNode.tasksUsage).toStrictEqual({
299 run: 0,
300 running: 0,
301 runTime: 0,
302 runTimeHistory: expect.any(CircularArray),
303 avgRunTime: 0,
304 medRunTime: 0,
305 waitTime: 0,
306 waitTimeHistory: expect.any(CircularArray),
307 avgWaitTime: 0,
308 medWaitTime: 0,
309 error: 0
310 })
f06e48d8
JB
311 }
312 await pool.destroy()
313 })
314
315 it('Verify that worker pool tasks queue are initialized', async () => {
316 const pool = new FixedClusterPool(
317 numberOfWorkers,
318 './tests/worker-files/cluster/testWorker.js'
319 )
320 for (const workerNode of pool.workerNodes) {
321 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 322 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 323 expect(workerNode.tasksQueue.size).toBe(0)
bf9549ae 324 }
fd7ebd49 325 await pool.destroy()
bf9549ae
JB
326 })
327
328 it('Verify that worker pool tasks usage are computed', async () => {
329 const pool = new FixedClusterPool(
330 numberOfWorkers,
331 './tests/worker-files/cluster/testWorker.js'
332 )
09c2d0d3 333 const promises = new Set()
fc027381
JB
334 const maxMultiplier = 2
335 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 336 promises.add(pool.execute())
bf9549ae 337 }
f06e48d8 338 for (const workerNode of pool.workerNodes) {
86bf340d
JB
339 expect(workerNode.tasksUsage).toStrictEqual({
340 run: 0,
fc027381 341 running: maxMultiplier,
86bf340d
JB
342 runTime: 0,
343 runTimeHistory: expect.any(CircularArray),
344 avgRunTime: 0,
345 medRunTime: 0,
346 waitTime: 0,
347 waitTimeHistory: expect.any(CircularArray),
348 avgWaitTime: 0,
349 medWaitTime: 0,
350 error: 0
351 })
bf9549ae
JB
352 }
353 await Promise.all(promises)
f06e48d8 354 for (const workerNode of pool.workerNodes) {
86bf340d 355 expect(workerNode.tasksUsage).toStrictEqual({
fc027381 356 run: maxMultiplier,
86bf340d
JB
357 running: 0,
358 runTime: 0,
359 runTimeHistory: expect.any(CircularArray),
360 avgRunTime: 0,
361 medRunTime: 0,
362 waitTime: 0,
363 waitTimeHistory: expect.any(CircularArray),
364 avgWaitTime: 0,
365 medWaitTime: 0,
366 error: 0
367 })
bf9549ae 368 }
fd7ebd49 369 await pool.destroy()
bf9549ae
JB
370 })
371
ee11a4a2 372 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 373 const pool = new DynamicThreadPool(
9e619829 374 numberOfWorkers,
8f4878b7 375 numberOfWorkers,
9e619829
JB
376 './tests/worker-files/thread/testWorker.js'
377 )
09c2d0d3 378 const promises = new Set()
ee9f5295
JB
379 const maxMultiplier = 2
380 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 381 promises.add(pool.execute())
9e619829
JB
382 }
383 await Promise.all(promises)
f06e48d8 384 for (const workerNode of pool.workerNodes) {
86bf340d 385 expect(workerNode.tasksUsage).toStrictEqual({
fc027381 386 run: expect.any(Number),
86bf340d 387 running: 0,
ee9f5295 388 runTime: 0,
86bf340d
JB
389 runTimeHistory: expect.any(CircularArray),
390 avgRunTime: 0,
391 medRunTime: 0,
392 waitTime: 0,
393 waitTimeHistory: expect.any(CircularArray),
394 avgWaitTime: 0,
395 medWaitTime: 0,
396 error: 0
397 })
fc027381
JB
398 expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
399 expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
9e619829
JB
400 }
401 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 402 for (const workerNode of pool.workerNodes) {
86bf340d
JB
403 expect(workerNode.tasksUsage).toStrictEqual({
404 run: 0,
405 running: 0,
406 runTime: 0,
407 runTimeHistory: expect.any(CircularArray),
408 avgRunTime: 0,
409 medRunTime: 0,
410 waitTime: 0,
411 waitTimeHistory: expect.any(CircularArray),
412 avgWaitTime: 0,
413 medWaitTime: 0,
414 error: 0
415 })
f06e48d8 416 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
ee9f5295 417 expect(workerNode.tasksUsage.waitTimeHistory.length).toBe(0)
ee11a4a2 418 }
fd7ebd49 419 await pool.destroy()
ee11a4a2
JB
420 })
421
164d950a
JB
422 it("Verify that pool event emitter 'full' event can register a callback", async () => {
423 const pool = new DynamicThreadPool(
424 numberOfWorkers,
425 numberOfWorkers,
426 './tests/worker-files/thread/testWorker.js'
427 )
09c2d0d3 428 const promises = new Set()
164d950a 429 let poolFull = 0
aee46736 430 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a 431 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 432 promises.add(pool.execute())
164d950a
JB
433 }
434 await Promise.all(promises)
594bfb84 435 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
fc027381
JB
436 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
437 expect(poolFull).toBe(numberOfWorkers * 2)
164d950a
JB
438 await pool.destroy()
439 })
440
cf597bc5 441 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
442 const pool = new FixedThreadPool(
443 numberOfWorkers,
444 './tests/worker-files/thread/testWorker.js'
445 )
09c2d0d3 446 const promises = new Set()
7c0ba920 447 let poolBusy = 0
aee46736 448 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 449 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 450 promises.add(pool.execute())
7c0ba920 451 }
cf597bc5 452 await Promise.all(promises)
14916bf9
JB
453 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
454 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
455 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 456 await pool.destroy()
7c0ba920 457 })
70a4f5ea
JB
458
459 it('Verify that multiple tasks worker is working', async () => {
460 const pool = new DynamicClusterPool(
461 numberOfWorkers,
462 numberOfWorkers * 2,
463 './tests/worker-files/cluster/testMultiTasksWorker.js'
464 )
465 const data = { n: 10 }
82888165
JB
466 const result0 = await pool.execute(data)
467 expect(result0).toBe(false)
70a4f5ea
JB
468 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
469 expect(result1).toBe(false)
470 const result2 = await pool.execute(data, 'factorial')
471 expect(result2).toBe(3628800)
472 const result3 = await pool.execute(data, 'fibonacci')
473 expect(result3).toBe(89)
474 })
3ec964d6 475})