1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
22 this.promiseResponseMap
.clear()
23 this.handleWorkerReadyMessage
= () => {}
26 class StubPoolWithIsMain
extends FixedThreadPool
{
32 it('Simulate pool creation from a non main thread/process', () => {
35 new StubPoolWithIsMain(
37 './tests/worker-files/thread/testWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 ).toThrowError('Cannot start a pool from a worker!')
45 it('Verify that filePath is checked', () => {
46 const expectedError
= new Error(
47 'Please specify a file with a worker implementation'
49 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
52 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
57 it('Verify that numberOfWorkers is checked', () => {
58 expect(() => new FixedThreadPool()).toThrowError(
59 'Cannot instantiate a pool without specifying the number of workers'
63 it('Verify that a negative number of workers is checked', () => {
66 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
69 'Cannot instantiate a pool with a negative number of workers'
74 it('Verify that a non integer number of workers is checked', () => {
77 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
80 'Cannot instantiate a pool with a non safe integer number of workers'
85 it('Verify dynamic pool sizing', () => {
88 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
91 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
96 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
99 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
104 it('Verify that pool options are checked', async () => {
105 let pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js'
109 expect(pool
.emitter
).toBeDefined()
110 expect(pool
.opts
.enableEvents
).toBe(true)
111 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
112 expect(pool
.opts
.enableTasksQueue
).toBe(false)
113 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
114 expect(pool
.opts
.workerChoiceStrategy
).toBe(
115 WorkerChoiceStrategies
.ROUND_ROBIN
117 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
118 runTime
: { median
: false },
119 waitTime
: { median
: false },
120 elu
: { median
: false }
122 expect(pool
.opts
.messageHandler
).toBeUndefined()
123 expect(pool
.opts
.errorHandler
).toBeUndefined()
124 expect(pool
.opts
.onlineHandler
).toBeUndefined()
125 expect(pool
.opts
.exitHandler
).toBeUndefined()
127 const testHandler
= () => console
.log('test handler executed')
128 pool
= new FixedThreadPool(
130 './tests/worker-files/thread/testWorker.js',
132 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
133 workerChoiceStrategyOptions
: {
134 runTime
: { median
: true },
135 weights
: { 0: 300, 1: 200 }
138 restartWorkerOnError
: false,
139 enableTasksQueue
: true,
140 tasksQueueOptions
: { concurrency
: 2 },
141 messageHandler
: testHandler
,
142 errorHandler
: testHandler
,
143 onlineHandler
: testHandler
,
144 exitHandler
: testHandler
147 expect(pool
.emitter
).toBeUndefined()
148 expect(pool
.opts
.enableEvents
).toBe(false)
149 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
150 expect(pool
.opts
.enableTasksQueue
).toBe(true)
151 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
152 expect(pool
.opts
.workerChoiceStrategy
).toBe(
153 WorkerChoiceStrategies
.LEAST_USED
155 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
156 runTime
: { median
: true },
157 weights
: { 0: 300, 1: 200 }
159 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
160 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
161 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
162 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
166 it('Verify that pool options are validated', async () => {
171 './tests/worker-files/thread/testWorker.js',
173 workerChoiceStrategy
: 'invalidStrategy'
176 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategyOptions
: 'invalidOptions'
187 'Invalid worker choice strategy options: must be a plain object'
193 './tests/worker-files/thread/testWorker.js',
195 workerChoiceStrategyOptions
: { weights
: {} }
199 'Invalid worker choice strategy options: must have a weight for each worker node'
205 './tests/worker-files/thread/testWorker.js',
207 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
211 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
217 './tests/worker-files/thread/testWorker.js',
219 enableTasksQueue
: true,
220 tasksQueueOptions
: { concurrency
: 0 }
223 ).toThrowError("Invalid worker tasks concurrency '0'")
228 './tests/worker-files/thread/testWorker.js',
230 enableTasksQueue
: true,
231 tasksQueueOptions
: 'invalidTasksQueueOptions'
234 ).toThrowError('Invalid tasks queue options: must be a plain object')
239 './tests/worker-files/thread/testWorker.js',
241 enableTasksQueue
: true,
242 tasksQueueOptions
: { concurrency
: 0.2 }
245 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
248 it('Verify that pool worker choice strategy options can be set', async () => {
249 const pool
= new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.js',
252 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
254 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
255 runTime
: { median
: false },
256 waitTime
: { median
: false },
257 elu
: { median
: false }
259 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
260 .workerChoiceStrategies
) {
261 expect(workerChoiceStrategy
.opts
).toStrictEqual({
262 runTime
: { median
: false },
263 waitTime
: { median
: false },
264 elu
: { median
: false }
268 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
286 pool
.setWorkerChoiceStrategyOptions({
287 runTime
: { median
: true },
288 elu
: { median
: true }
290 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
291 runTime
: { median
: true },
292 elu
: { median
: true }
294 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
295 .workerChoiceStrategies
) {
296 expect(workerChoiceStrategy
.opts
).toStrictEqual({
297 runTime
: { median
: true },
298 elu
: { median
: true }
302 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
320 pool
.setWorkerChoiceStrategyOptions({
321 runTime
: { median
: false },
322 elu
: { median
: false }
324 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
325 runTime
: { median
: false },
326 elu
: { median
: false }
328 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
329 .workerChoiceStrategies
) {
330 expect(workerChoiceStrategy
.opts
).toStrictEqual({
331 runTime
: { median
: false },
332 elu
: { median
: false }
336 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
355 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
357 'Invalid worker choice strategy options: must be a plain object'
360 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
362 'Invalid worker choice strategy options: must have a weight for each worker node'
365 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
367 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
372 it('Verify that pool tasks queue can be enabled/disabled', async () => {
373 const pool
= new FixedThreadPool(
375 './tests/worker-files/thread/testWorker.js'
377 expect(pool
.opts
.enableTasksQueue
).toBe(false)
378 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
379 pool
.enableTasksQueue(true)
380 expect(pool
.opts
.enableTasksQueue
).toBe(true)
381 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
382 pool
.enableTasksQueue(true, { concurrency
: 2 })
383 expect(pool
.opts
.enableTasksQueue
).toBe(true)
384 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
385 pool
.enableTasksQueue(false)
386 expect(pool
.opts
.enableTasksQueue
).toBe(false)
387 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
391 it('Verify that pool tasks queue options can be set', async () => {
392 const pool
= new FixedThreadPool(
394 './tests/worker-files/thread/testWorker.js',
395 { enableTasksQueue
: true }
397 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
398 pool
.setTasksQueueOptions({ concurrency
: 2 })
399 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
401 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
402 ).toThrowError('Invalid tasks queue options: must be a plain object')
403 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
404 "Invalid worker tasks concurrency '0'"
406 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
407 'Invalid worker tasks concurrency: must be an integer'
412 it('Verify that pool info is set', async () => {
413 let pool
= new FixedThreadPool(
415 './tests/worker-files/thread/testWorker.js'
417 expect(pool
.info
).toStrictEqual({
419 type
: PoolTypes
.fixed
,
420 worker
: WorkerTypes
.thread
,
422 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
423 minSize
: numberOfWorkers
,
424 maxSize
: numberOfWorkers
,
425 workerNodes
: numberOfWorkers
,
426 idleWorkerNodes
: numberOfWorkers
,
435 pool
= new DynamicClusterPool(
436 Math
.floor(numberOfWorkers
/ 2),
438 './tests/worker-files/cluster/testWorker.js'
440 expect(pool
.info
).toStrictEqual({
442 type
: PoolTypes
.dynamic
,
443 worker
: WorkerTypes
.cluster
,
445 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
446 minSize
: Math
.floor(numberOfWorkers
/ 2),
447 maxSize
: numberOfWorkers
,
448 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
449 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
460 it('Simulate worker not found', async () => {
461 const pool
= new StubPoolWithRemoveAllWorker(
463 './tests/worker-files/thread/testWorker.js',
465 errorHandler
: e
=> console
.error(e
)
468 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
469 // Simulate worker not found.
470 pool
.removeAllWorker()
471 expect(pool
.workerNodes
.length
).toBe(0)
475 it('Verify that pool worker tasks usage are initialized', async () => {
476 const pool
= new FixedClusterPool(
478 './tests/worker-files/cluster/testWorker.js'
480 for (const workerNode
of pool
.workerNodes
) {
481 expect(workerNode
.usage
).toStrictEqual({
490 history
: expect
.any(CircularArray
)
493 history
: expect
.any(CircularArray
)
497 history
: expect
.any(CircularArray
)
500 history
: expect
.any(CircularArray
)
508 it('Verify that pool worker tasks queue are initialized', async () => {
509 let pool
= new FixedClusterPool(
511 './tests/worker-files/cluster/testWorker.js'
513 for (const workerNode
of pool
.workerNodes
) {
514 expect(workerNode
.tasksQueue
).toBeDefined()
515 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
516 expect(workerNode
.tasksQueue
.size
).toBe(0)
517 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
520 pool
= new DynamicThreadPool(
521 Math
.floor(numberOfWorkers
/ 2),
523 './tests/worker-files/thread/testWorker.js'
525 for (const workerNode
of pool
.workerNodes
) {
526 expect(workerNode
.tasksQueue
).toBeDefined()
527 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
528 expect(workerNode
.tasksQueue
.size
).toBe(0)
529 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
533 it('Verify that pool worker info are initialized', async () => {
534 let pool
= new FixedClusterPool(
536 './tests/worker-files/cluster/testWorker.js'
538 for (const workerNode
of pool
.workerNodes
) {
539 expect(workerNode
.info
).toStrictEqual({
540 id
: expect
.any(Number
),
541 type
: WorkerTypes
.cluster
,
547 pool
= new DynamicThreadPool(
548 Math
.floor(numberOfWorkers
/ 2),
550 './tests/worker-files/thread/testWorker.js'
552 for (const workerNode
of pool
.workerNodes
) {
553 expect(workerNode
.info
).toStrictEqual({
554 id
: expect
.any(Number
),
555 type
: WorkerTypes
.thread
,
562 it('Verify that pool worker tasks usage are computed', async () => {
563 const pool
= new FixedClusterPool(
565 './tests/worker-files/cluster/testWorker.js'
567 const promises
= new Set()
568 const maxMultiplier
= 2
569 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
570 promises
.add(pool
.execute())
572 for (const workerNode
of pool
.workerNodes
) {
573 expect(workerNode
.usage
).toStrictEqual({
576 executing
: maxMultiplier
,
582 history
: expect
.any(CircularArray
)
585 history
: expect
.any(CircularArray
)
589 history
: expect
.any(CircularArray
)
592 history
: expect
.any(CircularArray
)
597 await Promise
.all(promises
)
598 for (const workerNode
of pool
.workerNodes
) {
599 expect(workerNode
.usage
).toStrictEqual({
601 executed
: maxMultiplier
,
608 history
: expect
.any(CircularArray
)
611 history
: expect
.any(CircularArray
)
615 history
: expect
.any(CircularArray
)
618 history
: expect
.any(CircularArray
)
626 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
627 const pool
= new DynamicThreadPool(
628 Math
.floor(numberOfWorkers
/ 2),
630 './tests/worker-files/thread/testWorker.js'
632 const promises
= new Set()
633 const maxMultiplier
= 2
634 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
635 promises
.add(pool
.execute())
637 await Promise
.all(promises
)
638 for (const workerNode
of pool
.workerNodes
) {
639 expect(workerNode
.usage
).toStrictEqual({
641 executed
: expect
.any(Number
),
648 history
: expect
.any(CircularArray
)
651 history
: expect
.any(CircularArray
)
655 history
: expect
.any(CircularArray
)
658 history
: expect
.any(CircularArray
)
662 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
663 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
665 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
666 for (const workerNode
of pool
.workerNodes
) {
667 expect(workerNode
.usage
).toStrictEqual({
676 history
: expect
.any(CircularArray
)
679 history
: expect
.any(CircularArray
)
683 history
: expect
.any(CircularArray
)
686 history
: expect
.any(CircularArray
)
690 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
691 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
696 it("Verify that pool event emitter 'full' event can register a callback", async () => {
697 const pool
= new DynamicThreadPool(
698 Math
.floor(numberOfWorkers
/ 2),
700 './tests/worker-files/thread/testWorker.js'
702 const promises
= new Set()
705 pool
.emitter
.on(PoolEvents
.full
, info
=> {
709 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
710 promises
.add(pool
.execute())
712 await Promise
.all(promises
)
713 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
714 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
715 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
716 expect(poolInfo
).toStrictEqual({
718 type
: PoolTypes
.dynamic
,
719 worker
: WorkerTypes
.thread
,
720 ready
: expect
.any(Boolean
),
721 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
722 minSize
: expect
.any(Number
),
723 maxSize
: expect
.any(Number
),
724 workerNodes
: expect
.any(Number
),
725 idleWorkerNodes
: expect
.any(Number
),
726 busyWorkerNodes
: expect
.any(Number
),
727 executedTasks
: expect
.any(Number
),
728 executingTasks
: expect
.any(Number
),
729 queuedTasks
: expect
.any(Number
),
730 maxQueuedTasks
: expect
.any(Number
),
731 failedTasks
: expect
.any(Number
)
736 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
737 const pool
= new FixedClusterPool(
739 './tests/worker-files/cluster/testWorker.js'
743 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
747 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
748 expect(poolReady
).toBe(1)
749 expect(poolInfo
).toStrictEqual({
751 type
: PoolTypes
.fixed
,
752 worker
: WorkerTypes
.cluster
,
754 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
755 minSize
: expect
.any(Number
),
756 maxSize
: expect
.any(Number
),
757 workerNodes
: expect
.any(Number
),
758 idleWorkerNodes
: expect
.any(Number
),
759 busyWorkerNodes
: expect
.any(Number
),
760 executedTasks
: expect
.any(Number
),
761 executingTasks
: expect
.any(Number
),
762 queuedTasks
: expect
.any(Number
),
763 maxQueuedTasks
: expect
.any(Number
),
764 failedTasks
: expect
.any(Number
)
769 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
770 const pool
= new FixedThreadPool(
772 './tests/worker-files/thread/testWorker.js'
774 const promises
= new Set()
777 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
781 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
782 promises
.add(pool
.execute())
784 await Promise
.all(promises
)
785 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
786 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
787 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
788 expect(poolInfo
).toStrictEqual({
790 type
: PoolTypes
.fixed
,
791 worker
: WorkerTypes
.thread
,
792 ready
: expect
.any(Boolean
),
793 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
794 minSize
: expect
.any(Number
),
795 maxSize
: expect
.any(Number
),
796 workerNodes
: expect
.any(Number
),
797 idleWorkerNodes
: expect
.any(Number
),
798 busyWorkerNodes
: expect
.any(Number
),
799 executedTasks
: expect
.any(Number
),
800 executingTasks
: expect
.any(Number
),
801 queuedTasks
: expect
.any(Number
),
802 maxQueuedTasks
: expect
.any(Number
),
803 failedTasks
: expect
.any(Number
)
808 it('Verify that multiple tasks worker is working', async () => {
809 const pool
= new DynamicClusterPool(
810 Math
.floor(numberOfWorkers
/ 2),
812 './tests/worker-files/cluster/testMultiTasksWorker.js'
814 const data
= { n
: 10 }
815 const result0
= await pool
.execute(data
)
816 expect(result0
).toStrictEqual({ ok
: 1 })
817 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
818 expect(result1
).toStrictEqual({ ok
: 1 })
819 const result2
= await pool
.execute(data
, 'factorial')
820 expect(result2
).toBe(3628800)
821 const result3
= await pool
.execute(data
, 'fibonacci')
822 expect(result3
).toBe(55)