refactor: abstract out measurement statistics
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies,
9 PoolTypes,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers = 2
17 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
18 removeAllWorker () {
19 this.workerNodes = []
20 this.promiseResponseMap.clear()
21 }
22 }
23 class StubPoolWithIsMain extends FixedThreadPool {
24 isMain () {
25 return false
26 }
27 }
28
29 it('Simulate pool creation from a non main thread/process', () => {
30 expect(
31 () =>
32 new StubPoolWithIsMain(
33 numberOfWorkers,
34 './tests/worker-files/thread/testWorker.js',
35 {
36 errorHandler: e => console.error(e)
37 }
38 )
39 ).toThrowError('Cannot start a pool from a worker!')
40 })
41
42 it('Verify that filePath is checked', () => {
43 const expectedError = new Error(
44 'Please specify a file with a worker implementation'
45 )
46 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
47 expectedError
48 )
49 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
50 expectedError
51 )
52 })
53
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
57 )
58 })
59
60 it('Verify that a negative number of workers is checked', () => {
61 expect(
62 () =>
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 ).toThrowError(
65 new RangeError(
66 'Cannot instantiate a pool with a negative number of workers'
67 )
68 )
69 })
70
71 it('Verify that a non integer number of workers is checked', () => {
72 expect(
73 () =>
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 ).toThrowError(
76 new TypeError(
77 'Cannot instantiate a pool with a non safe integer number of workers'
78 )
79 )
80 })
81
82 it('Verify that pool options are checked', async () => {
83 let pool = new FixedThreadPool(
84 numberOfWorkers,
85 './tests/worker-files/thread/testWorker.js'
86 )
87 expect(pool.emitter).toBeDefined()
88 expect(pool.opts.enableEvents).toBe(true)
89 expect(pool.opts.restartWorkerOnError).toBe(true)
90 expect(pool.opts.enableTasksQueue).toBe(false)
91 expect(pool.opts.tasksQueueOptions).toBeUndefined()
92 expect(pool.opts.workerChoiceStrategy).toBe(
93 WorkerChoiceStrategies.ROUND_ROBIN
94 )
95 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
96 medRunTime: false,
97 medWaitTime: false
98 })
99 expect(pool.opts.messageHandler).toBeUndefined()
100 expect(pool.opts.errorHandler).toBeUndefined()
101 expect(pool.opts.onlineHandler).toBeUndefined()
102 expect(pool.opts.exitHandler).toBeUndefined()
103 await pool.destroy()
104 const testHandler = () => console.log('test handler executed')
105 pool = new FixedThreadPool(
106 numberOfWorkers,
107 './tests/worker-files/thread/testWorker.js',
108 {
109 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
110 workerChoiceStrategyOptions: {
111 medRunTime: true,
112 weights: { 0: 300, 1: 200 }
113 },
114 enableEvents: false,
115 restartWorkerOnError: false,
116 enableTasksQueue: true,
117 tasksQueueOptions: { concurrency: 2 },
118 messageHandler: testHandler,
119 errorHandler: testHandler,
120 onlineHandler: testHandler,
121 exitHandler: testHandler
122 }
123 )
124 expect(pool.emitter).toBeUndefined()
125 expect(pool.opts.enableEvents).toBe(false)
126 expect(pool.opts.restartWorkerOnError).toBe(false)
127 expect(pool.opts.enableTasksQueue).toBe(true)
128 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
129 expect(pool.opts.workerChoiceStrategy).toBe(
130 WorkerChoiceStrategies.LEAST_USED
131 )
132 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
133 medRunTime: true,
134 weights: { 0: 300, 1: 200 }
135 })
136 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
137 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
138 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
139 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
140 await pool.destroy()
141 })
142
143 it('Verify that pool options are validated', async () => {
144 expect(
145 () =>
146 new FixedThreadPool(
147 numberOfWorkers,
148 './tests/worker-files/thread/testWorker.js',
149 {
150 enableTasksQueue: true,
151 tasksQueueOptions: { concurrency: 0 }
152 }
153 )
154 ).toThrowError("Invalid worker tasks concurrency '0'")
155 expect(
156 () =>
157 new FixedThreadPool(
158 numberOfWorkers,
159 './tests/worker-files/thread/testWorker.js',
160 {
161 workerChoiceStrategy: 'invalidStrategy'
162 }
163 )
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
165 expect(
166 () =>
167 new FixedThreadPool(
168 numberOfWorkers,
169 './tests/worker-files/thread/testWorker.js',
170 {
171 workerChoiceStrategyOptions: { weights: {} }
172 }
173 )
174 ).toThrowError(
175 'Invalid worker choice strategy options: must have a weight for each worker node'
176 )
177 })
178
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool = new FixedThreadPool(
181 numberOfWorkers,
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
184 )
185 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
186 medRunTime: false,
187 medWaitTime: false
188 })
189 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
190 .workerChoiceStrategies) {
191 expect(workerChoiceStrategy.opts).toStrictEqual({
192 medRunTime: false,
193 medWaitTime: false
194 })
195 }
196 expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
197 runTime: true,
198 avgRunTime: true,
199 medRunTime: false,
200 waitTime: false,
201 avgWaitTime: false,
202 medWaitTime: false,
203 elu: false
204 })
205 pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
206 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
207 medRunTime: true
208 })
209 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
210 .workerChoiceStrategies) {
211 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
212 }
213 expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
214 runTime: true,
215 avgRunTime: false,
216 medRunTime: true,
217 waitTime: false,
218 avgWaitTime: false,
219 medWaitTime: false,
220 elu: false
221 })
222 pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
223 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
224 medRunTime: false
225 })
226 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
227 .workerChoiceStrategies) {
228 expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
229 }
230 expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
231 runTime: true,
232 avgRunTime: true,
233 medRunTime: false,
234 waitTime: false,
235 avgWaitTime: false,
236 medWaitTime: false,
237 elu: false
238 })
239 await pool.destroy()
240 })
241
242 it('Verify that tasks queue can be enabled/disabled', async () => {
243 const pool = new FixedThreadPool(
244 numberOfWorkers,
245 './tests/worker-files/thread/testWorker.js'
246 )
247 expect(pool.opts.enableTasksQueue).toBe(false)
248 expect(pool.opts.tasksQueueOptions).toBeUndefined()
249 pool.enableTasksQueue(true)
250 expect(pool.opts.enableTasksQueue).toBe(true)
251 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
252 pool.enableTasksQueue(true, { concurrency: 2 })
253 expect(pool.opts.enableTasksQueue).toBe(true)
254 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
255 pool.enableTasksQueue(false)
256 expect(pool.opts.enableTasksQueue).toBe(false)
257 expect(pool.opts.tasksQueueOptions).toBeUndefined()
258 await pool.destroy()
259 })
260
261 it('Verify that tasks queue options can be set', async () => {
262 const pool = new FixedThreadPool(
263 numberOfWorkers,
264 './tests/worker-files/thread/testWorker.js',
265 { enableTasksQueue: true }
266 )
267 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
268 pool.setTasksQueueOptions({ concurrency: 2 })
269 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
270 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
271 "Invalid worker tasks concurrency '0'"
272 )
273 await pool.destroy()
274 })
275
276 it('Verify that pool info is set', async () => {
277 let pool = new FixedThreadPool(
278 numberOfWorkers,
279 './tests/worker-files/thread/testWorker.js'
280 )
281 expect(pool.info).toStrictEqual({
282 type: PoolTypes.fixed,
283 worker: WorkerTypes.thread,
284 minSize: numberOfWorkers,
285 maxSize: numberOfWorkers,
286 workerNodes: numberOfWorkers,
287 idleWorkerNodes: numberOfWorkers,
288 busyWorkerNodes: 0,
289 executedTasks: 0,
290 executingTasks: 0,
291 queuedTasks: 0,
292 maxQueuedTasks: 0,
293 failedTasks: 0
294 })
295 await pool.destroy()
296 pool = new DynamicClusterPool(
297 numberOfWorkers,
298 numberOfWorkers * 2,
299 './tests/worker-files/thread/testWorker.js'
300 )
301 expect(pool.info).toStrictEqual({
302 type: PoolTypes.dynamic,
303 worker: WorkerTypes.cluster,
304 minSize: numberOfWorkers,
305 maxSize: numberOfWorkers * 2,
306 workerNodes: numberOfWorkers,
307 idleWorkerNodes: numberOfWorkers,
308 busyWorkerNodes: 0,
309 executedTasks: 0,
310 executingTasks: 0,
311 queuedTasks: 0,
312 maxQueuedTasks: 0,
313 failedTasks: 0
314 })
315 await pool.destroy()
316 })
317
318 it('Simulate worker not found', async () => {
319 const pool = new StubPoolWithRemoveAllWorker(
320 numberOfWorkers,
321 './tests/worker-files/cluster/testWorker.js',
322 {
323 errorHandler: e => console.error(e)
324 }
325 )
326 expect(pool.workerNodes.length).toBe(numberOfWorkers)
327 // Simulate worker not found.
328 pool.removeAllWorker()
329 expect(pool.workerNodes.length).toBe(0)
330 await pool.destroy()
331 })
332
333 it('Verify that worker pool tasks usage are initialized', async () => {
334 const pool = new FixedClusterPool(
335 numberOfWorkers,
336 './tests/worker-files/cluster/testWorker.js'
337 )
338 for (const workerNode of pool.workerNodes) {
339 expect(workerNode.workerUsage).toStrictEqual({
340 tasks: {
341 executed: 0,
342 executing: 0,
343 queued: 0,
344 failed: 0
345 },
346 runTime: {
347 aggregation: 0,
348 average: 0,
349 median: 0,
350 history: expect.any(CircularArray)
351 },
352 waitTime: {
353 aggregation: 0,
354 average: 0,
355 median: 0,
356 history: expect.any(CircularArray)
357 },
358 elu: undefined
359 })
360 }
361 await pool.destroy()
362 })
363
364 it('Verify that worker pool tasks queue are initialized', async () => {
365 const pool = new FixedClusterPool(
366 numberOfWorkers,
367 './tests/worker-files/cluster/testWorker.js'
368 )
369 for (const workerNode of pool.workerNodes) {
370 expect(workerNode.tasksQueue).toBeDefined()
371 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
372 expect(workerNode.tasksQueue.size).toBe(0)
373 }
374 await pool.destroy()
375 })
376
377 it('Verify that worker pool tasks usage are computed', async () => {
378 const pool = new FixedClusterPool(
379 numberOfWorkers,
380 './tests/worker-files/cluster/testWorker.js'
381 )
382 const promises = new Set()
383 const maxMultiplier = 2
384 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
385 promises.add(pool.execute())
386 }
387 for (const workerNode of pool.workerNodes) {
388 expect(workerNode.workerUsage).toStrictEqual({
389 tasks: {
390 executed: 0,
391 executing: maxMultiplier,
392 queued: 0,
393 failed: 0
394 },
395 runTime: {
396 aggregation: 0,
397 average: 0,
398 median: 0,
399 history: expect.any(CircularArray)
400 },
401 waitTime: {
402 aggregation: 0,
403 average: 0,
404 median: 0,
405 history: expect.any(CircularArray)
406 },
407 elu: undefined
408 })
409 }
410 await Promise.all(promises)
411 for (const workerNode of pool.workerNodes) {
412 expect(workerNode.workerUsage).toStrictEqual({
413 tasks: {
414 executed: maxMultiplier,
415 executing: 0,
416 queued: 0,
417 failed: 0
418 },
419 runTime: {
420 aggregation: 0,
421 average: 0,
422 median: 0,
423 history: expect.any(CircularArray)
424 },
425 waitTime: {
426 aggregation: 0,
427 average: 0,
428 median: 0,
429 history: expect.any(CircularArray)
430 },
431 elu: undefined
432 })
433 }
434 await pool.destroy()
435 })
436
437 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
438 const pool = new DynamicThreadPool(
439 numberOfWorkers,
440 numberOfWorkers,
441 './tests/worker-files/thread/testWorker.js'
442 )
443 const promises = new Set()
444 const maxMultiplier = 2
445 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
446 promises.add(pool.execute())
447 }
448 await Promise.all(promises)
449 for (const workerNode of pool.workerNodes) {
450 expect(workerNode.workerUsage).toStrictEqual({
451 tasks: {
452 executed: expect.any(Number),
453 executing: 0,
454 queued: 0,
455 failed: 0
456 },
457 runTime: {
458 aggregation: 0,
459 average: 0,
460 median: 0,
461 history: expect.any(CircularArray)
462 },
463 waitTime: {
464 aggregation: 0,
465 average: 0,
466 median: 0,
467 history: expect.any(CircularArray)
468 },
469 elu: undefined
470 })
471 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
472 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
473 maxMultiplier
474 )
475 }
476 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
477 for (const workerNode of pool.workerNodes) {
478 expect(workerNode.workerUsage).toStrictEqual({
479 tasks: {
480 executed: 0,
481 executing: 0,
482 queued: 0,
483 failed: 0
484 },
485 runTime: {
486 aggregation: 0,
487 average: 0,
488 median: 0,
489 history: expect.any(CircularArray)
490 },
491 waitTime: {
492 aggregation: 0,
493 average: 0,
494 median: 0,
495 history: expect.any(CircularArray)
496 },
497 elu: undefined
498 })
499 expect(workerNode.workerUsage.runTime.history.length).toBe(0)
500 expect(workerNode.workerUsage.waitTime.history.length).toBe(0)
501 }
502 await pool.destroy()
503 })
504
505 it("Verify that pool event emitter 'full' event can register a callback", async () => {
506 const pool = new DynamicThreadPool(
507 numberOfWorkers,
508 numberOfWorkers,
509 './tests/worker-files/thread/testWorker.js'
510 )
511 const promises = new Set()
512 let poolFull = 0
513 let poolInfo
514 pool.emitter.on(PoolEvents.full, info => {
515 ++poolFull
516 poolInfo = info
517 })
518 for (let i = 0; i < numberOfWorkers * 2; i++) {
519 promises.add(pool.execute())
520 }
521 await Promise.all(promises)
522 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
523 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
524 expect(poolFull).toBe(numberOfWorkers * 2)
525 expect(poolInfo).toStrictEqual({
526 type: PoolTypes.dynamic,
527 worker: WorkerTypes.thread,
528 minSize: expect.any(Number),
529 maxSize: expect.any(Number),
530 workerNodes: expect.any(Number),
531 idleWorkerNodes: expect.any(Number),
532 busyWorkerNodes: expect.any(Number),
533 executedTasks: expect.any(Number),
534 executingTasks: expect.any(Number),
535 queuedTasks: expect.any(Number),
536 maxQueuedTasks: expect.any(Number),
537 failedTasks: expect.any(Number)
538 })
539 await pool.destroy()
540 })
541
542 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
543 const pool = new FixedThreadPool(
544 numberOfWorkers,
545 './tests/worker-files/thread/testWorker.js'
546 )
547 const promises = new Set()
548 let poolBusy = 0
549 let poolInfo
550 pool.emitter.on(PoolEvents.busy, info => {
551 ++poolBusy
552 poolInfo = info
553 })
554 for (let i = 0; i < numberOfWorkers * 2; i++) {
555 promises.add(pool.execute())
556 }
557 await Promise.all(promises)
558 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
559 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
560 expect(poolBusy).toBe(numberOfWorkers + 1)
561 expect(poolInfo).toStrictEqual({
562 type: PoolTypes.fixed,
563 worker: WorkerTypes.thread,
564 minSize: expect.any(Number),
565 maxSize: expect.any(Number),
566 workerNodes: expect.any(Number),
567 idleWorkerNodes: expect.any(Number),
568 busyWorkerNodes: expect.any(Number),
569 executedTasks: expect.any(Number),
570 executingTasks: expect.any(Number),
571 queuedTasks: expect.any(Number),
572 maxQueuedTasks: expect.any(Number),
573 failedTasks: expect.any(Number)
574 })
575 await pool.destroy()
576 })
577
578 it('Verify that multiple tasks worker is working', async () => {
579 const pool = new DynamicClusterPool(
580 numberOfWorkers,
581 numberOfWorkers * 2,
582 './tests/worker-files/cluster/testMultiTasksWorker.js'
583 )
584 const data = { n: 10 }
585 const result0 = await pool.execute(data)
586 expect(result0).toBe(false)
587 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
588 expect(result1).toBe(false)
589 const result2 = await pool.execute(data, 'factorial')
590 expect(result2).toBe(3628800)
591 const result3 = await pool.execute(data, 'fibonacci')
592 expect(result3).toBe(89)
593 })
594 })