1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
16 describe('Abstract pool test suite', () => {
17 const numberOfWorkers
= 2
18 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
21 this.promiseResponseMap
.clear()
24 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: e
=> console
.error(e
)
40 ).toThrowError('Cannot start a pool from a worker!')
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
57 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non safe integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.emitter
).toBeDefined()
89 expect(pool
.opts
.enableEvents
).toBe(true)
90 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
91 expect(pool
.opts
.enableTasksQueue
).toBe(false)
92 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
93 expect(pool
.opts
.workerChoiceStrategy
).toBe(
94 WorkerChoiceStrategies
.ROUND_ROBIN
96 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 runTime
: { median
: false },
98 waitTime
: { median
: false },
99 elu
: { median
: false }
101 expect(pool
.opts
.messageHandler
).toBeUndefined()
102 expect(pool
.opts
.errorHandler
).toBeUndefined()
103 expect(pool
.opts
.onlineHandler
).toBeUndefined()
104 expect(pool
.opts
.exitHandler
).toBeUndefined()
106 const testHandler
= () => console
.log('test handler executed')
107 pool
= new FixedThreadPool(
109 './tests/worker-files/thread/testWorker.js',
111 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
112 workerChoiceStrategyOptions
: {
113 runTime
: { median
: true },
114 weights
: { 0: 300, 1: 200 }
117 restartWorkerOnError
: false,
118 enableTasksQueue
: true,
119 tasksQueueOptions
: { concurrency
: 2 },
120 messageHandler
: testHandler
,
121 errorHandler
: testHandler
,
122 onlineHandler
: testHandler
,
123 exitHandler
: testHandler
126 expect(pool
.emitter
).toBeUndefined()
127 expect(pool
.opts
.enableEvents
).toBe(false)
128 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
129 expect(pool
.opts
.enableTasksQueue
).toBe(true)
130 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
131 expect(pool
.opts
.workerChoiceStrategy
).toBe(
132 WorkerChoiceStrategies
.LEAST_USED
134 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
135 runTime
: { median
: true },
136 weights
: { 0: 300, 1: 200 }
138 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
141 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
145 it('Verify that pool options are validated', async () => {
150 './tests/worker-files/thread/testWorker.js',
152 workerChoiceStrategy
: 'invalidStrategy'
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
160 './tests/worker-files/thread/testWorker.js',
162 workerChoiceStrategyOptions
: 'invalidOptions'
166 'Invalid worker choice strategy options: must be a plain object'
172 './tests/worker-files/thread/testWorker.js',
174 workerChoiceStrategyOptions
: { weights
: {} }
178 'Invalid worker choice strategy options: must have a weight for each worker node'
184 './tests/worker-files/thread/testWorker.js',
186 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
190 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
196 './tests/worker-files/thread/testWorker.js',
198 enableTasksQueue
: true,
199 tasksQueueOptions
: { concurrency
: 0 }
202 ).toThrowError("Invalid worker tasks concurrency '0'")
207 './tests/worker-files/thread/testWorker.js',
209 enableTasksQueue
: true,
210 tasksQueueOptions
: 'invalidTasksQueueOptions'
213 ).toThrowError('Invalid tasks queue options: must be a plain object')
218 './tests/worker-files/thread/testWorker.js',
220 enableTasksQueue
: true,
221 tasksQueueOptions
: { concurrency
: 0.2 }
224 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
227 it('Verify that worker choice strategy options can be set', async () => {
228 const pool
= new FixedThreadPool(
230 './tests/worker-files/thread/testWorker.js',
231 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
233 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
234 runTime
: { median
: false },
235 waitTime
: { median
: false },
236 elu
: { median
: false }
238 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
239 .workerChoiceStrategies
) {
240 expect(workerChoiceStrategy
.opts
).toStrictEqual({
241 runTime
: { median
: false },
242 waitTime
: { median
: false },
243 elu
: { median
: false }
247 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
265 pool
.setWorkerChoiceStrategyOptions({
266 runTime
: { median
: true },
267 elu
: { median
: true }
269 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
270 runTime
: { median
: true },
271 elu
: { median
: true }
273 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
274 .workerChoiceStrategies
) {
275 expect(workerChoiceStrategy
.opts
).toStrictEqual({
276 runTime
: { median
: true },
277 elu
: { median
: true }
281 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
299 pool
.setWorkerChoiceStrategyOptions({
300 runTime
: { median
: false },
301 elu
: { median
: false }
303 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
304 runTime
: { median
: false },
305 elu
: { median
: false }
307 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
308 .workerChoiceStrategies
) {
309 expect(workerChoiceStrategy
.opts
).toStrictEqual({
310 runTime
: { median
: false },
311 elu
: { median
: false }
315 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
334 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
336 'Invalid worker choice strategy options: must be a plain object'
339 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
341 'Invalid worker choice strategy options: must have a weight for each worker node'
344 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
351 it('Verify that tasks queue can be enabled/disabled', async () => {
352 const pool
= new FixedThreadPool(
354 './tests/worker-files/thread/testWorker.js'
356 expect(pool
.opts
.enableTasksQueue
).toBe(false)
357 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
358 pool
.enableTasksQueue(true)
359 expect(pool
.opts
.enableTasksQueue
).toBe(true)
360 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
361 pool
.enableTasksQueue(true, { concurrency
: 2 })
362 expect(pool
.opts
.enableTasksQueue
).toBe(true)
363 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
364 pool
.enableTasksQueue(false)
365 expect(pool
.opts
.enableTasksQueue
).toBe(false)
366 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
370 it('Verify that tasks queue options can be set', async () => {
371 const pool
= new FixedThreadPool(
373 './tests/worker-files/thread/testWorker.js',
374 { enableTasksQueue
: true }
376 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
377 pool
.setTasksQueueOptions({ concurrency
: 2 })
378 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
380 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
381 ).toThrowError('Invalid tasks queue options: must be a plain object')
382 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
383 "Invalid worker tasks concurrency '0'"
385 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
386 'Invalid worker tasks concurrency: must be an integer'
391 it('Verify that pool info is set', async () => {
392 let pool
= new FixedThreadPool(
394 './tests/worker-files/thread/testWorker.js'
396 expect(pool
.info
).toStrictEqual({
398 type
: PoolTypes
.fixed
,
399 worker
: WorkerTypes
.thread
,
400 minSize
: numberOfWorkers
,
401 maxSize
: numberOfWorkers
,
402 workerNodes
: numberOfWorkers
,
403 idleWorkerNodes
: numberOfWorkers
,
412 pool
= new DynamicClusterPool(
415 './tests/worker-files/cluster/testWorker.js'
417 expect(pool
.info
).toStrictEqual({
419 type
: PoolTypes
.dynamic
,
420 worker
: WorkerTypes
.cluster
,
421 minSize
: numberOfWorkers
,
422 maxSize
: numberOfWorkers
* 2,
423 workerNodes
: numberOfWorkers
,
424 idleWorkerNodes
: numberOfWorkers
,
435 it('Simulate worker not found', async () => {
436 const pool
= new StubPoolWithRemoveAllWorker(
438 './tests/worker-files/thread/testWorker.js',
440 errorHandler
: e
=> console
.error(e
)
443 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
444 // Simulate worker not found.
445 pool
.removeAllWorker()
446 expect(pool
.workerNodes
.length
).toBe(0)
450 it('Verify that worker pool tasks usage are initialized', async () => {
451 const pool
= new FixedClusterPool(
453 './tests/worker-files/cluster/testWorker.js'
455 for (const workerNode
of pool
.workerNodes
) {
456 expect(workerNode
.usage
).toStrictEqual({
468 history
: expect
.any(CircularArray
)
474 history
: expect
.any(CircularArray
)
481 history
: expect
.any(CircularArray
)
487 history
: expect
.any(CircularArray
)
496 it('Verify that worker pool tasks queue are initialized', async () => {
497 const pool
= new FixedClusterPool(
499 './tests/worker-files/cluster/testWorker.js'
501 for (const workerNode
of pool
.workerNodes
) {
502 expect(workerNode
.tasksQueue
).toBeDefined()
503 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
504 expect(workerNode
.tasksQueue
.size
).toBe(0)
505 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
510 it('Verify that worker pool tasks usage are computed', async () => {
511 const pool
= new FixedClusterPool(
513 './tests/worker-files/cluster/testWorker.js'
515 const promises
= new Set()
516 const maxMultiplier
= 2
517 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
518 promises
.add(pool
.execute())
520 for (const workerNode
of pool
.workerNodes
) {
521 expect(workerNode
.usage
).toStrictEqual({
524 executing
: maxMultiplier
,
533 history
: expect
.any(CircularArray
)
539 history
: expect
.any(CircularArray
)
546 history
: expect
.any(CircularArray
)
552 history
: expect
.any(CircularArray
)
558 await Promise
.all(promises
)
559 for (const workerNode
of pool
.workerNodes
) {
560 expect(workerNode
.usage
).toStrictEqual({
562 executed
: maxMultiplier
,
572 history
: expect
.any(CircularArray
)
578 history
: expect
.any(CircularArray
)
585 history
: expect
.any(CircularArray
)
591 history
: expect
.any(CircularArray
)
600 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
601 const pool
= new DynamicThreadPool(
604 './tests/worker-files/thread/testWorker.js'
606 const promises
= new Set()
607 const maxMultiplier
= 2
608 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
609 promises
.add(pool
.execute())
611 await Promise
.all(promises
)
612 for (const workerNode
of pool
.workerNodes
) {
613 expect(workerNode
.usage
).toStrictEqual({
615 executed
: expect
.any(Number
),
625 history
: expect
.any(CircularArray
)
631 history
: expect
.any(CircularArray
)
638 history
: expect
.any(CircularArray
)
644 history
: expect
.any(CircularArray
)
649 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
650 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
652 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
653 for (const workerNode
of pool
.workerNodes
) {
654 expect(workerNode
.usage
).toStrictEqual({
666 history
: expect
.any(CircularArray
)
672 history
: expect
.any(CircularArray
)
679 history
: expect
.any(CircularArray
)
685 history
: expect
.any(CircularArray
)
690 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
691 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
696 it("Verify that pool event emitter 'full' event can register a callback", async () => {
697 const pool
= new DynamicThreadPool(
700 './tests/worker-files/thread/testWorker.js'
702 const promises
= new Set()
705 pool
.emitter
.on(PoolEvents
.full
, info
=> {
709 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
710 promises
.add(pool
.execute())
712 await Promise
.all(promises
)
713 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
714 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
715 expect(poolFull
).toBe(numberOfWorkers
* 2)
716 expect(poolInfo
).toStrictEqual({
718 type
: PoolTypes
.dynamic
,
719 worker
: WorkerTypes
.thread
,
720 minSize
: expect
.any(Number
),
721 maxSize
: expect
.any(Number
),
722 workerNodes
: expect
.any(Number
),
723 idleWorkerNodes
: expect
.any(Number
),
724 busyWorkerNodes
: expect
.any(Number
),
725 executedTasks
: expect
.any(Number
),
726 executingTasks
: expect
.any(Number
),
727 queuedTasks
: expect
.any(Number
),
728 maxQueuedTasks
: expect
.any(Number
),
729 failedTasks
: expect
.any(Number
)
734 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
735 const pool
= new FixedThreadPool(
737 './tests/worker-files/thread/testWorker.js'
739 const promises
= new Set()
742 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
746 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
747 promises
.add(pool
.execute())
749 await Promise
.all(promises
)
750 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
751 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
752 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
753 expect(poolInfo
).toStrictEqual({
755 type
: PoolTypes
.fixed
,
756 worker
: WorkerTypes
.thread
,
757 minSize
: expect
.any(Number
),
758 maxSize
: expect
.any(Number
),
759 workerNodes
: expect
.any(Number
),
760 idleWorkerNodes
: expect
.any(Number
),
761 busyWorkerNodes
: expect
.any(Number
),
762 executedTasks
: expect
.any(Number
),
763 executingTasks
: expect
.any(Number
),
764 queuedTasks
: expect
.any(Number
),
765 maxQueuedTasks
: expect
.any(Number
),
766 failedTasks
: expect
.any(Number
)
771 it('Verify that multiple tasks worker is working', async () => {
772 const pool
= new DynamicClusterPool(
775 './tests/worker-files/cluster/testMultiTasksWorker.js'
777 const data
= { n
: 10 }
778 const result0
= await pool
.execute(data
)
779 expect(result0
).toBe(false)
780 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
781 expect(result1
).toBe(false)
782 const result2
= await pool
.execute(data
, 'factorial')
783 expect(result2
).toBe(3628800)
784 const result3
= await pool
.execute(data
, 'fibonacci')
785 expect(result3
).toBe(55)