feat: expose pool information
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407
JB
8 WorkerChoiceStrategies,
9 PoolTypes
cdace0e5 10} = require('../../../lib')
78099a15 11const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 12const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
13
14describe('Abstract pool test suite', () => {
fc027381 15 const numberOfWorkers = 2
a8884ffd 16 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 17 removeAllWorker () {
d4aeae5a 18 this.workerNodes = []
c923ce56 19 this.promiseResponseMap.clear()
e1ffb94f 20 }
3ec964d6 21 }
a8884ffd 22 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
23 isMain () {
24 return false
25 }
3ec964d6 26 }
3ec964d6 27
3ec964d6 28 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
29 expect(
30 () =>
a8884ffd 31 new StubPoolWithIsMain(
7c0ba920 32 numberOfWorkers,
8d3782fa
JB
33 './tests/worker-files/thread/testWorker.js',
34 {
35 errorHandler: e => console.error(e)
36 }
37 )
d4aeae5a 38 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 39 })
c510fea7
APA
40
41 it('Verify that filePath is checked', () => {
292ad316
JB
42 const expectedError = new Error(
43 'Please specify a file with a worker implementation'
44 )
7c0ba920 45 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 46 expectedError
8d3782fa 47 )
7c0ba920 48 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 49 expectedError
8d3782fa
JB
50 )
51 })
52
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 55 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
56 )
57 })
58
59 it('Verify that a negative number of workers is checked', () => {
60 expect(
61 () =>
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
63 ).toThrowError(
473c717a
JB
64 new RangeError(
65 'Cannot instantiate a pool with a negative number of workers'
66 )
8d3782fa
JB
67 )
68 })
69
70 it('Verify that a non integer number of workers is checked', () => {
71 expect(
72 () =>
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
74 ).toThrowError(
473c717a 75 new TypeError(
0d80593b 76 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
77 )
78 )
c510fea7 79 })
7c0ba920 80
fd7ebd49 81 it('Verify that pool options are checked', async () => {
7c0ba920
JB
82 let pool = new FixedThreadPool(
83 numberOfWorkers,
84 './tests/worker-files/thread/testWorker.js'
85 )
7c0ba920 86 expect(pool.emitter).toBeDefined()
1f68cede
JB
87 expect(pool.opts.enableEvents).toBe(true)
88 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 89 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 90 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
91 expect(pool.opts.workerChoiceStrategy).toBe(
92 WorkerChoiceStrategies.ROUND_ROBIN
93 )
da309861 94 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
86bf340d
JB
95 medRunTime: false,
96 medWaitTime: false
da309861 97 })
35cf1c03
JB
98 expect(pool.opts.messageHandler).toBeUndefined()
99 expect(pool.opts.errorHandler).toBeUndefined()
100 expect(pool.opts.onlineHandler).toBeUndefined()
101 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 102 await pool.destroy()
35cf1c03 103 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
104 pool = new FixedThreadPool(
105 numberOfWorkers,
106 './tests/worker-files/thread/testWorker.js',
107 {
e4543b14 108 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe
JB
109 workerChoiceStrategyOptions: {
110 medRunTime: true,
fc027381 111 weights: { 0: 300, 1: 200 }
49be33fe 112 },
35cf1c03 113 enableEvents: false,
1f68cede 114 restartWorkerOnError: false,
ff733df7 115 enableTasksQueue: true,
d4aeae5a 116 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
117 messageHandler: testHandler,
118 errorHandler: testHandler,
119 onlineHandler: testHandler,
120 exitHandler: testHandler
7c0ba920
JB
121 }
122 )
7c0ba920 123 expect(pool.emitter).toBeUndefined()
1f68cede
JB
124 expect(pool.opts.enableEvents).toBe(false)
125 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 126 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 127 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 128 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 129 WorkerChoiceStrategies.LEAST_USED
e843b904 130 )
da309861 131 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
49be33fe 132 medRunTime: true,
fc027381 133 weights: { 0: 300, 1: 200 }
da309861 134 })
35cf1c03
JB
135 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
136 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
137 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
138 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 139 await pool.destroy()
7c0ba920
JB
140 })
141
a20f0ba5 142 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
143 expect(
144 () =>
145 new FixedThreadPool(
146 numberOfWorkers,
147 './tests/worker-files/thread/testWorker.js',
148 {
149 enableTasksQueue: true,
150 tasksQueueOptions: { concurrency: 0 }
151 }
152 )
153 ).toThrowError("Invalid worker tasks concurrency '0'")
154 expect(
155 () =>
156 new FixedThreadPool(
157 numberOfWorkers,
158 './tests/worker-files/thread/testWorker.js',
159 {
160 workerChoiceStrategy: 'invalidStrategy'
161 }
162 )
163 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
49be33fe
JB
164 expect(
165 () =>
166 new FixedThreadPool(
167 numberOfWorkers,
168 './tests/worker-files/thread/testWorker.js',
169 {
170 workerChoiceStrategyOptions: { weights: {} }
171 }
172 )
173 ).toThrowError(
174 'Invalid worker choice strategy options: must have a weight for each worker node'
175 )
d4aeae5a
JB
176 })
177
a20f0ba5
JB
178 it('Verify that worker choice strategy options can be set', async () => {
179 const pool = new FixedThreadPool(
180 numberOfWorkers,
181 './tests/worker-files/thread/testWorker.js',
182 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
183 )
184 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
86bf340d
JB
185 medRunTime: false,
186 medWaitTime: false
a20f0ba5
JB
187 })
188 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
189 .workerChoiceStrategies) {
86bf340d
JB
190 expect(workerChoiceStrategy.opts).toStrictEqual({
191 medRunTime: false,
192 medWaitTime: false
193 })
a20f0ba5
JB
194 }
195 expect(
86bf340d
JB
196 pool.workerChoiceStrategyContext.getRequiredStatistics()
197 ).toStrictEqual({
198 runTime: true,
199 avgRunTime: true,
200 medRunTime: false,
201 waitTime: false,
202 avgWaitTime: false,
203 medWaitTime: false
204 })
a20f0ba5
JB
205 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
206 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
207 medRunTime: true
208 })
209 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
210 .workerChoiceStrategies) {
211 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
212 }
213 expect(
86bf340d
JB
214 pool.workerChoiceStrategyContext.getRequiredStatistics()
215 ).toStrictEqual({
216 runTime: true,
217 avgRunTime: false,
218 medRunTime: true,
219 waitTime: false,
220 avgWaitTime: false,
221 medWaitTime: false
222 })
a20f0ba5
JB
223 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
224 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
225 medRunTime: false
226 })
227 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
228 .workerChoiceStrategies) {
229 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
230 }
231 expect(
86bf340d
JB
232 pool.workerChoiceStrategyContext.getRequiredStatistics()
233 ).toStrictEqual({
234 runTime: true,
235 avgRunTime: true,
236 medRunTime: false,
237 waitTime: false,
238 avgWaitTime: false,
239 medWaitTime: false
240 })
a20f0ba5
JB
241 await pool.destroy()
242 })
243
244 it('Verify that tasks queue can be enabled/disabled', async () => {
245 const pool = new FixedThreadPool(
246 numberOfWorkers,
247 './tests/worker-files/thread/testWorker.js'
248 )
249 expect(pool.opts.enableTasksQueue).toBe(false)
250 expect(pool.opts.tasksQueueOptions).toBeUndefined()
251 pool.enableTasksQueue(true)
252 expect(pool.opts.enableTasksQueue).toBe(true)
253 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
254 pool.enableTasksQueue(true, { concurrency: 2 })
255 expect(pool.opts.enableTasksQueue).toBe(true)
256 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
257 pool.enableTasksQueue(false)
258 expect(pool.opts.enableTasksQueue).toBe(false)
259 expect(pool.opts.tasksQueueOptions).toBeUndefined()
260 await pool.destroy()
261 })
262
263 it('Verify that tasks queue options can be set', async () => {
264 const pool = new FixedThreadPool(
265 numberOfWorkers,
266 './tests/worker-files/thread/testWorker.js',
267 { enableTasksQueue: true }
268 )
269 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
270 pool.setTasksQueueOptions({ concurrency: 2 })
271 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
272 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
273 "Invalid worker tasks concurrency '0'"
274 )
275 await pool.destroy()
276 })
277
6b27d407
JB
278 it('Verify that pool info is set', async () => {
279 let pool = new FixedThreadPool(
280 numberOfWorkers,
281 './tests/worker-files/thread/testWorker.js'
282 )
283 expect(pool.info).toStrictEqual({
284 type: PoolTypes.fixed,
285 minSize: numberOfWorkers,
286 maxSize: numberOfWorkers,
287 workerNodes: numberOfWorkers,
288 idleWorkerNodes: numberOfWorkers,
289 busyWorkerNodes: 0,
290 runningTasks: 0,
291 queuedTasks: 0,
292 maxQueuedTasks: 0
293 })
294 await pool.destroy()
295 pool = new DynamicClusterPool(
296 numberOfWorkers,
297 numberOfWorkers * 2,
298 './tests/worker-files/thread/testWorker.js'
299 )
300 expect(pool.info).toStrictEqual({
301 type: PoolTypes.dynamic,
302 minSize: numberOfWorkers,
303 maxSize: numberOfWorkers * 2,
304 workerNodes: numberOfWorkers,
305 idleWorkerNodes: numberOfWorkers,
306 busyWorkerNodes: 0,
307 runningTasks: 0,
308 queuedTasks: 0,
309 maxQueuedTasks: 0
310 })
311 await pool.destroy()
312 })
313
9d16d33e 314 it('Simulate worker not found', async () => {
a8884ffd 315 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
316 numberOfWorkers,
317 './tests/worker-files/cluster/testWorker.js',
318 {
10fcfaf4
JB
319 errorHandler: e => console.error(e)
320 }
321 )
d4aeae5a 322 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
323 // Simulate worker not found.
324 pool.removeAllWorker()
d4aeae5a 325 expect(pool.workerNodes.length).toBe(0)
fd7ebd49 326 await pool.destroy()
bf9549ae
JB
327 })
328
fd7ebd49 329 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
330 const pool = new FixedClusterPool(
331 numberOfWorkers,
332 './tests/worker-files/cluster/testWorker.js'
333 )
f06e48d8 334 for (const workerNode of pool.workerNodes) {
86bf340d
JB
335 expect(workerNode.tasksUsage).toStrictEqual({
336 run: 0,
337 running: 0,
338 runTime: 0,
339 runTimeHistory: expect.any(CircularArray),
340 avgRunTime: 0,
341 medRunTime: 0,
342 waitTime: 0,
343 waitTimeHistory: expect.any(CircularArray),
344 avgWaitTime: 0,
345 medWaitTime: 0,
346 error: 0
347 })
f06e48d8
JB
348 }
349 await pool.destroy()
350 })
351
352 it('Verify that worker pool tasks queue are initialized', async () => {
353 const pool = new FixedClusterPool(
354 numberOfWorkers,
355 './tests/worker-files/cluster/testWorker.js'
356 )
357 for (const workerNode of pool.workerNodes) {
358 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 359 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 360 expect(workerNode.tasksQueue.size).toBe(0)
bf9549ae 361 }
fd7ebd49 362 await pool.destroy()
bf9549ae
JB
363 })
364
365 it('Verify that worker pool tasks usage are computed', async () => {
366 const pool = new FixedClusterPool(
367 numberOfWorkers,
368 './tests/worker-files/cluster/testWorker.js'
369 )
09c2d0d3 370 const promises = new Set()
fc027381
JB
371 const maxMultiplier = 2
372 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 373 promises.add(pool.execute())
bf9549ae 374 }
f06e48d8 375 for (const workerNode of pool.workerNodes) {
86bf340d
JB
376 expect(workerNode.tasksUsage).toStrictEqual({
377 run: 0,
fc027381 378 running: maxMultiplier,
86bf340d
JB
379 runTime: 0,
380 runTimeHistory: expect.any(CircularArray),
381 avgRunTime: 0,
382 medRunTime: 0,
383 waitTime: 0,
384 waitTimeHistory: expect.any(CircularArray),
385 avgWaitTime: 0,
386 medWaitTime: 0,
387 error: 0
388 })
bf9549ae
JB
389 }
390 await Promise.all(promises)
f06e48d8 391 for (const workerNode of pool.workerNodes) {
86bf340d 392 expect(workerNode.tasksUsage).toStrictEqual({
fc027381 393 run: maxMultiplier,
86bf340d
JB
394 running: 0,
395 runTime: 0,
396 runTimeHistory: expect.any(CircularArray),
397 avgRunTime: 0,
398 medRunTime: 0,
399 waitTime: 0,
400 waitTimeHistory: expect.any(CircularArray),
401 avgWaitTime: 0,
402 medWaitTime: 0,
403 error: 0
404 })
bf9549ae 405 }
fd7ebd49 406 await pool.destroy()
bf9549ae
JB
407 })
408
ee11a4a2 409 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 410 const pool = new DynamicThreadPool(
9e619829 411 numberOfWorkers,
8f4878b7 412 numberOfWorkers,
9e619829
JB
413 './tests/worker-files/thread/testWorker.js'
414 )
09c2d0d3 415 const promises = new Set()
ee9f5295
JB
416 const maxMultiplier = 2
417 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 418 promises.add(pool.execute())
9e619829
JB
419 }
420 await Promise.all(promises)
f06e48d8 421 for (const workerNode of pool.workerNodes) {
86bf340d 422 expect(workerNode.tasksUsage).toStrictEqual({
fc027381 423 run: expect.any(Number),
86bf340d 424 running: 0,
ee9f5295 425 runTime: 0,
86bf340d
JB
426 runTimeHistory: expect.any(CircularArray),
427 avgRunTime: 0,
428 medRunTime: 0,
429 waitTime: 0,
430 waitTimeHistory: expect.any(CircularArray),
431 avgWaitTime: 0,
432 medWaitTime: 0,
433 error: 0
434 })
fc027381
JB
435 expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
436 expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
9e619829
JB
437 }
438 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 439 for (const workerNode of pool.workerNodes) {
86bf340d
JB
440 expect(workerNode.tasksUsage).toStrictEqual({
441 run: 0,
442 running: 0,
443 runTime: 0,
444 runTimeHistory: expect.any(CircularArray),
445 avgRunTime: 0,
446 medRunTime: 0,
447 waitTime: 0,
448 waitTimeHistory: expect.any(CircularArray),
449 avgWaitTime: 0,
450 medWaitTime: 0,
451 error: 0
452 })
f06e48d8 453 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
ee9f5295 454 expect(workerNode.tasksUsage.waitTimeHistory.length).toBe(0)
ee11a4a2 455 }
fd7ebd49 456 await pool.destroy()
ee11a4a2
JB
457 })
458
164d950a
JB
459 it("Verify that pool event emitter 'full' event can register a callback", async () => {
460 const pool = new DynamicThreadPool(
461 numberOfWorkers,
462 numberOfWorkers,
463 './tests/worker-files/thread/testWorker.js'
464 )
09c2d0d3 465 const promises = new Set()
164d950a 466 let poolFull = 0
aee46736 467 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a 468 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 469 promises.add(pool.execute())
164d950a
JB
470 }
471 await Promise.all(promises)
594bfb84 472 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
fc027381
JB
473 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
474 expect(poolFull).toBe(numberOfWorkers * 2)
164d950a
JB
475 await pool.destroy()
476 })
477
cf597bc5 478 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
479 const pool = new FixedThreadPool(
480 numberOfWorkers,
481 './tests/worker-files/thread/testWorker.js'
482 )
09c2d0d3 483 const promises = new Set()
7c0ba920 484 let poolBusy = 0
aee46736 485 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 486 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 487 promises.add(pool.execute())
7c0ba920 488 }
cf597bc5 489 await Promise.all(promises)
14916bf9
JB
490 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
491 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
492 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 493 await pool.destroy()
7c0ba920 494 })
70a4f5ea
JB
495
496 it('Verify that multiple tasks worker is working', async () => {
497 const pool = new DynamicClusterPool(
498 numberOfWorkers,
499 numberOfWorkers * 2,
500 './tests/worker-files/cluster/testMultiTasksWorker.js'
501 )
502 const data = { n: 10 }
82888165
JB
503 const result0 = await pool.execute(data)
504 expect(result0).toBe(false)
70a4f5ea
JB
505 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
506 expect(result1).toBe(false)
507 const result2 = await pool.execute(data, 'factorial')
508 expect(result2).toBe(3628800)
509 const result3 = await pool.execute(data, 'fibonacci')
510 expect(result3).toBe(89)
511 })
3ec964d6 512})