1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 runTime
: { median
: false },
97 waitTime
: { median
: false },
98 elu
: { median
: false }
100 expect(pool
.opts
.messageHandler
).toBeUndefined()
101 expect(pool
.opts
.errorHandler
).toBeUndefined()
102 expect(pool
.opts
.onlineHandler
).toBeUndefined()
103 expect(pool
.opts
.exitHandler
).toBeUndefined()
105 const testHandler
= () => console
.log('test handler executed')
106 pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js',
110 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
111 workerChoiceStrategyOptions
: {
112 runTime
: { median
: true },
113 weights
: { 0: 300, 1: 200 }
116 restartWorkerOnError
: false,
117 enableTasksQueue
: true,
118 tasksQueueOptions
: { concurrency
: 2 },
119 messageHandler
: testHandler
,
120 errorHandler
: testHandler
,
121 onlineHandler
: testHandler
,
122 exitHandler
: testHandler
125 expect(pool
.emitter
).toBeUndefined()
126 expect(pool
.opts
.enableEvents
).toBe(false)
127 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
128 expect(pool
.opts
.enableTasksQueue
).toBe(true)
129 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
130 expect(pool
.opts
.workerChoiceStrategy
).toBe(
131 WorkerChoiceStrategies
.LEAST_USED
133 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 runTime
: { median
: true },
135 weights
: { 0: 300, 1: 200 }
137 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
144 it('Verify that pool options are validated', async () => {
149 './tests/worker-files/thread/testWorker.js',
151 workerChoiceStrategy
: 'invalidStrategy'
154 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategyOptions
: 'invalidOptions'
165 'Invalid worker choice strategy options: must be a plain object'
171 './tests/worker-files/thread/testWorker.js',
173 workerChoiceStrategyOptions
: { weights
: {} }
177 'Invalid worker choice strategy options: must have a weight for each worker node'
183 './tests/worker-files/thread/testWorker.js',
185 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
189 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
195 './tests/worker-files/thread/testWorker.js',
197 enableTasksQueue
: true,
198 tasksQueueOptions
: { concurrency
: 0 }
201 ).toThrowError("Invalid worker tasks concurrency '0'")
206 './tests/worker-files/thread/testWorker.js',
208 enableTasksQueue
: true,
209 tasksQueueOptions
: 'invalidTasksQueueOptions'
212 ).toThrowError('Invalid tasks queue options: must be a plain object')
217 './tests/worker-files/thread/testWorker.js',
219 enableTasksQueue
: true,
220 tasksQueueOptions
: { concurrency
: 0.2 }
223 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
226 it('Verify that worker choice strategy options can be set', async () => {
227 const pool
= new FixedThreadPool(
229 './tests/worker-files/thread/testWorker.js',
230 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
232 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
233 runTime
: { median
: false },
234 waitTime
: { median
: false },
235 elu
: { median
: false }
237 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
238 .workerChoiceStrategies
) {
239 expect(workerChoiceStrategy
.opts
).toStrictEqual({
240 runTime
: { median
: false },
241 waitTime
: { median
: false },
242 elu
: { median
: false }
246 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
264 pool
.setWorkerChoiceStrategyOptions({
265 runTime
: { median
: true },
266 elu
: { median
: true }
268 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
269 runTime
: { median
: true },
270 elu
: { median
: true }
272 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
273 .workerChoiceStrategies
) {
274 expect(workerChoiceStrategy
.opts
).toStrictEqual({
275 runTime
: { median
: true },
276 elu
: { median
: true }
280 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 pool
.setWorkerChoiceStrategyOptions({
299 runTime
: { median
: false },
300 elu
: { median
: false }
302 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
303 runTime
: { median
: false },
304 elu
: { median
: false }
306 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
307 .workerChoiceStrategies
) {
308 expect(workerChoiceStrategy
.opts
).toStrictEqual({
309 runTime
: { median
: false },
310 elu
: { median
: false }
314 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
335 it('Verify that tasks queue can be enabled/disabled', async () => {
336 const pool
= new FixedThreadPool(
338 './tests/worker-files/thread/testWorker.js'
340 expect(pool
.opts
.enableTasksQueue
).toBe(false)
341 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
342 pool
.enableTasksQueue(true)
343 expect(pool
.opts
.enableTasksQueue
).toBe(true)
344 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
345 pool
.enableTasksQueue(true, { concurrency
: 2 })
346 expect(pool
.opts
.enableTasksQueue
).toBe(true)
347 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
348 pool
.enableTasksQueue(false)
349 expect(pool
.opts
.enableTasksQueue
).toBe(false)
350 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
354 it('Verify that tasks queue options can be set', async () => {
355 const pool
= new FixedThreadPool(
357 './tests/worker-files/thread/testWorker.js',
358 { enableTasksQueue
: true }
360 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
361 pool
.setTasksQueueOptions({ concurrency
: 2 })
362 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
364 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
365 ).toThrowError('Invalid tasks queue options: must be a plain object')
366 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
367 "Invalid worker tasks concurrency '0'"
369 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
370 'Invalid worker tasks concurrency: must be an integer'
375 it('Verify that pool info is set', async () => {
376 let pool
= new FixedThreadPool(
378 './tests/worker-files/thread/testWorker.js'
380 expect(pool
.info
).toStrictEqual({
381 type
: PoolTypes
.fixed
,
382 worker
: WorkerTypes
.thread
,
383 minSize
: numberOfWorkers
,
384 maxSize
: numberOfWorkers
,
385 workerNodes
: numberOfWorkers
,
386 idleWorkerNodes
: numberOfWorkers
,
395 pool
= new DynamicClusterPool(
398 './tests/worker-files/thread/testWorker.js'
400 expect(pool
.info
).toStrictEqual({
401 type
: PoolTypes
.dynamic
,
402 worker
: WorkerTypes
.cluster
,
403 minSize
: numberOfWorkers
,
404 maxSize
: numberOfWorkers
* 2,
405 workerNodes
: numberOfWorkers
,
406 idleWorkerNodes
: numberOfWorkers
,
417 it('Simulate worker not found', async () => {
418 const pool
= new StubPoolWithRemoveAllWorker(
420 './tests/worker-files/cluster/testWorker.js',
422 errorHandler
: e
=> console
.error(e
)
425 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
426 // Simulate worker not found.
427 pool
.removeAllWorker()
428 expect(pool
.workerNodes
.length
).toBe(0)
432 it('Verify that worker pool tasks usage are initialized', async () => {
433 const pool
= new FixedClusterPool(
435 './tests/worker-files/cluster/testWorker.js'
437 for (const workerNode
of pool
.workerNodes
) {
438 expect(workerNode
.workerUsage
).toStrictEqual({
449 history
: expect
.any(CircularArray
)
455 history
: expect
.any(CircularArray
)
462 history
: expect
.any(CircularArray
)
468 history
: expect
.any(CircularArray
)
477 it('Verify that worker pool tasks queue are initialized', async () => {
478 const pool
= new FixedClusterPool(
480 './tests/worker-files/cluster/testWorker.js'
482 for (const workerNode
of pool
.workerNodes
) {
483 expect(workerNode
.tasksQueue
).toBeDefined()
484 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
485 expect(workerNode
.tasksQueue
.size
).toBe(0)
490 it('Verify that worker pool tasks usage are computed', async () => {
491 const pool
= new FixedClusterPool(
493 './tests/worker-files/cluster/testWorker.js'
495 const promises
= new Set()
496 const maxMultiplier
= 2
497 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
498 promises
.add(pool
.execute())
500 for (const workerNode
of pool
.workerNodes
) {
501 expect(workerNode
.workerUsage
).toStrictEqual({
504 executing
: maxMultiplier
,
512 history
: expect
.any(CircularArray
)
518 history
: expect
.any(CircularArray
)
525 history
: expect
.any(CircularArray
)
531 history
: expect
.any(CircularArray
)
537 await Promise
.all(promises
)
538 for (const workerNode
of pool
.workerNodes
) {
539 expect(workerNode
.workerUsage
).toStrictEqual({
541 executed
: maxMultiplier
,
550 history
: expect
.any(CircularArray
)
556 history
: expect
.any(CircularArray
)
563 history
: expect
.any(CircularArray
)
569 history
: expect
.any(CircularArray
)
578 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
579 const pool
= new DynamicThreadPool(
582 './tests/worker-files/thread/testWorker.js'
584 const promises
= new Set()
585 const maxMultiplier
= 2
586 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
587 promises
.add(pool
.execute())
589 await Promise
.all(promises
)
590 for (const workerNode
of pool
.workerNodes
) {
591 expect(workerNode
.workerUsage
).toStrictEqual({
593 executed
: expect
.any(Number
),
602 history
: expect
.any(CircularArray
)
608 history
: expect
.any(CircularArray
)
615 history
: expect
.any(CircularArray
)
621 history
: expect
.any(CircularArray
)
626 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
627 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
631 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
632 for (const workerNode
of pool
.workerNodes
) {
633 expect(workerNode
.workerUsage
).toStrictEqual({
644 history
: expect
.any(CircularArray
)
650 history
: expect
.any(CircularArray
)
657 history
: expect
.any(CircularArray
)
663 history
: expect
.any(CircularArray
)
668 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
669 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
674 it("Verify that pool event emitter 'full' event can register a callback", async () => {
675 const pool
= new DynamicThreadPool(
678 './tests/worker-files/thread/testWorker.js'
680 const promises
= new Set()
683 pool
.emitter
.on(PoolEvents
.full
, info
=> {
687 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
688 promises
.add(pool
.execute())
690 await Promise
.all(promises
)
691 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
692 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
693 expect(poolFull
).toBe(numberOfWorkers
* 2)
694 expect(poolInfo
).toStrictEqual({
695 type
: PoolTypes
.dynamic
,
696 worker
: WorkerTypes
.thread
,
697 minSize
: expect
.any(Number
),
698 maxSize
: expect
.any(Number
),
699 workerNodes
: expect
.any(Number
),
700 idleWorkerNodes
: expect
.any(Number
),
701 busyWorkerNodes
: expect
.any(Number
),
702 executedTasks
: expect
.any(Number
),
703 executingTasks
: expect
.any(Number
),
704 queuedTasks
: expect
.any(Number
),
705 maxQueuedTasks
: expect
.any(Number
),
706 failedTasks
: expect
.any(Number
)
711 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
712 const pool
= new FixedThreadPool(
714 './tests/worker-files/thread/testWorker.js'
716 const promises
= new Set()
719 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
723 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
724 promises
.add(pool
.execute())
726 await Promise
.all(promises
)
727 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
728 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
729 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
730 expect(poolInfo
).toStrictEqual({
731 type
: PoolTypes
.fixed
,
732 worker
: WorkerTypes
.thread
,
733 minSize
: expect
.any(Number
),
734 maxSize
: expect
.any(Number
),
735 workerNodes
: expect
.any(Number
),
736 idleWorkerNodes
: expect
.any(Number
),
737 busyWorkerNodes
: expect
.any(Number
),
738 executedTasks
: expect
.any(Number
),
739 executingTasks
: expect
.any(Number
),
740 queuedTasks
: expect
.any(Number
),
741 maxQueuedTasks
: expect
.any(Number
),
742 failedTasks
: expect
.any(Number
)
747 it('Verify that multiple tasks worker is working', async () => {
748 const pool
= new DynamicClusterPool(
751 './tests/worker-files/cluster/testMultiTasksWorker.js'
753 const data
= { n
: 10 }
754 const result0
= await pool
.execute(data
)
755 expect(result0
).toBe(false)
756 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
757 expect(result1
).toBe(false)
758 const result2
= await pool
.execute(data
, 'factorial')
759 expect(result2
).toBe(3628800)
760 const result3
= await pool
.execute(data
, 'fibonacci')
761 expect(result3
).toBe(89)