1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
50 it('Verify that numberOfWorkers is checked', () => {
51 expect(() => new FixedThreadPool()).toThrowError(
52 'Cannot instantiate a pool without specifying the number of workers'
56 it('Verify that a negative number of workers is checked', () => {
59 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
62 'Cannot instantiate a pool with a negative number of workers'
67 it('Verify that a non integer number of workers is checked', () => {
70 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
73 'Cannot instantiate a pool with a non safe integer number of workers'
78 it('Verify that dynamic pool sizing is checked', () => {
81 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
84 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
89 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
92 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
97 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
100 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
105 it('Verify that pool options are checked', async () => {
106 let pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js'
110 expect(pool
.emitter
).toBeDefined()
111 expect(pool
.opts
.enableEvents
).toBe(true)
112 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
113 expect(pool
.opts
.enableTasksQueue
).toBe(false)
114 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
115 expect(pool
.opts
.workerChoiceStrategy
).toBe(
116 WorkerChoiceStrategies
.ROUND_ROBIN
118 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
119 runTime
: { median
: false },
120 waitTime
: { median
: false },
121 elu
: { median
: false }
123 expect(pool
.opts
.messageHandler
).toBeUndefined()
124 expect(pool
.opts
.errorHandler
).toBeUndefined()
125 expect(pool
.opts
.onlineHandler
).toBeUndefined()
126 expect(pool
.opts
.exitHandler
).toBeUndefined()
128 const testHandler
= () => console
.log('test handler executed')
129 pool
= new FixedThreadPool(
131 './tests/worker-files/thread/testWorker.js',
133 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
134 workerChoiceStrategyOptions
: {
135 runTime
: { median
: true },
136 weights
: { 0: 300, 1: 200 }
139 restartWorkerOnError
: false,
140 enableTasksQueue
: true,
141 tasksQueueOptions
: { concurrency
: 2 },
142 messageHandler
: testHandler
,
143 errorHandler
: testHandler
,
144 onlineHandler
: testHandler
,
145 exitHandler
: testHandler
148 expect(pool
.emitter
).toBeUndefined()
149 expect(pool
.opts
.enableEvents
).toBe(false)
150 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
151 expect(pool
.opts
.enableTasksQueue
).toBe(true)
152 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
153 expect(pool
.opts
.workerChoiceStrategy
).toBe(
154 WorkerChoiceStrategies
.LEAST_USED
156 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
157 runTime
: { median
: true },
158 weights
: { 0: 300, 1: 200 }
160 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
161 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
162 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
163 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
167 it('Verify that pool options are validated', async () => {
172 './tests/worker-files/thread/testWorker.js',
174 workerChoiceStrategy
: 'invalidStrategy'
177 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
182 './tests/worker-files/thread/testWorker.js',
184 workerChoiceStrategyOptions
: 'invalidOptions'
188 'Invalid worker choice strategy options: must be a plain object'
194 './tests/worker-files/thread/testWorker.js',
196 workerChoiceStrategyOptions
: { weights
: {} }
200 'Invalid worker choice strategy options: must have a weight for each worker node'
206 './tests/worker-files/thread/testWorker.js',
208 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
212 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
218 './tests/worker-files/thread/testWorker.js',
220 enableTasksQueue
: true,
221 tasksQueueOptions
: { concurrency
: 0 }
224 ).toThrowError("Invalid worker tasks concurrency '0'")
229 './tests/worker-files/thread/testWorker.js',
231 enableTasksQueue
: true,
232 tasksQueueOptions
: 'invalidTasksQueueOptions'
235 ).toThrowError('Invalid tasks queue options: must be a plain object')
240 './tests/worker-files/thread/testWorker.js',
242 enableTasksQueue
: true,
243 tasksQueueOptions
: { concurrency
: 0.2 }
246 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
249 it('Verify that pool worker choice strategy options can be set', async () => {
250 const pool
= new FixedThreadPool(
252 './tests/worker-files/thread/testWorker.js',
253 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
255 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
256 runTime
: { median
: false },
257 waitTime
: { median
: false },
258 elu
: { median
: false }
260 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
261 .workerChoiceStrategies
) {
262 expect(workerChoiceStrategy
.opts
).toStrictEqual({
263 runTime
: { median
: false },
264 waitTime
: { median
: false },
265 elu
: { median
: false }
269 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
287 pool
.setWorkerChoiceStrategyOptions({
288 runTime
: { median
: true },
289 elu
: { median
: true }
291 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
292 runTime
: { median
: true },
293 elu
: { median
: true }
295 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
296 .workerChoiceStrategies
) {
297 expect(workerChoiceStrategy
.opts
).toStrictEqual({
298 runTime
: { median
: true },
299 elu
: { median
: true }
303 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
321 pool
.setWorkerChoiceStrategyOptions({
322 runTime
: { median
: false },
323 elu
: { median
: false }
325 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
326 runTime
: { median
: false },
327 elu
: { median
: false }
329 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
330 .workerChoiceStrategies
) {
331 expect(workerChoiceStrategy
.opts
).toStrictEqual({
332 runTime
: { median
: false },
333 elu
: { median
: false }
337 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
356 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
358 'Invalid worker choice strategy options: must be a plain object'
361 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
363 'Invalid worker choice strategy options: must have a weight for each worker node'
366 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
368 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
373 it('Verify that pool tasks queue can be enabled/disabled', async () => {
374 const pool
= new FixedThreadPool(
376 './tests/worker-files/thread/testWorker.js'
378 expect(pool
.opts
.enableTasksQueue
).toBe(false)
379 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
380 pool
.enableTasksQueue(true)
381 expect(pool
.opts
.enableTasksQueue
).toBe(true)
382 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
383 pool
.enableTasksQueue(true, { concurrency
: 2 })
384 expect(pool
.opts
.enableTasksQueue
).toBe(true)
385 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
386 pool
.enableTasksQueue(false)
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
392 it('Verify that pool tasks queue options can be set', async () => {
393 const pool
= new FixedThreadPool(
395 './tests/worker-files/thread/testWorker.js',
396 { enableTasksQueue
: true }
398 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
399 pool
.setTasksQueueOptions({ concurrency
: 2 })
400 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
402 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
403 ).toThrowError('Invalid tasks queue options: must be a plain object')
404 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
405 "Invalid worker tasks concurrency '0'"
407 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
408 'Invalid worker tasks concurrency: must be an integer'
413 it('Verify that pool info is set', async () => {
414 let pool
= new FixedThreadPool(
416 './tests/worker-files/thread/testWorker.js'
418 expect(pool
.info
).toStrictEqual({
420 type
: PoolTypes
.fixed
,
421 worker
: WorkerTypes
.thread
,
423 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
424 minSize
: numberOfWorkers
,
425 maxSize
: numberOfWorkers
,
426 workerNodes
: numberOfWorkers
,
427 idleWorkerNodes
: numberOfWorkers
,
435 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
436 expect(pool
.info
).toStrictEqual({
438 type
: PoolTypes
.fixed
,
439 worker
: WorkerTypes
.thread
,
441 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
442 minSize
: numberOfWorkers
,
443 maxSize
: numberOfWorkers
,
444 workerNodes
: numberOfWorkers
,
445 idleWorkerNodes
: numberOfWorkers
,
454 pool
= new DynamicClusterPool(
455 Math
.floor(numberOfWorkers
/ 2),
457 './tests/worker-files/cluster/testWorker.js'
459 expect(pool
.info
).toStrictEqual({
461 type
: PoolTypes
.dynamic
,
462 worker
: WorkerTypes
.cluster
,
464 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
465 minSize
: Math
.floor(numberOfWorkers
/ 2),
466 maxSize
: numberOfWorkers
,
467 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
468 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
476 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
477 expect(pool
.info
).toStrictEqual({
479 type
: PoolTypes
.dynamic
,
480 worker
: WorkerTypes
.cluster
,
482 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
483 minSize
: Math
.floor(numberOfWorkers
/ 2),
484 maxSize
: numberOfWorkers
,
485 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
486 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
497 it('Verify that pool worker tasks usage are initialized', async () => {
498 const pool
= new FixedClusterPool(
500 './tests/worker-files/cluster/testWorker.js'
502 for (const workerNode
of pool
.workerNodes
) {
503 expect(workerNode
.usage
).toStrictEqual({
512 history
: expect
.any(CircularArray
)
515 history
: expect
.any(CircularArray
)
519 history
: expect
.any(CircularArray
)
522 history
: expect
.any(CircularArray
)
530 it('Verify that pool worker tasks queue are initialized', async () => {
531 let pool
= new FixedClusterPool(
533 './tests/worker-files/cluster/testWorker.js'
535 for (const workerNode
of pool
.workerNodes
) {
536 expect(workerNode
.tasksQueue
).toBeDefined()
537 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
538 expect(workerNode
.tasksQueue
.size
).toBe(0)
539 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
542 pool
= new DynamicThreadPool(
543 Math
.floor(numberOfWorkers
/ 2),
545 './tests/worker-files/thread/testWorker.js'
547 for (const workerNode
of pool
.workerNodes
) {
548 expect(workerNode
.tasksQueue
).toBeDefined()
549 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
550 expect(workerNode
.tasksQueue
.size
).toBe(0)
551 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
555 it('Verify that pool worker info are initialized', async () => {
556 let pool
= new FixedClusterPool(
558 './tests/worker-files/cluster/testWorker.js'
560 for (const workerNode
of pool
.workerNodes
) {
561 expect(workerNode
.info
).toStrictEqual({
562 id
: expect
.any(Number
),
563 type
: WorkerTypes
.cluster
,
568 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
569 for (const workerNode
of pool
.workerNodes
) {
570 expect(workerNode
.info
).toStrictEqual({
571 id
: expect
.any(Number
),
572 type
: WorkerTypes
.cluster
,
578 pool
= new DynamicThreadPool(
579 Math
.floor(numberOfWorkers
/ 2),
581 './tests/worker-files/thread/testWorker.js'
583 for (const workerNode
of pool
.workerNodes
) {
584 expect(workerNode
.info
).toStrictEqual({
585 id
: expect
.any(Number
),
586 type
: WorkerTypes
.thread
,
591 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
592 for (const workerNode
of pool
.workerNodes
) {
593 expect(workerNode
.info
).toStrictEqual({
594 id
: expect
.any(Number
),
595 type
: WorkerTypes
.thread
,
602 it('Verify that pool worker tasks usage are computed', async () => {
603 const pool
= new FixedClusterPool(
605 './tests/worker-files/cluster/testWorker.js'
607 const promises
= new Set()
608 const maxMultiplier
= 2
609 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
610 promises
.add(pool
.execute())
612 for (const workerNode
of pool
.workerNodes
) {
613 expect(workerNode
.usage
).toStrictEqual({
616 executing
: maxMultiplier
,
622 history
: expect
.any(CircularArray
)
625 history
: expect
.any(CircularArray
)
629 history
: expect
.any(CircularArray
)
632 history
: expect
.any(CircularArray
)
637 await Promise
.all(promises
)
638 for (const workerNode
of pool
.workerNodes
) {
639 expect(workerNode
.usage
).toStrictEqual({
641 executed
: maxMultiplier
,
648 history
: expect
.any(CircularArray
)
651 history
: expect
.any(CircularArray
)
655 history
: expect
.any(CircularArray
)
658 history
: expect
.any(CircularArray
)
666 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
667 const pool
= new DynamicThreadPool(
668 Math
.floor(numberOfWorkers
/ 2),
670 './tests/worker-files/thread/testWorker.js'
672 const promises
= new Set()
673 const maxMultiplier
= 2
674 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
675 promises
.add(pool
.execute())
677 await Promise
.all(promises
)
678 for (const workerNode
of pool
.workerNodes
) {
679 expect(workerNode
.usage
).toStrictEqual({
681 executed
: expect
.any(Number
),
688 history
: expect
.any(CircularArray
)
691 history
: expect
.any(CircularArray
)
695 history
: expect
.any(CircularArray
)
698 history
: expect
.any(CircularArray
)
702 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
703 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
705 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
706 for (const workerNode
of pool
.workerNodes
) {
707 expect(workerNode
.usage
).toStrictEqual({
716 history
: expect
.any(CircularArray
)
719 history
: expect
.any(CircularArray
)
723 history
: expect
.any(CircularArray
)
726 history
: expect
.any(CircularArray
)
730 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
731 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
736 it("Verify that pool event emitter 'full' event can register a callback", async () => {
737 const pool
= new DynamicThreadPool(
738 Math
.floor(numberOfWorkers
/ 2),
740 './tests/worker-files/thread/testWorker.js'
742 const promises
= new Set()
745 pool
.emitter
.on(PoolEvents
.full
, info
=> {
749 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
750 promises
.add(pool
.execute())
752 await Promise
.all(promises
)
753 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
754 // So it is emitted numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
755 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
756 expect(poolInfo
).toStrictEqual({
758 type
: PoolTypes
.dynamic
,
759 worker
: WorkerTypes
.thread
,
760 ready
: expect
.any(Boolean
),
761 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
762 minSize
: expect
.any(Number
),
763 maxSize
: expect
.any(Number
),
764 workerNodes
: expect
.any(Number
),
765 idleWorkerNodes
: expect
.any(Number
),
766 busyWorkerNodes
: expect
.any(Number
),
767 executedTasks
: expect
.any(Number
),
768 executingTasks
: expect
.any(Number
),
769 queuedTasks
: expect
.any(Number
),
770 maxQueuedTasks
: expect
.any(Number
),
771 failedTasks
: expect
.any(Number
)
776 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
777 const pool
= new DynamicClusterPool(
778 Math
.floor(numberOfWorkers
/ 2),
780 './tests/worker-files/cluster/testWorker.js'
784 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
788 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
789 expect(poolReady
).toBe(1)
790 expect(poolInfo
).toStrictEqual({
792 type
: PoolTypes
.dynamic
,
793 worker
: WorkerTypes
.cluster
,
795 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
796 minSize
: expect
.any(Number
),
797 maxSize
: expect
.any(Number
),
798 workerNodes
: expect
.any(Number
),
799 idleWorkerNodes
: expect
.any(Number
),
800 busyWorkerNodes
: expect
.any(Number
),
801 executedTasks
: expect
.any(Number
),
802 executingTasks
: expect
.any(Number
),
803 queuedTasks
: expect
.any(Number
),
804 maxQueuedTasks
: expect
.any(Number
),
805 failedTasks
: expect
.any(Number
)
810 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
811 const pool
= new FixedThreadPool(
813 './tests/worker-files/thread/testWorker.js'
815 const promises
= new Set()
818 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
822 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
823 promises
.add(pool
.execute())
825 await Promise
.all(promises
)
826 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
827 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
828 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
829 expect(poolInfo
).toStrictEqual({
831 type
: PoolTypes
.fixed
,
832 worker
: WorkerTypes
.thread
,
833 ready
: expect
.any(Boolean
),
834 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
835 minSize
: expect
.any(Number
),
836 maxSize
: expect
.any(Number
),
837 workerNodes
: expect
.any(Number
),
838 idleWorkerNodes
: expect
.any(Number
),
839 busyWorkerNodes
: expect
.any(Number
),
840 executedTasks
: expect
.any(Number
),
841 executingTasks
: expect
.any(Number
),
842 queuedTasks
: expect
.any(Number
),
843 maxQueuedTasks
: expect
.any(Number
),
844 failedTasks
: expect
.any(Number
)
849 it('Verify that multiple tasks worker is working', async () => {
850 const pool
= new DynamicClusterPool(
851 Math
.floor(numberOfWorkers
/ 2),
853 './tests/worker-files/cluster/testMultiTasksWorker.js'
855 const data
= { n
: 10 }
856 const result0
= await pool
.execute(data
)
857 expect(result0
).toStrictEqual({ ok
: 1 })
858 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
859 expect(result1
).toStrictEqual({ ok
: 1 })
860 const result2
= await pool
.execute(data
, 'factorial')
861 expect(result2
).toBe(3628800)
862 const result3
= await pool
.execute(data
, 'fibonacci')
863 expect(result3
).toBe(55)