1 const { expect
} = require('expect')
9 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
55 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
56 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 it('Verify that numberOfWorkers is checked', () => {
60 expect(() => new FixedThreadPool()).toThrowError(
61 'Cannot instantiate a pool without specifying the number of workers'
65 it('Verify that a negative number of workers is checked', () => {
68 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 'Cannot instantiate a pool with a negative number of workers'
76 it('Verify that a non integer number of workers is checked', () => {
79 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 'Cannot instantiate a pool with a non safe integer number of workers'
87 it('Verify that dynamic pool sizing is checked', () => {
90 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
98 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
101 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
106 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
109 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
114 it('Verify that pool options are checked', async () => {
115 let pool
= new FixedThreadPool(
117 './tests/worker-files/thread/testWorker.js'
119 expect(pool
.emitter
).toBeDefined()
120 expect(pool
.opts
.enableEvents
).toBe(true)
121 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
122 expect(pool
.opts
.enableTasksQueue
).toBe(false)
123 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
124 expect(pool
.opts
.workerChoiceStrategy
).toBe(
125 WorkerChoiceStrategies
.ROUND_ROBIN
127 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
128 runTime
: { median
: false },
129 waitTime
: { median
: false },
130 elu
: { median
: false }
132 expect(pool
.opts
.messageHandler
).toBeUndefined()
133 expect(pool
.opts
.errorHandler
).toBeUndefined()
134 expect(pool
.opts
.onlineHandler
).toBeUndefined()
135 expect(pool
.opts
.exitHandler
).toBeUndefined()
137 const testHandler
= () => console
.log('test handler executed')
138 pool
= new FixedThreadPool(
140 './tests/worker-files/thread/testWorker.js',
142 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
143 workerChoiceStrategyOptions
: {
144 runTime
: { median
: true },
145 weights
: { 0: 300, 1: 200 }
148 restartWorkerOnError
: false,
149 enableTasksQueue
: true,
150 tasksQueueOptions
: { concurrency
: 2 },
151 messageHandler
: testHandler
,
152 errorHandler
: testHandler
,
153 onlineHandler
: testHandler
,
154 exitHandler
: testHandler
157 expect(pool
.emitter
).toBeUndefined()
158 expect(pool
.opts
.enableEvents
).toBe(false)
159 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
160 expect(pool
.opts
.enableTasksQueue
).toBe(true)
161 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
162 expect(pool
.opts
.workerChoiceStrategy
).toBe(
163 WorkerChoiceStrategies
.LEAST_USED
165 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
166 runTime
: { median
: true },
167 weights
: { 0: 300, 1: 200 }
169 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
170 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
171 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
172 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
176 it('Verify that pool options are validated', async () => {
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: 'invalidStrategy'
186 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
191 './tests/worker-files/thread/testWorker.js',
193 workerChoiceStrategyOptions
: 'invalidOptions'
197 'Invalid worker choice strategy options: must be a plain object'
203 './tests/worker-files/thread/testWorker.js',
205 workerChoiceStrategyOptions
: { weights
: {} }
209 'Invalid worker choice strategy options: must have a weight for each worker node'
215 './tests/worker-files/thread/testWorker.js',
217 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
221 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
227 './tests/worker-files/thread/testWorker.js',
229 enableTasksQueue
: true,
230 tasksQueueOptions
: { concurrency
: 0 }
233 ).toThrowError("Invalid worker tasks concurrency '0'")
238 './tests/worker-files/thread/testWorker.js',
240 enableTasksQueue
: true,
241 tasksQueueOptions
: 'invalidTasksQueueOptions'
244 ).toThrowError('Invalid tasks queue options: must be a plain object')
249 './tests/worker-files/thread/testWorker.js',
251 enableTasksQueue
: true,
252 tasksQueueOptions
: { concurrency
: 0.2 }
255 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
258 it('Verify that pool worker choice strategy options can be set', async () => {
259 const pool
= new FixedThreadPool(
261 './tests/worker-files/thread/testWorker.js',
262 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
264 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
265 runTime
: { median
: false },
266 waitTime
: { median
: false },
267 elu
: { median
: false }
269 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
270 .workerChoiceStrategies
) {
271 expect(workerChoiceStrategy
.opts
).toStrictEqual({
272 runTime
: { median
: false },
273 waitTime
: { median
: false },
274 elu
: { median
: false }
278 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 pool
.setWorkerChoiceStrategyOptions({
297 runTime
: { median
: true },
298 elu
: { median
: true }
300 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
301 runTime
: { median
: true },
302 elu
: { median
: true }
304 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
305 .workerChoiceStrategies
) {
306 expect(workerChoiceStrategy
.opts
).toStrictEqual({
307 runTime
: { median
: true },
308 elu
: { median
: true }
312 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
330 pool
.setWorkerChoiceStrategyOptions({
331 runTime
: { median
: false },
332 elu
: { median
: false }
334 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
335 runTime
: { median
: false },
336 elu
: { median
: false }
338 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
339 .workerChoiceStrategies
) {
340 expect(workerChoiceStrategy
.opts
).toStrictEqual({
341 runTime
: { median
: false },
342 elu
: { median
: false }
346 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
365 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
367 'Invalid worker choice strategy options: must be a plain object'
370 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
372 'Invalid worker choice strategy options: must have a weight for each worker node'
375 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
377 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
382 it('Verify that pool tasks queue can be enabled/disabled', async () => {
383 const pool
= new FixedThreadPool(
385 './tests/worker-files/thread/testWorker.js'
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
389 pool
.enableTasksQueue(true)
390 expect(pool
.opts
.enableTasksQueue
).toBe(true)
391 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
392 pool
.enableTasksQueue(true, { concurrency
: 2 })
393 expect(pool
.opts
.enableTasksQueue
).toBe(true)
394 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
395 pool
.enableTasksQueue(false)
396 expect(pool
.opts
.enableTasksQueue
).toBe(false)
397 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
401 it('Verify that pool tasks queue options can be set', async () => {
402 const pool
= new FixedThreadPool(
404 './tests/worker-files/thread/testWorker.js',
405 { enableTasksQueue
: true }
407 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
408 pool
.setTasksQueueOptions({ concurrency
: 2 })
409 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
411 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
412 ).toThrowError('Invalid tasks queue options: must be a plain object')
413 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
414 "Invalid worker tasks concurrency '0'"
416 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
417 'Invalid worker tasks concurrency: must be an integer'
422 it('Verify that pool info is set', async () => {
423 let pool
= new FixedThreadPool(
425 './tests/worker-files/thread/testWorker.js'
427 expect(pool
.info
).toStrictEqual({
429 type
: PoolTypes
.fixed
,
430 worker
: WorkerTypes
.thread
,
432 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
433 minSize
: numberOfWorkers
,
434 maxSize
: numberOfWorkers
,
435 workerNodes
: numberOfWorkers
,
436 idleWorkerNodes
: numberOfWorkers
,
445 pool
= new DynamicClusterPool(
446 Math
.floor(numberOfWorkers
/ 2),
448 './tests/worker-files/cluster/testWorker.js'
450 expect(pool
.info
).toStrictEqual({
452 type
: PoolTypes
.dynamic
,
453 worker
: WorkerTypes
.cluster
,
455 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
456 minSize
: Math
.floor(numberOfWorkers
/ 2),
457 maxSize
: numberOfWorkers
,
458 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
459 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
470 it('Verify that pool worker tasks usage are initialized', async () => {
471 const pool
= new FixedClusterPool(
473 './tests/worker-files/cluster/testWorker.js'
475 for (const workerNode
of pool
.workerNodes
) {
476 expect(workerNode
.usage
).toStrictEqual({
485 history
: expect
.any(CircularArray
)
488 history
: expect
.any(CircularArray
)
492 history
: expect
.any(CircularArray
)
495 history
: expect
.any(CircularArray
)
503 it('Verify that pool worker tasks queue are initialized', async () => {
504 let pool
= new FixedClusterPool(
506 './tests/worker-files/cluster/testWorker.js'
508 for (const workerNode
of pool
.workerNodes
) {
509 expect(workerNode
.tasksQueue
).toBeDefined()
510 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
511 expect(workerNode
.tasksQueue
.size
).toBe(0)
512 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
515 pool
= new DynamicThreadPool(
516 Math
.floor(numberOfWorkers
/ 2),
518 './tests/worker-files/thread/testWorker.js'
520 for (const workerNode
of pool
.workerNodes
) {
521 expect(workerNode
.tasksQueue
).toBeDefined()
522 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
523 expect(workerNode
.tasksQueue
.size
).toBe(0)
524 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
528 it('Verify that pool worker info are initialized', async () => {
529 let pool
= new FixedClusterPool(
531 './tests/worker-files/cluster/testWorker.js'
533 for (const workerNode
of pool
.workerNodes
) {
534 expect(workerNode
.info
).toStrictEqual({
535 id
: expect
.any(Number
),
536 type
: WorkerTypes
.cluster
,
542 pool
= new DynamicThreadPool(
543 Math
.floor(numberOfWorkers
/ 2),
545 './tests/worker-files/thread/testWorker.js'
547 for (const workerNode
of pool
.workerNodes
) {
548 expect(workerNode
.info
).toStrictEqual({
549 id
: expect
.any(Number
),
550 type
: WorkerTypes
.thread
,
557 it('Verify that pool worker tasks usage are computed', async () => {
558 const pool
= new FixedClusterPool(
560 './tests/worker-files/cluster/testWorker.js'
562 const promises
= new Set()
563 const maxMultiplier
= 2
564 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
565 promises
.add(pool
.execute())
567 for (const workerNode
of pool
.workerNodes
) {
568 expect(workerNode
.usage
).toStrictEqual({
571 executing
: maxMultiplier
,
577 history
: expect
.any(CircularArray
)
580 history
: expect
.any(CircularArray
)
584 history
: expect
.any(CircularArray
)
587 history
: expect
.any(CircularArray
)
592 await Promise
.all(promises
)
593 for (const workerNode
of pool
.workerNodes
) {
594 expect(workerNode
.usage
).toStrictEqual({
596 executed
: maxMultiplier
,
603 history
: expect
.any(CircularArray
)
606 history
: expect
.any(CircularArray
)
610 history
: expect
.any(CircularArray
)
613 history
: expect
.any(CircularArray
)
621 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
622 const pool
= new DynamicThreadPool(
623 Math
.floor(numberOfWorkers
/ 2),
625 './tests/worker-files/thread/testWorker.js'
627 const promises
= new Set()
628 const maxMultiplier
= 2
629 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
630 promises
.add(pool
.execute())
632 await Promise
.all(promises
)
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.usage
).toStrictEqual({
636 executed
: expect
.any(Number
),
643 history
: expect
.any(CircularArray
)
646 history
: expect
.any(CircularArray
)
650 history
: expect
.any(CircularArray
)
653 history
: expect
.any(CircularArray
)
657 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
658 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
660 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
661 for (const workerNode
of pool
.workerNodes
) {
662 expect(workerNode
.usage
).toStrictEqual({
671 history
: expect
.any(CircularArray
)
674 history
: expect
.any(CircularArray
)
678 history
: expect
.any(CircularArray
)
681 history
: expect
.any(CircularArray
)
685 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
686 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
691 it("Verify that pool event emitter 'full' event can register a callback", async () => {
692 const pool
= new DynamicThreadPool(
693 Math
.floor(numberOfWorkers
/ 2),
695 './tests/worker-files/thread/testWorker.js'
697 const promises
= new Set()
700 pool
.emitter
.on(PoolEvents
.full
, info
=> {
704 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
705 promises
.add(pool
.execute())
707 await Promise
.all(promises
)
708 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
709 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
710 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
711 expect(poolInfo
).toStrictEqual({
713 type
: PoolTypes
.dynamic
,
714 worker
: WorkerTypes
.thread
,
715 ready
: expect
.any(Boolean
),
716 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
717 minSize
: expect
.any(Number
),
718 maxSize
: expect
.any(Number
),
719 workerNodes
: expect
.any(Number
),
720 idleWorkerNodes
: expect
.any(Number
),
721 busyWorkerNodes
: expect
.any(Number
),
722 executedTasks
: expect
.any(Number
),
723 executingTasks
: expect
.any(Number
),
724 queuedTasks
: expect
.any(Number
),
725 maxQueuedTasks
: expect
.any(Number
),
726 failedTasks
: expect
.any(Number
)
731 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
732 const pool
= new DynamicClusterPool(
733 Math
.floor(numberOfWorkers
/ 2),
735 './tests/worker-files/cluster/testWorker.js'
739 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
743 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
744 expect(poolReady
).toBe(1)
745 expect(poolInfo
).toStrictEqual({
747 type
: PoolTypes
.dynamic
,
748 worker
: WorkerTypes
.cluster
,
750 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
751 minSize
: expect
.any(Number
),
752 maxSize
: expect
.any(Number
),
753 workerNodes
: expect
.any(Number
),
754 idleWorkerNodes
: expect
.any(Number
),
755 busyWorkerNodes
: expect
.any(Number
),
756 executedTasks
: expect
.any(Number
),
757 executingTasks
: expect
.any(Number
),
758 queuedTasks
: expect
.any(Number
),
759 maxQueuedTasks
: expect
.any(Number
),
760 failedTasks
: expect
.any(Number
)
765 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
766 const pool
= new FixedThreadPool(
768 './tests/worker-files/thread/testWorker.js'
770 const promises
= new Set()
773 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
777 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
778 promises
.add(pool
.execute())
780 await Promise
.all(promises
)
781 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
782 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
783 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
784 expect(poolInfo
).toStrictEqual({
786 type
: PoolTypes
.fixed
,
787 worker
: WorkerTypes
.thread
,
788 ready
: expect
.any(Boolean
),
789 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
790 minSize
: expect
.any(Number
),
791 maxSize
: expect
.any(Number
),
792 workerNodes
: expect
.any(Number
),
793 idleWorkerNodes
: expect
.any(Number
),
794 busyWorkerNodes
: expect
.any(Number
),
795 executedTasks
: expect
.any(Number
),
796 executingTasks
: expect
.any(Number
),
797 queuedTasks
: expect
.any(Number
),
798 maxQueuedTasks
: expect
.any(Number
),
799 failedTasks
: expect
.any(Number
)
804 it('Verify that multiple tasks worker is working', async () => {
805 const pool
= new DynamicClusterPool(
806 Math
.floor(numberOfWorkers
/ 2),
808 './tests/worker-files/cluster/testMultiTasksWorker.js'
810 const data
= { n
: 10 }
811 const result0
= await pool
.execute(data
)
812 expect(result0
).toStrictEqual({ ok
: 1 })
813 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
814 expect(result1
).toStrictEqual({ ok
: 1 })
815 const result2
= await pool
.execute(data
, 'factorial')
816 expect(result2
).toBe(3628800)
817 const result3
= await pool
.execute(data
, 'fibonacci')
818 expect(result3
).toBe(55)