1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
// Builds a StubPoolWithIsMain against the thread test worker file and expects
// the attempt to throw 'Cannot start a pool from a worker!'.
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
// Constructing a FixedThreadPool without a file path, or with an empty string
// as file path, must throw. The expected failure is described by expectedError
// ('Please specify a file with a worker implementation').
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
// Constructing a FixedThreadPool with no arguments at all must throw the
// "number of workers" error message below.
50 it('Verify that numberOfWorkers is checked', () => {
51 expect(() => new FixedThreadPool()).toThrowError(
52 'Cannot instantiate a pool without specifying the number of workers'
// Exercises FixedClusterPool construction with -1 workers; the string literal
// below is the error message this case is about.
56 it('Verify that a negative number of workers is checked', () => {
59 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
62 'Cannot instantiate a pool with a negative number of workers'
// Exercises FixedThreadPool construction with a fractional worker count (0.25);
// the string literal below is the error message this case is about.
67 it('Verify that a non integer number of workers is checked', () => {
70 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
73 'Cannot instantiate a pool with a non safe integer number of workers'
// Exercises three invalid DynamicThreadPool size pairs, each followed by its
// error message:
//   (2, 1) - first size greater than the second
//   (1, 1) - both sizes equal
//   (0, 0) - both sizes zero
78 it('Verify that dynamic pool sizing is checked', () => {
81 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
84 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
89 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
92 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
97 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
100 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
// Two-phase check of pool.opts on a FixedThreadPool.
// Phase 1: a pool built from the thread test worker file is expected to expose
//   an emitter, enableEvents/restartWorkerOnError === true,
//   enableTasksQueue === false, no tasksQueueOptions, the ROUND_ROBIN strategy,
//   median-disabled runTime/waitTime/elu strategy options, and no handlers.
// Phase 2: `pool` is reassigned to a pool built with explicit options
//   (LEAST_USED strategy, custom strategy options, tasks queue with
//   concurrency 2, testHandler for every handler) and the same fields are
//   asserted to reflect those options, with no emitter and events disabled.
105 it('Verify that pool options are checked', async () => {
106 let pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js'
110 expect(pool
.emitter
).toBeDefined()
111 expect(pool
.opts
.enableEvents
).toBe(true)
112 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
113 expect(pool
.opts
.enableTasksQueue
).toBe(false)
114 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
115 expect(pool
.opts
.workerChoiceStrategy
).toBe(
116 WorkerChoiceStrategies
.ROUND_ROBIN
118 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
119 runTime
: { median
: false },
120 waitTime
: { median
: false },
121 elu
: { median
: false }
123 expect(pool
.opts
.messageHandler
).toBeUndefined()
124 expect(pool
.opts
.errorHandler
).toBeUndefined()
125 expect(pool
.opts
.onlineHandler
).toBeUndefined()
126 expect(pool
.opts
.exitHandler
).toBeUndefined()
// Single shared handler so the later toStrictEqual checks can compare by identity.
128 const testHandler
= () => console
.log('test handler executed')
129 pool
= new FixedThreadPool(
131 './tests/worker-files/thread/testWorker.js',
133 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
134 workerChoiceStrategyOptions
: {
135 runTime
: { median
: true },
136 weights
: { 0: 300, 1: 200 }
139 restartWorkerOnError
: false,
140 enableTasksQueue
: true,
141 tasksQueueOptions
: { concurrency
: 2 },
142 messageHandler
: testHandler
,
143 errorHandler
: testHandler
,
144 onlineHandler
: testHandler
,
145 exitHandler
: testHandler
148 expect(pool
.emitter
).toBeUndefined()
149 expect(pool
.opts
.enableEvents
).toBe(false)
150 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
151 expect(pool
.opts
.enableTasksQueue
).toBe(true)
152 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
153 expect(pool
.opts
.workerChoiceStrategy
).toBe(
154 WorkerChoiceStrategies
.LEAST_USED
156 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
157 runTime
: { median
: true },
158 weights
: { 0: 300, 1: 200 }
160 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
161 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
162 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
163 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
// Feeds a series of invalid option objects to a pool built on the thread test
// worker file; each case is paired with the error message it is about:
//   - unknown workerChoiceStrategy name
//   - workerChoiceStrategyOptions that is not a plain object
//   - workerChoiceStrategyOptions.weights that is empty
//   - workerChoiceStrategyOptions.measurement that is unknown
//   - tasksQueueOptions.concurrency of 0
//   - tasksQueueOptions that is not a plain object
//   - tasksQueueOptions.concurrency that is not an integer (0.2)
167 it('Verify that pool options are validated', async () => {
172 './tests/worker-files/thread/testWorker.js',
174 workerChoiceStrategy
: 'invalidStrategy'
177 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
182 './tests/worker-files/thread/testWorker.js',
184 workerChoiceStrategyOptions
: 'invalidOptions'
188 'Invalid worker choice strategy options: must be a plain object'
194 './tests/worker-files/thread/testWorker.js',
196 workerChoiceStrategyOptions
: { weights
: {} }
200 'Invalid worker choice strategy options: must have a weight for each worker node'
206 './tests/worker-files/thread/testWorker.js',
208 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
212 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
218 './tests/worker-files/thread/testWorker.js',
220 enableTasksQueue
: true,
221 tasksQueueOptions
: { concurrency
: 0 }
224 ).toThrowError("Invalid worker tasks concurrency '0'")
229 './tests/worker-files/thread/testWorker.js',
231 enableTasksQueue
: true,
232 tasksQueueOptions
: 'invalidTasksQueueOptions'
235 ).toThrowError('Invalid tasks queue options: must be a plain object')
240 './tests/worker-files/thread/testWorker.js',
242 enableTasksQueue
: true,
243 tasksQueueOptions
: { concurrency
: 0.2 }
246 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
// Checks setWorkerChoiceStrategyOptions() on a FAIR_SHARE FixedThreadPool.
// For the initial state and after each setter call (median true, then median
// false) it asserts that pool.opts.workerChoiceStrategyOptions and the `opts`
// of every entry in workerChoiceStrategyContext.workerChoiceStrategies carry
// the same values, and reads getTaskStatisticsRequirements() from the context.
// The tail of the test passes invalid arguments to the setter (a string, empty
// weights, an unknown measurement), each paired with its error message.
249 it('Verify that pool worker choice strategy options can be set', async () => {
250 const pool
= new FixedThreadPool(
252 './tests/worker-files/thread/testWorker.js',
253 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
255 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
256 runTime
: { median
: false },
257 waitTime
: { median
: false },
258 elu
: { median
: false }
260 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
261 .workerChoiceStrategies
) {
262 expect(workerChoiceStrategy
.opts
).toStrictEqual({
263 runTime
: { median
: false },
264 waitTime
: { median
: false },
265 elu
: { median
: false }
269 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Switch runTime and elu to median.
287 pool
.setWorkerChoiceStrategyOptions({
288 runTime
: { median
: true },
289 elu
: { median
: true }
291 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
292 runTime
: { median
: true },
293 elu
: { median
: true }
295 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
296 .workerChoiceStrategies
) {
297 expect(workerChoiceStrategy
.opts
).toStrictEqual({
298 runTime
: { median
: true },
299 elu
: { median
: true }
303 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Switch runTime and elu back to non-median.
321 pool
.setWorkerChoiceStrategyOptions({
322 runTime
: { median
: false },
323 elu
: { median
: false }
325 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
326 runTime
: { median
: false },
327 elu
: { median
: false }
329 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
330 .workerChoiceStrategies
) {
331 expect(workerChoiceStrategy
.opts
).toStrictEqual({
332 runTime
: { median
: false },
333 elu
: { median
: false }
337 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Invalid arguments to the setter.
356 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
358 'Invalid worker choice strategy options: must be a plain object'
361 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
363 'Invalid worker choice strategy options: must have a weight for each worker node'
366 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
368 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Walks pool.enableTasksQueue() through its states on a FixedThreadPool and
// asserts pool.opts after each step:
//   initial                        -> enableTasksQueue false, no tasksQueueOptions
//   enableTasksQueue(true)         -> true, tasksQueueOptions { concurrency: 1 }
//   enableTasksQueue(true, {c: 2}) -> true, tasksQueueOptions { concurrency: 2 }
//   enableTasksQueue(false)        -> false, no tasksQueueOptions
373 it('Verify that pool tasks queue can be enabled/disabled', async () => {
374 const pool
= new FixedThreadPool(
376 './tests/worker-files/thread/testWorker.js'
378 expect(pool
.opts
.enableTasksQueue
).toBe(false)
379 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
380 pool
.enableTasksQueue(true)
381 expect(pool
.opts
.enableTasksQueue
).toBe(true)
382 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
383 pool
.enableTasksQueue(true, { concurrency
: 2 })
384 expect(pool
.opts
.enableTasksQueue
).toBe(true)
385 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
386 pool
.enableTasksQueue(false)
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
// Checks pool.setTasksQueueOptions() on a FixedThreadPool created with
// enableTasksQueue: true. tasksQueueOptions starts at { concurrency: 1 } and
// becomes { concurrency: 2 } after the setter call. The setter is then given a
// non-object, a concurrency of 0 and a concurrency of 0.2, each expected to
// throw the message shown.
392 it('Verify that pool tasks queue options can be set', async () => {
393 const pool
= new FixedThreadPool(
395 './tests/worker-files/thread/testWorker.js',
396 { enableTasksQueue
: true }
398 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
399 pool
.setTasksQueueOptions({ concurrency
: 2 })
400 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
402 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
403 ).toThrowError('Invalid tasks queue options: must be a plain object')
404 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
405 "Invalid worker tasks concurrency '0'"
407 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
408 'Invalid worker tasks concurrency: must be an integer'
// Asserts the shape of pool.info for two pool flavours.
// FixedThreadPool: type fixed, worker thread, ROUND_ROBIN strategy, and
//   minSize/maxSize/workerNodes/idleWorkerNodes all equal to numberOfWorkers.
// DynamicClusterPool: type dynamic, worker cluster, ROUND_ROBIN strategy,
//   minSize/workerNodes/idleWorkerNodes equal to floor(numberOfWorkers / 2)
//   and maxSize equal to numberOfWorkers.
413 it('Verify that pool info is set', async () => {
414 let pool
= new FixedThreadPool(
416 './tests/worker-files/thread/testWorker.js'
418 expect(pool
.info
).toStrictEqual({
420 type
: PoolTypes
.fixed
,
421 worker
: WorkerTypes
.thread
,
423 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
424 minSize
: numberOfWorkers
,
425 maxSize
: numberOfWorkers
,
426 workerNodes
: numberOfWorkers
,
427 idleWorkerNodes
: numberOfWorkers
,
436 pool
= new DynamicClusterPool(
437 Math
.floor(numberOfWorkers
/ 2),
439 './tests/worker-files/cluster/testWorker.js'
441 expect(pool
.info
).toStrictEqual({
443 type
: PoolTypes
.dynamic
,
444 worker
: WorkerTypes
.cluster
,
446 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
447 minSize
: Math
.floor(numberOfWorkers
/ 2),
448 maxSize
: numberOfWorkers
,
449 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
450 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
// For every worker node of a freshly built FixedClusterPool, asserts the
// initial `usage` object; its four `history` fields are only required to be
// CircularArray instances.
461 it('Verify that pool worker tasks usage are initialized', async () => {
462 const pool
= new FixedClusterPool(
464 './tests/worker-files/cluster/testWorker.js'
466 for (const workerNode
of pool
.workerNodes
) {
467 expect(workerNode
.usage
).toStrictEqual({
476 history
: expect
.any(CircularArray
)
479 history
: expect
.any(CircularArray
)
483 history
: expect
.any(CircularArray
)
486 history
: expect
.any(CircularArray
)
// For every worker node of a FixedClusterPool, and then of a DynamicThreadPool,
// asserts that tasksQueue exists, is a Queue instance, and reports
// size === 0 and maxSize === 0.
494 it('Verify that pool worker tasks queue are initialized', async () => {
495 let pool
= new FixedClusterPool(
497 './tests/worker-files/cluster/testWorker.js'
499 for (const workerNode
of pool
.workerNodes
) {
500 expect(workerNode
.tasksQueue
).toBeDefined()
501 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
502 expect(workerNode
.tasksQueue
.size
).toBe(0)
503 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
506 pool
= new DynamicThreadPool(
507 Math
.floor(numberOfWorkers
/ 2),
509 './tests/worker-files/thread/testWorker.js'
511 for (const workerNode
of pool
.workerNodes
) {
512 expect(workerNode
.tasksQueue
).toBeDefined()
513 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
514 expect(workerNode
.tasksQueue
.size
).toBe(0)
515 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
// For every worker node of a FixedClusterPool, and then of a DynamicThreadPool,
// asserts that `info` carries a numeric id and the matching worker type
// (cluster for the first pool, thread for the second).
519 it('Verify that pool worker info are initialized', async () => {
520 let pool
= new FixedClusterPool(
522 './tests/worker-files/cluster/testWorker.js'
524 for (const workerNode
of pool
.workerNodes
) {
525 expect(workerNode
.info
).toStrictEqual({
526 id
: expect
.any(Number
),
527 type
: WorkerTypes
.cluster
,
533 pool
= new DynamicThreadPool(
534 Math
.floor(numberOfWorkers
/ 2),
536 './tests/worker-files/thread/testWorker.js'
538 for (const workerNode
of pool
.workerNodes
) {
539 expect(workerNode
.info
).toStrictEqual({
540 id
: expect
.any(Number
),
541 type
: WorkerTypes
.thread
,
// Submits numberOfWorkers * maxMultiplier tasks to a FixedClusterPool without
// awaiting them, then inspects each worker node's `usage` twice:
//   - before the promises settle: `executing` equals maxMultiplier
//   - after Promise.all resolves: `executed` equals maxMultiplier
// The `history` fields are only required to be CircularArray instances.
548 it('Verify that pool worker tasks usage are computed', async () => {
549 const pool
= new FixedClusterPool(
551 './tests/worker-files/cluster/testWorker.js'
553 const promises
= new Set()
554 const maxMultiplier
= 2
555 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
556 promises
.add(pool
.execute())
558 for (const workerNode
of pool
.workerNodes
) {
559 expect(workerNode
.usage
).toStrictEqual({
562 executing
: maxMultiplier
,
568 history
: expect
.any(CircularArray
)
571 history
: expect
.any(CircularArray
)
575 history
: expect
.any(CircularArray
)
578 history
: expect
.any(CircularArray
)
583 await Promise
.all(promises
)
584 for (const workerNode
of pool
.workerNodes
) {
585 expect(workerNode
.usage
).toStrictEqual({
587 executed
: maxMultiplier
,
594 history
: expect
.any(CircularArray
)
597 history
: expect
.any(CircularArray
)
601 history
: expect
.any(CircularArray
)
604 history
: expect
.any(CircularArray
)
// Runs numberOfWorkers * maxMultiplier tasks on a DynamicThreadPool and waits
// for them. Each worker node's executed-task count must then be a number in
// the range (0, maxMultiplier]. After setWorkerChoiceStrategy(FAIR_SHARE) the
// usage of every worker node is asserted again, and the runTime and waitTime
// histories must be empty.
612 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
613 const pool
= new DynamicThreadPool(
614 Math
.floor(numberOfWorkers
/ 2),
616 './tests/worker-files/thread/testWorker.js'
618 const promises
= new Set()
619 const maxMultiplier
= 2
620 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
621 promises
.add(pool
.execute())
623 await Promise
.all(promises
)
624 for (const workerNode
of pool
.workerNodes
) {
625 expect(workerNode
.usage
).toStrictEqual({
627 executed
: expect
.any(Number
),
634 history
: expect
.any(CircularArray
)
637 history
: expect
.any(CircularArray
)
641 history
: expect
.any(CircularArray
)
644 history
: expect
.any(CircularArray
)
648 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
649 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
// Changing the strategy is the action under test.
651 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
652 for (const workerNode
of pool
.workerNodes
) {
653 expect(workerNode
.usage
).toStrictEqual({
662 history
: expect
.any(CircularArray
)
665 history
: expect
.any(CircularArray
)
669 history
: expect
.any(CircularArray
)
672 history
: expect
.any(CircularArray
)
676 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
677 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
// Registers a listener for PoolEvents.full on a DynamicThreadPool, submits
// numberOfWorkers * 2 tasks, waits for them, and then checks how many times
// the listener fired (poolFull) and the shape of the info object it received
// (poolInfo).
682 it("Verify that pool event emitter 'full' event can register a callback", async () => {
683 const pool
= new DynamicThreadPool(
684 Math
.floor(numberOfWorkers
/ 2),
686 './tests/worker-files/thread/testWorker.js'
688 const promises
= new Set()
691 pool
.emitter
.on(PoolEvents
.full
, info
=> {
695 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
696 promises
.add(pool
.execute())
698 await Promise
.all(promises
)
699 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
700 // So it fires numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
701 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
702 expect(poolInfo
).toStrictEqual({
704 type
: PoolTypes
.dynamic
,
705 worker
: WorkerTypes
.thread
,
706 ready
: expect
.any(Boolean
),
707 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
708 minSize
: expect
.any(Number
),
709 maxSize
: expect
.any(Number
),
710 workerNodes
: expect
.any(Number
),
711 idleWorkerNodes
: expect
.any(Number
),
712 busyWorkerNodes
: expect
.any(Number
),
713 executedTasks
: expect
.any(Number
),
714 executingTasks
: expect
.any(Number
),
715 queuedTasks
: expect
.any(Number
),
716 maxQueuedTasks
: expect
.any(Number
),
717 failedTasks
: expect
.any(Number
)
// Registers a listener for PoolEvents.ready on a DynamicClusterPool, waits via
// waitPoolEvents() for one `ready` event, and then expects the listener to
// have fired exactly once (poolReady) with an info object of the shape below
// (poolInfo).
722 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
723 const pool
= new DynamicClusterPool(
724 Math
.floor(numberOfWorkers
/ 2),
726 './tests/worker-files/cluster/testWorker.js'
730 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
734 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
735 expect(poolReady
).toBe(1)
736 expect(poolInfo
).toStrictEqual({
738 type
: PoolTypes
.dynamic
,
739 worker
: WorkerTypes
.cluster
,
741 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
742 minSize
: expect
.any(Number
),
743 maxSize
: expect
.any(Number
),
744 workerNodes
: expect
.any(Number
),
745 idleWorkerNodes
: expect
.any(Number
),
746 busyWorkerNodes
: expect
.any(Number
),
747 executedTasks
: expect
.any(Number
),
748 executingTasks
: expect
.any(Number
),
749 queuedTasks
: expect
.any(Number
),
750 maxQueuedTasks
: expect
.any(Number
),
751 failedTasks
: expect
.any(Number
)
// Registers a listener for PoolEvents.busy on a FixedThreadPool, submits
// numberOfWorkers * 2 tasks, waits for them, and then checks how many times
// the listener fired (poolBusy) and the shape of the info object it received
// (poolInfo).
756 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
757 const pool
= new FixedThreadPool(
759 './tests/worker-files/thread/testWorker.js'
761 const promises
= new Set()
764 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
768 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
769 promises
.add(pool
.execute())
771 await Promise
.all(promises
)
772 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
773 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
774 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
775 expect(poolInfo
).toStrictEqual({
777 type
: PoolTypes
.fixed
,
778 worker
: WorkerTypes
.thread
,
779 ready
: expect
.any(Boolean
),
780 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
781 minSize
: expect
.any(Number
),
782 maxSize
: expect
.any(Number
),
783 workerNodes
: expect
.any(Number
),
784 idleWorkerNodes
: expect
.any(Number
),
785 busyWorkerNodes
: expect
.any(Number
),
786 executedTasks
: expect
.any(Number
),
787 executingTasks
: expect
.any(Number
),
788 queuedTasks
: expect
.any(Number
),
789 maxQueuedTasks
: expect
.any(Number
),
790 failedTasks
: expect
.any(Number
)
// Runs the multi-task cluster worker through a DynamicClusterPool with the
// payload { n: 10 } and checks the result of each call:
//   execute(data)                             -> { ok: 1 }
//   execute(data, 'jsonIntegerSerialization') -> { ok: 1 }
//   execute(data, 'factorial')                -> 3628800
//   execute(data, 'fibonacci')                -> 55
795 it('Verify that multiple tasks worker is working', async () => {
796 const pool
= new DynamicClusterPool(
797 Math
.floor(numberOfWorkers
/ 2),
799 './tests/worker-files/cluster/testMultiTasksWorker.js'
801 const data
= { n
: 10 }
802 const result0
= await pool
.execute(data
)
803 expect(result0
).toStrictEqual({ ok
: 1 })
804 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
805 expect(result1
).toStrictEqual({ ok
: 1 })
806 const result2
= await pool
.execute(data
, 'factorial')
807 expect(result2
).toBe(3628800)
808 const result3
= await pool
.execute(data
, 'fibonacci')
809 expect(result3
).toBe(55)