1 const { expect
} = require('expect')
9 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
55 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
56 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 it('Verify that numberOfWorkers is checked', () => {
60 expect(() => new FixedThreadPool()).toThrowError(
61 'Cannot instantiate a pool without specifying the number of workers'
65 it('Verify that a negative number of workers is checked', () => {
68 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 'Cannot instantiate a pool with a negative number of workers'
76 it('Verify that a non integer number of workers is checked', () => {
79 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 'Cannot instantiate a pool with a non safe integer number of workers'
87 it('Verify that dynamic pool sizing is checked', () => {
90 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
98 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
101 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
106 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
109 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
114 it('Verify that pool options are checked', async () => {
115 let pool
= new FixedThreadPool(
117 './tests/worker-files/thread/testWorker.js'
119 expect(pool
.emitter
).toBeDefined()
120 expect(pool
.opts
.enableEvents
).toBe(true)
121 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
122 expect(pool
.opts
.enableTasksQueue
).toBe(false)
123 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
124 expect(pool
.opts
.workerChoiceStrategy
).toBe(
125 WorkerChoiceStrategies
.ROUND_ROBIN
127 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
128 runTime
: { median
: false },
129 waitTime
: { median
: false },
130 elu
: { median
: false }
132 expect(pool
.opts
.messageHandler
).toBeUndefined()
133 expect(pool
.opts
.errorHandler
).toBeUndefined()
134 expect(pool
.opts
.onlineHandler
).toBeUndefined()
135 expect(pool
.opts
.exitHandler
).toBeUndefined()
137 const testHandler
= () => console
.log('test handler executed')
138 pool
= new FixedThreadPool(
140 './tests/worker-files/thread/testWorker.js',
142 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
143 workerChoiceStrategyOptions
: {
144 runTime
: { median
: true },
145 weights
: { 0: 300, 1: 200 }
148 restartWorkerOnError
: false,
149 enableTasksQueue
: true,
150 tasksQueueOptions
: { concurrency
: 2 },
151 messageHandler
: testHandler
,
152 errorHandler
: testHandler
,
153 onlineHandler
: testHandler
,
154 exitHandler
: testHandler
157 expect(pool
.emitter
).toBeUndefined()
158 expect(pool
.opts
.enableEvents
).toBe(false)
159 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
160 expect(pool
.opts
.enableTasksQueue
).toBe(true)
161 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
162 expect(pool
.opts
.workerChoiceStrategy
).toBe(
163 WorkerChoiceStrategies
.LEAST_USED
165 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
166 runTime
: { median
: true },
167 weights
: { 0: 300, 1: 200 }
169 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
170 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
171 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
172 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
176 it('Verify that pool options are validated', async () => {
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: 'invalidStrategy'
186 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
191 './tests/worker-files/thread/testWorker.js',
193 workerChoiceStrategyOptions
: 'invalidOptions'
197 'Invalid worker choice strategy options: must be a plain object'
203 './tests/worker-files/thread/testWorker.js',
205 workerChoiceStrategyOptions
: { weights
: {} }
209 'Invalid worker choice strategy options: must have a weight for each worker node'
215 './tests/worker-files/thread/testWorker.js',
217 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
221 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
227 './tests/worker-files/thread/testWorker.js',
229 enableTasksQueue
: true,
230 tasksQueueOptions
: { concurrency
: 0 }
233 ).toThrowError("Invalid worker tasks concurrency '0'")
238 './tests/worker-files/thread/testWorker.js',
240 enableTasksQueue
: true,
241 tasksQueueOptions
: 'invalidTasksQueueOptions'
244 ).toThrowError('Invalid tasks queue options: must be a plain object')
249 './tests/worker-files/thread/testWorker.js',
251 enableTasksQueue
: true,
252 tasksQueueOptions
: { concurrency
: 0.2 }
255 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
258 it('Verify that pool worker choice strategy options can be set', async () => {
259 const pool
= new FixedThreadPool(
261 './tests/worker-files/thread/testWorker.js',
262 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
264 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
265 runTime
: { median
: false },
266 waitTime
: { median
: false },
267 elu
: { median
: false }
269 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
270 .workerChoiceStrategies
) {
271 expect(workerChoiceStrategy
.opts
).toStrictEqual({
272 runTime
: { median
: false },
273 waitTime
: { median
: false },
274 elu
: { median
: false }
278 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 pool
.setWorkerChoiceStrategyOptions({
297 runTime
: { median
: true },
298 elu
: { median
: true }
300 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
301 runTime
: { median
: true },
302 elu
: { median
: true }
304 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
305 .workerChoiceStrategies
) {
306 expect(workerChoiceStrategy
.opts
).toStrictEqual({
307 runTime
: { median
: true },
308 elu
: { median
: true }
312 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
330 pool
.setWorkerChoiceStrategyOptions({
331 runTime
: { median
: false },
332 elu
: { median
: false }
334 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
335 runTime
: { median
: false },
336 elu
: { median
: false }
338 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
339 .workerChoiceStrategies
) {
340 expect(workerChoiceStrategy
.opts
).toStrictEqual({
341 runTime
: { median
: false },
342 elu
: { median
: false }
346 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
365 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
367 'Invalid worker choice strategy options: must be a plain object'
370 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
372 'Invalid worker choice strategy options: must have a weight for each worker node'
375 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
377 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
382 it('Verify that pool tasks queue can be enabled/disabled', async () => {
383 const pool
= new FixedThreadPool(
385 './tests/worker-files/thread/testWorker.js'
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
389 pool
.enableTasksQueue(true)
390 expect(pool
.opts
.enableTasksQueue
).toBe(true)
391 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
392 pool
.enableTasksQueue(true, { concurrency
: 2 })
393 expect(pool
.opts
.enableTasksQueue
).toBe(true)
394 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
395 pool
.enableTasksQueue(false)
396 expect(pool
.opts
.enableTasksQueue
).toBe(false)
397 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
401 it('Verify that pool tasks queue options can be set', async () => {
402 const pool
= new FixedThreadPool(
404 './tests/worker-files/thread/testWorker.js',
405 { enableTasksQueue
: true }
407 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
408 pool
.setTasksQueueOptions({ concurrency
: 2 })
409 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
411 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
412 ).toThrowError('Invalid tasks queue options: must be a plain object')
413 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
414 "Invalid worker tasks concurrency '0'"
416 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
417 'Invalid worker tasks concurrency: must be an integer'
422 it('Verify that pool info is set', async () => {
423 let pool
= new FixedThreadPool(
425 './tests/worker-files/thread/testWorker.js'
427 expect(pool
.info
).toStrictEqual({
429 type
: PoolTypes
.fixed
,
430 worker
: WorkerTypes
.thread
,
432 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
433 minSize
: numberOfWorkers
,
434 maxSize
: numberOfWorkers
,
435 workerNodes
: numberOfWorkers
,
436 idleWorkerNodes
: numberOfWorkers
,
444 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
445 expect(pool
.info
).toStrictEqual({
447 type
: PoolTypes
.fixed
,
448 worker
: WorkerTypes
.thread
,
450 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
451 minSize
: numberOfWorkers
,
452 maxSize
: numberOfWorkers
,
453 workerNodes
: numberOfWorkers
,
454 idleWorkerNodes
: numberOfWorkers
,
463 pool
= new DynamicClusterPool(
464 Math
.floor(numberOfWorkers
/ 2),
466 './tests/worker-files/cluster/testWorker.js'
468 expect(pool
.info
).toStrictEqual({
470 type
: PoolTypes
.dynamic
,
471 worker
: WorkerTypes
.cluster
,
473 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
474 minSize
: Math
.floor(numberOfWorkers
/ 2),
475 maxSize
: numberOfWorkers
,
476 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
477 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
485 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
486 expect(pool
.info
).toStrictEqual({
488 type
: PoolTypes
.dynamic
,
489 worker
: WorkerTypes
.cluster
,
491 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
492 minSize
: Math
.floor(numberOfWorkers
/ 2),
493 maxSize
: numberOfWorkers
,
494 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
495 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
506 it('Verify that pool worker tasks usage are initialized', async () => {
507 const pool
= new FixedClusterPool(
509 './tests/worker-files/cluster/testWorker.js'
511 for (const workerNode
of pool
.workerNodes
) {
512 expect(workerNode
.usage
).toStrictEqual({
521 history
: expect
.any(CircularArray
)
524 history
: expect
.any(CircularArray
)
528 history
: expect
.any(CircularArray
)
531 history
: expect
.any(CircularArray
)
539 it('Verify that pool worker tasks queue are initialized', async () => {
540 let pool
= new FixedClusterPool(
542 './tests/worker-files/cluster/testWorker.js'
544 for (const workerNode
of pool
.workerNodes
) {
545 expect(workerNode
.tasksQueue
).toBeDefined()
546 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
547 expect(workerNode
.tasksQueue
.size
).toBe(0)
548 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
551 pool
= new DynamicThreadPool(
552 Math
.floor(numberOfWorkers
/ 2),
554 './tests/worker-files/thread/testWorker.js'
556 for (const workerNode
of pool
.workerNodes
) {
557 expect(workerNode
.tasksQueue
).toBeDefined()
558 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
559 expect(workerNode
.tasksQueue
.size
).toBe(0)
560 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
564 it('Verify that pool worker info are initialized', async () => {
565 let pool
= new FixedClusterPool(
567 './tests/worker-files/cluster/testWorker.js'
569 for (const workerNode
of pool
.workerNodes
) {
570 expect(workerNode
.info
).toStrictEqual({
571 id
: expect
.any(Number
),
572 type
: WorkerTypes
.cluster
,
577 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
578 for (const workerNode
of pool
.workerNodes
) {
579 expect(workerNode
.info
).toStrictEqual({
580 id
: expect
.any(Number
),
581 type
: WorkerTypes
.cluster
,
587 pool
= new DynamicThreadPool(
588 Math
.floor(numberOfWorkers
/ 2),
590 './tests/worker-files/thread/testWorker.js'
592 for (const workerNode
of pool
.workerNodes
) {
593 expect(workerNode
.info
).toStrictEqual({
594 id
: expect
.any(Number
),
595 type
: WorkerTypes
.thread
,
600 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
601 for (const workerNode
of pool
.workerNodes
) {
602 expect(workerNode
.info
).toStrictEqual({
603 id
: expect
.any(Number
),
604 type
: WorkerTypes
.thread
,
611 it('Verify that pool worker tasks usage are computed', async () => {
612 const pool
= new FixedClusterPool(
614 './tests/worker-files/cluster/testWorker.js'
616 const promises
= new Set()
617 const maxMultiplier
= 2
618 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
619 promises
.add(pool
.execute())
621 for (const workerNode
of pool
.workerNodes
) {
622 expect(workerNode
.usage
).toStrictEqual({
625 executing
: maxMultiplier
,
631 history
: expect
.any(CircularArray
)
634 history
: expect
.any(CircularArray
)
638 history
: expect
.any(CircularArray
)
641 history
: expect
.any(CircularArray
)
646 await Promise
.all(promises
)
647 for (const workerNode
of pool
.workerNodes
) {
648 expect(workerNode
.usage
).toStrictEqual({
650 executed
: maxMultiplier
,
657 history
: expect
.any(CircularArray
)
660 history
: expect
.any(CircularArray
)
664 history
: expect
.any(CircularArray
)
667 history
: expect
.any(CircularArray
)
675 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
676 const pool
= new DynamicThreadPool(
677 Math
.floor(numberOfWorkers
/ 2),
679 './tests/worker-files/thread/testWorker.js'
681 const promises
= new Set()
682 const maxMultiplier
= 2
683 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
684 promises
.add(pool
.execute())
686 await Promise
.all(promises
)
687 for (const workerNode
of pool
.workerNodes
) {
688 expect(workerNode
.usage
).toStrictEqual({
690 executed
: expect
.any(Number
),
697 history
: expect
.any(CircularArray
)
700 history
: expect
.any(CircularArray
)
704 history
: expect
.any(CircularArray
)
707 history
: expect
.any(CircularArray
)
711 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
712 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
714 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
715 for (const workerNode
of pool
.workerNodes
) {
716 expect(workerNode
.usage
).toStrictEqual({
725 history
: expect
.any(CircularArray
)
728 history
: expect
.any(CircularArray
)
732 history
: expect
.any(CircularArray
)
735 history
: expect
.any(CircularArray
)
739 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
740 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
745 it("Verify that pool event emitter 'full' event can register a callback", async () => {
746 const pool
= new DynamicThreadPool(
747 Math
.floor(numberOfWorkers
/ 2),
749 './tests/worker-files/thread/testWorker.js'
751 const promises
= new Set()
754 pool
.emitter
.on(PoolEvents
.full
, info
=> {
758 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
759 promises
.add(pool
.execute())
761 await Promise
.all(promises
)
762 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
763 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
764 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
765 expect(poolInfo
).toStrictEqual({
767 type
: PoolTypes
.dynamic
,
768 worker
: WorkerTypes
.thread
,
769 ready
: expect
.any(Boolean
),
770 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
771 minSize
: expect
.any(Number
),
772 maxSize
: expect
.any(Number
),
773 workerNodes
: expect
.any(Number
),
774 idleWorkerNodes
: expect
.any(Number
),
775 busyWorkerNodes
: expect
.any(Number
),
776 executedTasks
: expect
.any(Number
),
777 executingTasks
: expect
.any(Number
),
778 queuedTasks
: expect
.any(Number
),
779 maxQueuedTasks
: expect
.any(Number
),
780 failedTasks
: expect
.any(Number
)
785 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
786 const pool
= new DynamicClusterPool(
787 Math
.floor(numberOfWorkers
/ 2),
789 './tests/worker-files/cluster/testWorker.js'
793 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
797 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
798 expect(poolReady
).toBe(1)
799 expect(poolInfo
).toStrictEqual({
801 type
: PoolTypes
.dynamic
,
802 worker
: WorkerTypes
.cluster
,
804 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
805 minSize
: expect
.any(Number
),
806 maxSize
: expect
.any(Number
),
807 workerNodes
: expect
.any(Number
),
808 idleWorkerNodes
: expect
.any(Number
),
809 busyWorkerNodes
: expect
.any(Number
),
810 executedTasks
: expect
.any(Number
),
811 executingTasks
: expect
.any(Number
),
812 queuedTasks
: expect
.any(Number
),
813 maxQueuedTasks
: expect
.any(Number
),
814 failedTasks
: expect
.any(Number
)
819 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
820 const pool
= new FixedThreadPool(
822 './tests/worker-files/thread/testWorker.js'
824 const promises
= new Set()
827 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
831 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
832 promises
.add(pool
.execute())
834 await Promise
.all(promises
)
835 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
836 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
837 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
838 expect(poolInfo
).toStrictEqual({
840 type
: PoolTypes
.fixed
,
841 worker
: WorkerTypes
.thread
,
842 ready
: expect
.any(Boolean
),
843 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
844 minSize
: expect
.any(Number
),
845 maxSize
: expect
.any(Number
),
846 workerNodes
: expect
.any(Number
),
847 idleWorkerNodes
: expect
.any(Number
),
848 busyWorkerNodes
: expect
.any(Number
),
849 executedTasks
: expect
.any(Number
),
850 executingTasks
: expect
.any(Number
),
851 queuedTasks
: expect
.any(Number
),
852 maxQueuedTasks
: expect
.any(Number
),
853 failedTasks
: expect
.any(Number
)
858 it('Verify that multiple tasks worker is working', async () => {
859 const pool
= new DynamicClusterPool(
860 Math
.floor(numberOfWorkers
/ 2),
862 './tests/worker-files/cluster/testMultiTasksWorker.js'
864 const data
= { n
: 10 }
865 const result0
= await pool
.execute(data
)
866 expect(result0
).toStrictEqual({ ok
: 1 })
867 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
868 expect(result1
).toStrictEqual({ ok
: 1 })
869 const result2
= await pool
.execute(data
, 'factorial')
870 expect(result2
).toBe(3628800)
871 const result3
= await pool
.execute(data
, 'fibonacci')
872 expect(result3
).toBe(55)