feat: conditional task performance computation at the worker level
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
14
15describe('Abstract pool test suite', () => {
fc027381 16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub adding a helper that empties the pool bookkeeping,
 * used by the suite to simulate a pool whose workers cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears the promise response map and replaces the worker nodes list
   * with a new empty array.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`,
 * used by the suite to simulate a pool created outside of the main
 * thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 28
// Building a pool whose `isMain()` reports `false` must be rejected.
it('Simulate pool creation from a non main thread/process', () => {
  const poolOptions = { errorHandler: e => console.error(e) }
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
41
// A missing or empty worker file path must be rejected with the same error.
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const createWithEmptyFilePath = () =>
    new FixedThreadPool(numberOfWorkers, '')
  expect(createWithoutFilePath).toThrowError(missingFilePathError)
  expect(createWithEmptyFilePath).toThrowError(missingFilePathError)
})
53
// Instantiating a pool without any argument must be rejected.
it('Verify that numberOfWorkers is checked', () => {
  const createWithoutNumberOfWorkers = () => new FixedThreadPool()
  expect(createWithoutNumberOfWorkers).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
// A negative pool size must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createWithNegativeSize).toThrowError(negativeSizeError)
})
70
// A fractional pool size must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createWithFractionalSize).toThrowError(nonIntegerSizeError)
})
7c0ba920 81
// Checks the option values of a pool built without options, then of a pool
// built with every option explicitly customized.
it('Verify that pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // Pool created without options: default values are expected.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFilePath)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false,
    medWaitTime: false
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Pool created with explicit options: they must be kept as given.
  const handler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFilePath, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      medRunTime: true,
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: handler,
    errorHandler: handler,
    onlineHandler: handler,
    exitHandler: handler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true,
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(handler)
  }
  await customPool.destroy()
})
142
// Each invalid options object must make the pool constructor throw the
// associated error message.
it('Verify that pool options are validated', async () => {
  const invalidOptionsCases = [
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ]
  ]
  for (const [poolOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        poolOptions
      )
    expect(createPool).toThrowError(expectedMessage)
  }
})
178
a20f0ba5
JB
// Checks that worker choice strategy options set on the pool are propagated
// to every registered strategy and reflected in the task statistics.
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the options held by the pool and by each strategy, then the
  // task statistics reported by the strategy context.
  const expectOptionsAndStatistics = (expectedOptions, expectedStatistics) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual(
      expectedStatistics
    )
  }

  expectOptionsAndStatistics(
    { medRunTime: false, medWaitTime: false },
    {
      runTime: true,
      avgRunTime: true,
      medRunTime: false,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false,
      elu: false
    }
  )

  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectOptionsAndStatistics(
    { medRunTime: true },
    {
      runTime: true,
      avgRunTime: false,
      medRunTime: true,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false,
      elu: false
    }
  )

  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectOptionsAndStatistics(
    { medRunTime: false },
    {
      runTime: true,
      avgRunTime: true,
      medRunTime: false,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false,
      elu: false
    }
  )

  await pool.destroy()
})
241
// Toggles the tasks queue at runtime and checks the resulting pool options.
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  const expectTasksQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
  }

  expectTasksQueueDisabled()
  // Enabling without options is expected to yield a concurrency of 1.
  pool.enableTasksQueue(true)
  expectTasksQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectTasksQueueDisabled()
  await pool.destroy()
})
260
// Checks that tasks queue options can be changed at runtime and that an
// invalid concurrency is rejected.
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  const setZeroConcurrency = () =>
    pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
275
6b27d407
JB
// Checks the `info` snapshot exposed by a freshly created pool, first for a
// fixed thread pool, then for a dynamic cluster pool.
it('Verify that pool info is set', async () => {
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    runningTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0
  })
  await pool.destroy()
  // A cluster pool must be given the cluster worker implementation, not the
  // thread one, so that its workers are actually able to serve the pool.
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    runningTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0
  })
  await pool.destroy()
})
313
// Empties the pool bookkeeping through the stub and checks that no worker
// node is left.
// NOTE(review): the stub extends FixedThreadPool but is given the cluster
// worker file here - confirm this is intended before changing it.
it('Simulate worker not found', async () => {
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
328
// Every worker node of a new pool must start with zeroed tasks usage.
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const initialTasksUsage = {
    ran: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0,
    elu: undefined
  }
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(initialTasksUsage)
  }
  await pool.destroy()
})
352
// Every worker node of a new pool must own an empty `Queue` of tasks.
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
365
// Checks the tasks usage counters of each worker node while tasks are in
// flight, then once all of them have completed.
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the expected usage: only the `ran`/`running` counters vary.
  const expectedTasksUsage = (ran, running) => ({
    ran,
    running,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0,
    elu: undefined
  })
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
411
// Runs some tasks, then switches the worker choice strategy and checks that
// the tasks usage of every worker node is back to its initial state.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the expected usage: only the `ran` counter varies.
  const expectedTasksUsage = ran => ({
    ran,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0,
    elu: undefined
  })
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(pendingTasks)
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(expect.any(Number)))
    expect(tasksUsage.ran).toBeGreaterThan(0)
    expect(tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(0))
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.waitTimeHistory.length).toBe(0)
  }
  await pool.destroy()
})
463
164d950a
JB
// Counts the `full` events emitted while tasks are submitted to a dynamic
// pool.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  pool.emitter.on(PoolEvents.full, () => {
    fullEventCount += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is emitted when the number of tasks submitted at once
  // reaches the maximum number of workers of the dynamic pool. With
  // min = max = numberOfWorkers, a loop submitting numberOfWorkers * 2 tasks
  // therefore emits it numberOfWorkers * 2 times.
  expect(fullEventCount).toBe(numberOfWorkers * 2)
  await pool.destroy()
})
482
// Counts the `busy` events emitted while tasks are submitted to a fixed pool.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is emitted when the number of tasks submitted at once
  // reaches the number of workers of the fixed pool. A loop submitting
  // numberOfWorkers * 2 tasks therefore emits it numberOfWorkers + 1 times.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
70a4f5ea
JB
500
// Executes the default task and each named task of the multi tasks worker
// and checks their results for the input `{ n: 10 }`.
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name given: the worker default task is executed.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Destroy the pool so that its cluster workers do not outlive the test.
  await pool.destroy()
})
3ec964d6 517})