test: improve pool options coverage
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies,
9 PoolTypes,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub that can drop all of its worker nodes at once.
 * Used by the suite to simulate a pool whose workers cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the worker nodes list with an empty one and clears the
   * promise response map.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so the
 * suite can exercise pool creation as if it happened outside the main
 * thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
28
it('Simulate pool creation from a non main thread/process', () => {
  // StubPoolWithIsMain reports isMain() === false, so building it must throw.
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolOutsideMain).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
41
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // An omitted file path and an empty one must be rejected the same way.
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const createWithEmptyFilePath = () =>
    new FixedThreadPool(numberOfWorkers, '')
  expect(createWithoutFilePath).toThrowError(missingFilePathError)
  expect(createWithEmptyFilePath).toThrowError(missingFilePathError)
})
53
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no argument at all must be refused.
  const createWithoutArguments = () => new FixedThreadPool()
  expect(createWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
it('Verify that a negative number of workers is checked', () => {
  const negativeWorkersError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createWithNegativeWorkers = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createWithNegativeWorkers).toThrowError(negativeWorkersError)
})
70
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerWorkersError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createWithFractionalWorkers = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createWithFractionalWorkers).toThrowError(nonIntegerWorkersError)
})
81
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerOptionNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: no options supplied, so the default values are asserted.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeDefined()
  expect(pool.opts.enableEvents).toBe(true)
  expect(pool.opts.restartWorkerOnError).toBe(true)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerOptionName of handlerOptionNames) {
    expect(pool.opts[handlerOptionName]).toBeUndefined()
  }
  await pool.destroy()

  // Second pool: every option is overridden and must be reported as given.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, customOptions)
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts.enableEvents).toBe(false)
  expect(pool.opts.restartWorkerOnError).toBe(false)
  expect(pool.opts.enableTasksQueue).toBe(true)
  // Expected values are fresh literals rather than references into
  // customOptions, so the comparison does not depend on object identity.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerOptionName of handlerOptionNames) {
    expect(pool.opts[handlerOptionName]).toStrictEqual(testHandler)
  }
  await pool.destroy()
})
143
it('Verify that pool options are validated', async () => {
  // Each case pairs invalid pool options with the error message that
  // constructing a pool with them must produce.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: 'invalidOptions' },
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      'Invalid tasks queue options: must be a plain object'
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        invalidOptions
      )
    expect(createPool).toThrowError(expectedMessage)
  }
})
225
it('Verify that worker choice strategy options can be set', async () => {
  // Factories return a fresh object on every call so that no expected value
  // shares a reference with something handed to the pool.
  const defaultStrategyOptions = () => ({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  const medianStrategyOptions = median => ({
    runTime: { median },
    elu: { median }
  })
  // In every step of this test, average is required exactly when median is not.
  const expectedRequirements = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  // Asserts the options on the pool, on each strategy, and the derived
  // task statistics requirements.
  const expectStrategyState = (buildOptions, median) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      buildOptions()
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(buildOptions())
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements(median))
  }

  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  expectStrategyState(defaultStrategyOptions, false)

  pool.setWorkerChoiceStrategyOptions(medianStrategyOptions(true))
  expectStrategyState(() => medianStrategyOptions(true), true)

  pool.setWorkerChoiceStrategyOptions(medianStrategyOptions(false))
  expectStrategyState(() => medianStrategyOptions(false), false)

  await pool.destroy()
})
334
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the enabled flag and the queue options, which are expected to
  // be undefined whenever no options object is passed.
  const expectTasksQueueState = (enabled, expectedOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (expectedOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    }
  }

  expectTasksQueueState(false)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false)
  await pool.destroy()
})
353
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // Invalid options paired with the error message each one must trigger.
  const rejectedOptionsCases = [
    [
      'invalidTasksQueueOptions',
      'Invalid tasks queue options: must be a plain object'
    ],
    [{ concurrency: 0 }, "Invalid worker tasks concurrency '0'"],
    [
      { concurrency: 0.2 },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [rejectedOptions, expectedMessage] of rejectedOptionsCases) {
    expect(() => pool.setTasksQueueOptions(rejectedOptions)).toThrowError(
      expectedMessage
    )
  }
  await pool.destroy()
})
374
it('Verify that pool info is set', async () => {
  // A fixed thread pool reports min and max size equal to its worker count.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  // A dynamic cluster pool reports its own min/max bounds. It is given the
  // cluster worker implementation so the worker file matches the pool type
  // (the thread worker file was passed here before).
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
416
it('Simulate worker not found', async () => {
  // StubPoolWithRemoveAllWorker extends FixedThreadPool, so it is given the
  // thread worker implementation (the cluster worker file was passed here
  // before, which does not match the pool type).
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
431
it('Verify that worker pool tasks usage are initialized', async () => {
  // Expected shape of a measurement that has not recorded anything yet.
  const untouchedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.workerUsage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        failed: 0
      },
      runTime: untouchedMeasurement(),
      waitTime: untouchedMeasurement(),
      elu: {
        idle: untouchedMeasurement(),
        active: untouchedMeasurement(),
        utilization: 0
      }
    })
  })
  await pool.destroy()
})
476
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must own an empty Queue instance right after creation.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
  })
  await pool.destroy()
})
489
it('Verify that worker pool tasks usage are computed', async () => {
  // Expected shape of a measurement that has not recorded anything.
  const untouchedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected worker usage for the given task counters.
  const expectedUsage = (executed, executing) => ({
    tasks: { executed, executing, queued: 0, failed: 0 },
    runTime: untouchedMeasurement(),
    waitTime: untouchedMeasurement(),
    elu: {
      idle: untouchedMeasurement(),
      active: untouchedMeasurement(),
      utilization: 0
    }
  })

  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // Before the tasks settle, each worker node is expected to be executing
  // maxMultiplier of them.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same tasks are counted as executed.
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
577
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  // Expected shape of a measurement that has not recorded anything.
  const untouchedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Builds the expected worker usage with the given `executed` matcher/value.
  const expectedUsage = executed => ({
    tasks: { executed, executing: 0, queued: 0, failed: 0 },
    runTime: untouchedMeasurement(),
    waitTime: untouchedMeasurement(),
    elu: {
      idle: untouchedMeasurement(),
      active: untouchedMeasurement(),
      utilization: 0
    }
  })

  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(pendingTasks)
  // After the run every worker node has executed between 1 and
  // maxMultiplier tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(
      expectedUsage(expect.any(Number))
    )
    expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
      maxMultiplier
    )
  }
  // Switching strategy must bring every usage counter and history back to empty.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.workerUsage).toStrictEqual(expectedUsage(0))
    expect(workerNode.workerUsage.runTime.history.length).toBe(0)
    expect(workerNode.workerUsage.waitTime.history.length).toBe(0)
  }
  await pool.destroy()
})
673
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventCount += 1
    lastPoolInfo = info
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is triggered when the number of tasks submitted at once
  // reaches the max number of workers in the dynamic pool. With
  // min = max = numberOfWorkers and numberOfWorkers * 2 submissions, that
  // adds up to numberOfWorkers * 2 emissions.
  expect(fullEventCount).toBe(numberOfWorkers * 2)
  // Every numeric field of the info payload only has its type checked.
  const numericInfoFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'queuedTasks',
    'maxQueuedTasks',
    'failedTasks'
  ]
  expect(lastPoolInfo).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ...Object.fromEntries(
      numericInfoFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
710
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEventCount += 1
    lastPoolInfo = info
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers. Submitting numberOfWorkers * 2
  // tasks therefore yields numberOfWorkers + 1 emissions.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  // Every numeric field of the info payload only has its type checked.
  const numericInfoFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'queuedTasks',
    'maxQueuedTasks',
    'failedTasks'
  ]
  expect(lastPoolInfo).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ...Object.fromEntries(
      numericInfoFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
746
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name given: the result of the worker's default task is asserted.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Tear the pool down like every other test in this suite; otherwise its
  // cluster workers are left running after the test completes.
  await pool.destroy()
})
763 })