1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 runTime
: { median
: false },
97 waitTime
: { median
: false },
98 elu
: { median
: false }
100 expect(pool
.opts
.messageHandler
).toBeUndefined()
101 expect(pool
.opts
.errorHandler
).toBeUndefined()
102 expect(pool
.opts
.onlineHandler
).toBeUndefined()
103 expect(pool
.opts
.exitHandler
).toBeUndefined()
105 const testHandler
= () => console
.log('test handler executed')
106 pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js',
110 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
111 workerChoiceStrategyOptions
: {
112 runTime
: { median
: true },
113 weights
: { 0: 300, 1: 200 }
116 restartWorkerOnError
: false,
117 enableTasksQueue
: true,
118 tasksQueueOptions
: { concurrency
: 2 },
119 messageHandler
: testHandler
,
120 errorHandler
: testHandler
,
121 onlineHandler
: testHandler
,
122 exitHandler
: testHandler
125 expect(pool
.emitter
).toBeUndefined()
126 expect(pool
.opts
.enableEvents
).toBe(false)
127 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
128 expect(pool
.opts
.enableTasksQueue
).toBe(true)
129 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
130 expect(pool
.opts
.workerChoiceStrategy
).toBe(
131 WorkerChoiceStrategies
.LEAST_USED
133 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 runTime
: { median
: true },
135 weights
: { 0: 300, 1: 200 }
137 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
144 it('Verify that pool options are validated', async () => {
149 './tests/worker-files/thread/testWorker.js',
151 enableTasksQueue
: true,
152 tasksQueueOptions
: { concurrency
: 0 }
155 ).toThrowError("Invalid worker tasks concurrency '0'")
160 './tests/worker-files/thread/testWorker.js',
162 workerChoiceStrategy
: 'invalidStrategy'
165 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
170 './tests/worker-files/thread/testWorker.js',
172 workerChoiceStrategyOptions
: { weights
: {} }
176 'Invalid worker choice strategy options: must have a weight for each worker node'
180 it('Verify that worker choice strategy options can be set', async () => {
181 const pool
= new FixedThreadPool(
183 './tests/worker-files/thread/testWorker.js',
184 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
186 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
187 runTime
: { median
: false },
188 waitTime
: { median
: false },
189 elu
: { median
: false }
191 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
192 .workerChoiceStrategies
) {
193 expect(workerChoiceStrategy
.opts
).toStrictEqual({
194 runTime
: { median
: false },
195 waitTime
: { median
: false },
196 elu
: { median
: false }
200 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
218 pool
.setWorkerChoiceStrategyOptions({
219 runTime
: { median
: true },
220 elu
: { median
: true }
222 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
223 runTime
: { median
: true },
224 elu
: { median
: true }
226 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
227 .workerChoiceStrategies
) {
228 expect(workerChoiceStrategy
.opts
).toStrictEqual({
229 runTime
: { median
: true },
230 elu
: { median
: true }
234 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
252 pool
.setWorkerChoiceStrategyOptions({
253 runTime
: { median
: false },
254 elu
: { median
: false }
256 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
257 runTime
: { median
: false },
258 elu
: { median
: false }
260 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
261 .workerChoiceStrategies
) {
262 expect(workerChoiceStrategy
.opts
).toStrictEqual({
263 runTime
: { median
: false },
264 elu
: { median
: false }
268 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
289 it('Verify that tasks queue can be enabled/disabled', async () => {
290 const pool
= new FixedThreadPool(
292 './tests/worker-files/thread/testWorker.js'
294 expect(pool
.opts
.enableTasksQueue
).toBe(false)
295 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
296 pool
.enableTasksQueue(true)
297 expect(pool
.opts
.enableTasksQueue
).toBe(true)
298 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
299 pool
.enableTasksQueue(true, { concurrency
: 2 })
300 expect(pool
.opts
.enableTasksQueue
).toBe(true)
301 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
302 pool
.enableTasksQueue(false)
303 expect(pool
.opts
.enableTasksQueue
).toBe(false)
304 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
308 it('Verify that tasks queue options can be set', async () => {
309 const pool
= new FixedThreadPool(
311 './tests/worker-files/thread/testWorker.js',
312 { enableTasksQueue
: true }
314 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
315 pool
.setTasksQueueOptions({ concurrency
: 2 })
316 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
317 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
318 "Invalid worker tasks concurrency '0'"
323 it('Verify that pool info is set', async () => {
324 let pool
= new FixedThreadPool(
326 './tests/worker-files/thread/testWorker.js'
328 expect(pool
.info
).toStrictEqual({
329 type
: PoolTypes
.fixed
,
330 worker
: WorkerTypes
.thread
,
331 minSize
: numberOfWorkers
,
332 maxSize
: numberOfWorkers
,
333 workerNodes
: numberOfWorkers
,
334 idleWorkerNodes
: numberOfWorkers
,
343 pool
= new DynamicClusterPool(
346 './tests/worker-files/thread/testWorker.js'
348 expect(pool
.info
).toStrictEqual({
349 type
: PoolTypes
.dynamic
,
350 worker
: WorkerTypes
.cluster
,
351 minSize
: numberOfWorkers
,
352 maxSize
: numberOfWorkers
* 2,
353 workerNodes
: numberOfWorkers
,
354 idleWorkerNodes
: numberOfWorkers
,
365 it('Simulate worker not found', async () => {
366 const pool
= new StubPoolWithRemoveAllWorker(
368 './tests/worker-files/cluster/testWorker.js',
370 errorHandler
: e
=> console
.error(e
)
373 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
374 // Simulate worker not found.
375 pool
.removeAllWorker()
376 expect(pool
.workerNodes
.length
).toBe(0)
380 it('Verify that worker pool tasks usage are initialized', async () => {
381 const pool
= new FixedClusterPool(
383 './tests/worker-files/cluster/testWorker.js'
385 for (const workerNode
of pool
.workerNodes
) {
386 expect(workerNode
.workerUsage
).toStrictEqual({
397 history
: expect
.any(CircularArray
)
403 history
: expect
.any(CircularArray
)
410 history
: expect
.any(CircularArray
)
416 history
: expect
.any(CircularArray
)
425 it('Verify that worker pool tasks queue are initialized', async () => {
426 const pool
= new FixedClusterPool(
428 './tests/worker-files/cluster/testWorker.js'
430 for (const workerNode
of pool
.workerNodes
) {
431 expect(workerNode
.tasksQueue
).toBeDefined()
432 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
433 expect(workerNode
.tasksQueue
.size
).toBe(0)
438 it('Verify that worker pool tasks usage are computed', async () => {
439 const pool
= new FixedClusterPool(
441 './tests/worker-files/cluster/testWorker.js'
443 const promises
= new Set()
444 const maxMultiplier
= 2
445 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
446 promises
.add(pool
.execute())
448 for (const workerNode
of pool
.workerNodes
) {
449 expect(workerNode
.workerUsage
).toStrictEqual({
452 executing
: maxMultiplier
,
460 history
: expect
.any(CircularArray
)
466 history
: expect
.any(CircularArray
)
473 history
: expect
.any(CircularArray
)
479 history
: expect
.any(CircularArray
)
485 await Promise
.all(promises
)
486 for (const workerNode
of pool
.workerNodes
) {
487 expect(workerNode
.workerUsage
).toStrictEqual({
489 executed
: maxMultiplier
,
498 history
: expect
.any(CircularArray
)
504 history
: expect
.any(CircularArray
)
511 history
: expect
.any(CircularArray
)
517 history
: expect
.any(CircularArray
)
526 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
527 const pool
= new DynamicThreadPool(
530 './tests/worker-files/thread/testWorker.js'
532 const promises
= new Set()
533 const maxMultiplier
= 2
534 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
535 promises
.add(pool
.execute())
537 await Promise
.all(promises
)
538 for (const workerNode
of pool
.workerNodes
) {
539 expect(workerNode
.workerUsage
).toStrictEqual({
541 executed
: expect
.any(Number
),
550 history
: expect
.any(CircularArray
)
556 history
: expect
.any(CircularArray
)
563 history
: expect
.any(CircularArray
)
569 history
: expect
.any(CircularArray
)
574 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
575 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
579 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
580 for (const workerNode
of pool
.workerNodes
) {
581 expect(workerNode
.workerUsage
).toStrictEqual({
592 history
: expect
.any(CircularArray
)
598 history
: expect
.any(CircularArray
)
605 history
: expect
.any(CircularArray
)
611 history
: expect
.any(CircularArray
)
616 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
617 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
622 it("Verify that pool event emitter 'full' event can register a callback", async () => {
623 const pool
= new DynamicThreadPool(
626 './tests/worker-files/thread/testWorker.js'
628 const promises
= new Set()
631 pool
.emitter
.on(PoolEvents
.full
, info
=> {
635 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
636 promises
.add(pool
.execute())
638 await Promise
.all(promises
)
639 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
640 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
641 expect(poolFull
).toBe(numberOfWorkers
* 2)
642 expect(poolInfo
).toStrictEqual({
643 type
: PoolTypes
.dynamic
,
644 worker
: WorkerTypes
.thread
,
645 minSize
: expect
.any(Number
),
646 maxSize
: expect
.any(Number
),
647 workerNodes
: expect
.any(Number
),
648 idleWorkerNodes
: expect
.any(Number
),
649 busyWorkerNodes
: expect
.any(Number
),
650 executedTasks
: expect
.any(Number
),
651 executingTasks
: expect
.any(Number
),
652 queuedTasks
: expect
.any(Number
),
653 maxQueuedTasks
: expect
.any(Number
),
654 failedTasks
: expect
.any(Number
)
659 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
660 const pool
= new FixedThreadPool(
662 './tests/worker-files/thread/testWorker.js'
664 const promises
= new Set()
667 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
671 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
672 promises
.add(pool
.execute())
674 await Promise
.all(promises
)
675 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
676 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
677 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
678 expect(poolInfo
).toStrictEqual({
679 type
: PoolTypes
.fixed
,
680 worker
: WorkerTypes
.thread
,
681 minSize
: expect
.any(Number
),
682 maxSize
: expect
.any(Number
),
683 workerNodes
: expect
.any(Number
),
684 idleWorkerNodes
: expect
.any(Number
),
685 busyWorkerNodes
: expect
.any(Number
),
686 executedTasks
: expect
.any(Number
),
687 executingTasks
: expect
.any(Number
),
688 queuedTasks
: expect
.any(Number
),
689 maxQueuedTasks
: expect
.any(Number
),
690 failedTasks
: expect
.any(Number
)
695 it('Verify that multiple tasks worker is working', async () => {
696 const pool
= new DynamicClusterPool(
699 './tests/worker-files/cluster/testMultiTasksWorker.js'
701 const data
= { n
: 10 }
702 const result0
= await pool
.execute(data
)
703 expect(result0
).toBe(false)
704 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
705 expect(result1
).toBe(false)
706 const result2
= await pool
.execute(data
, 'factorial')
707 expect(result2
).toBe(3628800)
708 const result3
= await pool
.execute(data
, 'fibonacci')
709 expect(result3
).toBe(89)