1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
16 describe('Abstract pool test suite', () => {
17 const numberOfWorkers
= 2
18 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
21 this.promiseResponseMap
.clear()
24 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: e
=> console
.error(e
)
40 ).toThrowError('Cannot start a pool from a worker!')
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
57 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non safe integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.emitter
).toBeDefined()
89 expect(pool
.opts
.enableEvents
).toBe(true)
90 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
91 expect(pool
.opts
.enableTasksQueue
).toBe(false)
92 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
93 expect(pool
.opts
.workerChoiceStrategy
).toBe(
94 WorkerChoiceStrategies
.ROUND_ROBIN
96 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 runTime
: { median
: false },
98 waitTime
: { median
: false },
99 elu
: { median
: false }
101 expect(pool
.opts
.messageHandler
).toBeUndefined()
102 expect(pool
.opts
.errorHandler
).toBeUndefined()
103 expect(pool
.opts
.onlineHandler
).toBeUndefined()
104 expect(pool
.opts
.exitHandler
).toBeUndefined()
106 const testHandler
= () => console
.log('test handler executed')
107 pool
= new FixedThreadPool(
109 './tests/worker-files/thread/testWorker.js',
111 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
112 workerChoiceStrategyOptions
: {
113 runTime
: { median
: true },
114 weights
: { 0: 300, 1: 200 }
117 restartWorkerOnError
: false,
118 enableTasksQueue
: true,
119 tasksQueueOptions
: { concurrency
: 2 },
120 messageHandler
: testHandler
,
121 errorHandler
: testHandler
,
122 onlineHandler
: testHandler
,
123 exitHandler
: testHandler
126 expect(pool
.emitter
).toBeUndefined()
127 expect(pool
.opts
.enableEvents
).toBe(false)
128 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
129 expect(pool
.opts
.enableTasksQueue
).toBe(true)
130 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
131 expect(pool
.opts
.workerChoiceStrategy
).toBe(
132 WorkerChoiceStrategies
.LEAST_USED
134 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
135 runTime
: { median
: true },
136 weights
: { 0: 300, 1: 200 }
138 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
141 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
145 it('Verify that pool options are validated', async () => {
150 './tests/worker-files/thread/testWorker.js',
152 workerChoiceStrategy
: 'invalidStrategy'
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
160 './tests/worker-files/thread/testWorker.js',
162 workerChoiceStrategyOptions
: 'invalidOptions'
166 'Invalid worker choice strategy options: must be a plain object'
172 './tests/worker-files/thread/testWorker.js',
174 workerChoiceStrategyOptions
: { weights
: {} }
178 'Invalid worker choice strategy options: must have a weight for each worker node'
184 './tests/worker-files/thread/testWorker.js',
186 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
190 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
196 './tests/worker-files/thread/testWorker.js',
198 enableTasksQueue
: true,
199 tasksQueueOptions
: { concurrency
: 0 }
202 ).toThrowError("Invalid worker tasks concurrency '0'")
207 './tests/worker-files/thread/testWorker.js',
209 enableTasksQueue
: true,
210 tasksQueueOptions
: 'invalidTasksQueueOptions'
213 ).toThrowError('Invalid tasks queue options: must be a plain object')
218 './tests/worker-files/thread/testWorker.js',
220 enableTasksQueue
: true,
221 tasksQueueOptions
: { concurrency
: 0.2 }
224 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
227 it('Verify that worker choice strategy options can be set', async () => {
228 const pool
= new FixedThreadPool(
230 './tests/worker-files/thread/testWorker.js',
231 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
233 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
234 runTime
: { median
: false },
235 waitTime
: { median
: false },
236 elu
: { median
: false }
238 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
239 .workerChoiceStrategies
) {
240 expect(workerChoiceStrategy
.opts
).toStrictEqual({
241 runTime
: { median
: false },
242 waitTime
: { median
: false },
243 elu
: { median
: false }
247 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
265 pool
.setWorkerChoiceStrategyOptions({
266 runTime
: { median
: true },
267 elu
: { median
: true }
269 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
270 runTime
: { median
: true },
271 elu
: { median
: true }
273 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
274 .workerChoiceStrategies
) {
275 expect(workerChoiceStrategy
.opts
).toStrictEqual({
276 runTime
: { median
: true },
277 elu
: { median
: true }
281 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
299 pool
.setWorkerChoiceStrategyOptions({
300 runTime
: { median
: false },
301 elu
: { median
: false }
303 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
304 runTime
: { median
: false },
305 elu
: { median
: false }
307 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
308 .workerChoiceStrategies
) {
309 expect(workerChoiceStrategy
.opts
).toStrictEqual({
310 runTime
: { median
: false },
311 elu
: { median
: false }
315 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
334 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
336 'Invalid worker choice strategy options: must be a plain object'
339 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
341 'Invalid worker choice strategy options: must have a weight for each worker node'
344 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
351 it('Verify that tasks queue can be enabled/disabled', async () => {
352 const pool
= new FixedThreadPool(
354 './tests/worker-files/thread/testWorker.js'
356 expect(pool
.opts
.enableTasksQueue
).toBe(false)
357 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
358 pool
.enableTasksQueue(true)
359 expect(pool
.opts
.enableTasksQueue
).toBe(true)
360 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
361 pool
.enableTasksQueue(true, { concurrency
: 2 })
362 expect(pool
.opts
.enableTasksQueue
).toBe(true)
363 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
364 pool
.enableTasksQueue(false)
365 expect(pool
.opts
.enableTasksQueue
).toBe(false)
366 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
370 it('Verify that tasks queue options can be set', async () => {
371 const pool
= new FixedThreadPool(
373 './tests/worker-files/thread/testWorker.js',
374 { enableTasksQueue
: true }
376 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
377 pool
.setTasksQueueOptions({ concurrency
: 2 })
378 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
380 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
381 ).toThrowError('Invalid tasks queue options: must be a plain object')
382 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
383 "Invalid worker tasks concurrency '0'"
385 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
386 'Invalid worker tasks concurrency: must be an integer'
391 it('Verify that pool info is set', async () => {
392 let pool
= new FixedThreadPool(
394 './tests/worker-files/thread/testWorker.js'
396 expect(pool
.info
).toStrictEqual({
398 type
: PoolTypes
.fixed
,
399 worker
: WorkerTypes
.thread
,
400 minSize
: numberOfWorkers
,
401 maxSize
: numberOfWorkers
,
402 workerNodes
: numberOfWorkers
,
403 idleWorkerNodes
: numberOfWorkers
,
412 pool
= new DynamicClusterPool(
415 './tests/worker-files/cluster/testWorker.js'
417 expect(pool
.info
).toStrictEqual({
419 type
: PoolTypes
.dynamic
,
420 worker
: WorkerTypes
.cluster
,
421 minSize
: numberOfWorkers
,
422 maxSize
: numberOfWorkers
* 2,
423 workerNodes
: numberOfWorkers
,
424 idleWorkerNodes
: numberOfWorkers
,
435 it('Simulate worker not found', async () => {
436 const pool
= new StubPoolWithRemoveAllWorker(
438 './tests/worker-files/thread/testWorker.js',
440 errorHandler
: e
=> console
.error(e
)
443 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
444 // Simulate worker not found.
445 pool
.removeAllWorker()
446 expect(pool
.workerNodes
.length
).toBe(0)
450 it('Verify that worker pool tasks usage are initialized', async () => {
451 const pool
= new FixedClusterPool(
453 './tests/worker-files/cluster/testWorker.js'
455 for (const workerNode
of pool
.workerNodes
) {
456 expect(workerNode
.usage
).toStrictEqual({
470 history
: expect
.any(CircularArray
)
478 history
: expect
.any(CircularArray
)
487 history
: expect
.any(CircularArray
)
495 history
: expect
.any(CircularArray
)
503 it('Verify that worker pool tasks queue are initialized', async () => {
504 const pool
= new FixedClusterPool(
506 './tests/worker-files/cluster/testWorker.js'
508 for (const workerNode
of pool
.workerNodes
) {
509 expect(workerNode
.tasksQueue
).toBeDefined()
510 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
511 expect(workerNode
.tasksQueue
.size
).toBe(0)
512 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
517 it('Verify that worker pool tasks usage are computed', async () => {
518 const pool
= new FixedClusterPool(
520 './tests/worker-files/cluster/testWorker.js'
522 const promises
= new Set()
523 const maxMultiplier
= 2
524 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
525 promises
.add(pool
.execute())
527 for (const workerNode
of pool
.workerNodes
) {
528 expect(workerNode
.usage
).toStrictEqual({
531 executing
: maxMultiplier
,
542 history
: expect
.any(CircularArray
)
550 history
: expect
.any(CircularArray
)
559 history
: expect
.any(CircularArray
)
567 history
: expect
.any(CircularArray
)
572 await Promise
.all(promises
)
573 for (const workerNode
of pool
.workerNodes
) {
574 expect(workerNode
.usage
).toStrictEqual({
576 executed
: maxMultiplier
,
588 history
: expect
.any(CircularArray
)
596 history
: expect
.any(CircularArray
)
605 history
: expect
.any(CircularArray
)
613 history
: expect
.any(CircularArray
)
621 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
622 const pool
= new DynamicThreadPool(
625 './tests/worker-files/thread/testWorker.js'
627 const promises
= new Set()
628 const maxMultiplier
= 2
629 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
630 promises
.add(pool
.execute())
632 await Promise
.all(promises
)
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.usage
).toStrictEqual({
636 executed
: expect
.any(Number
),
648 history
: expect
.any(CircularArray
)
656 history
: expect
.any(CircularArray
)
665 history
: expect
.any(CircularArray
)
673 history
: expect
.any(CircularArray
)
677 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
678 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
680 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
681 for (const workerNode
of pool
.workerNodes
) {
682 expect(workerNode
.usage
).toStrictEqual({
696 history
: expect
.any(CircularArray
)
704 history
: expect
.any(CircularArray
)
713 history
: expect
.any(CircularArray
)
721 history
: expect
.any(CircularArray
)
725 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
726 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
731 it("Verify that pool event emitter 'full' event can register a callback", async () => {
732 const pool
= new DynamicThreadPool(
735 './tests/worker-files/thread/testWorker.js'
737 const promises
= new Set()
740 pool
.emitter
.on(PoolEvents
.full
, info
=> {
744 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
745 promises
.add(pool
.execute())
747 await Promise
.all(promises
)
748 // The `full` event is triggered each time the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
749 // So it is emitted numberOfWorkers * 2 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
750 expect(poolFull
).toBe(numberOfWorkers
* 2)
751 expect(poolInfo
).toStrictEqual({
753 type
: PoolTypes
.dynamic
,
754 worker
: WorkerTypes
.thread
,
755 minSize
: expect
.any(Number
),
756 maxSize
: expect
.any(Number
),
757 workerNodes
: expect
.any(Number
),
758 idleWorkerNodes
: expect
.any(Number
),
759 busyWorkerNodes
: expect
.any(Number
),
760 executedTasks
: expect
.any(Number
),
761 executingTasks
: expect
.any(Number
),
762 queuedTasks
: expect
.any(Number
),
763 maxQueuedTasks
: expect
.any(Number
),
764 failedTasks
: expect
.any(Number
)
769 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
770 const pool
= new FixedThreadPool(
772 './tests/worker-files/thread/testWorker.js'
774 const promises
= new Set()
777 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
781 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
782 promises
.add(pool
.execute())
784 await Promise
.all(promises
)
785 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
786 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
787 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
788 expect(poolInfo
).toStrictEqual({
790 type
: PoolTypes
.fixed
,
791 worker
: WorkerTypes
.thread
,
792 minSize
: expect
.any(Number
),
793 maxSize
: expect
.any(Number
),
794 workerNodes
: expect
.any(Number
),
795 idleWorkerNodes
: expect
.any(Number
),
796 busyWorkerNodes
: expect
.any(Number
),
797 executedTasks
: expect
.any(Number
),
798 executingTasks
: expect
.any(Number
),
799 queuedTasks
: expect
.any(Number
),
800 maxQueuedTasks
: expect
.any(Number
),
801 failedTasks
: expect
.any(Number
)
806 it('Verify that multiple tasks worker is working', async () => {
807 const pool
= new DynamicClusterPool(
810 './tests/worker-files/cluster/testMultiTasksWorker.js'
812 const data
= { n
: 10 }
813 const result0
= await pool
.execute(data
)
814 expect(result0
).toBe(false)
815 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
816 expect(result1
).toBe(false)
817 const result2
= await pool
.execute(data
, 'factorial')
818 expect(result2
).toBe(3628800)
819 const result3
= await pool
.execute(data
, 'fibonacci')
820 expect(result3
).toBe(55)