1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 runTime
: { median
: false },
97 waitTime
: { median
: false },
98 elu
: { median
: false }
100 expect(pool
.opts
.messageHandler
).toBeUndefined()
101 expect(pool
.opts
.errorHandler
).toBeUndefined()
102 expect(pool
.opts
.onlineHandler
).toBeUndefined()
103 expect(pool
.opts
.exitHandler
).toBeUndefined()
105 const testHandler
= () => console
.log('test handler executed')
106 pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js',
110 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
111 workerChoiceStrategyOptions
: {
112 runTime
: { median
: true },
113 weights
: { 0: 300, 1: 200 }
116 restartWorkerOnError
: false,
117 enableTasksQueue
: true,
118 tasksQueueOptions
: { concurrency
: 2 },
119 messageHandler
: testHandler
,
120 errorHandler
: testHandler
,
121 onlineHandler
: testHandler
,
122 exitHandler
: testHandler
125 expect(pool
.emitter
).toBeUndefined()
126 expect(pool
.opts
.enableEvents
).toBe(false)
127 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
128 expect(pool
.opts
.enableTasksQueue
).toBe(true)
129 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
130 expect(pool
.opts
.workerChoiceStrategy
).toBe(
131 WorkerChoiceStrategies
.LEAST_USED
133 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 runTime
: { median
: true },
135 weights
: { 0: 300, 1: 200 }
137 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
144 it('Verify that pool options are validated', async () => {
149 './tests/worker-files/thread/testWorker.js',
151 enableTasksQueue
: true,
152 tasksQueueOptions
: { concurrency
: 0 }
155 ).toThrowError("Invalid worker tasks concurrency '0'")
160 './tests/worker-files/thread/testWorker.js',
162 workerChoiceStrategy
: 'invalidStrategy'
165 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
170 './tests/worker-files/thread/testWorker.js',
172 workerChoiceStrategyOptions
: { weights
: {} }
176 'Invalid worker choice strategy options: must have a weight for each worker node'
180 it('Verify that worker choice strategy options can be set', async () => {
181 const pool
= new FixedThreadPool(
183 './tests/worker-files/thread/testWorker.js',
184 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
186 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
187 runTime
: { median
: false },
188 waitTime
: { median
: false },
189 elu
: { median
: false }
191 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
192 .workerChoiceStrategies
) {
193 expect(workerChoiceStrategy
.opts
).toStrictEqual({
194 runTime
: { median
: false },
195 waitTime
: { median
: false },
196 elu
: { median
: false }
200 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
218 pool
.setWorkerChoiceStrategyOptions({ runTime
: { median
: true } })
219 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
220 runTime
: { median
: true }
222 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
223 .workerChoiceStrategies
) {
224 expect(workerChoiceStrategy
.opts
).toStrictEqual({
225 runTime
: { median
: true }
229 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
247 pool
.setWorkerChoiceStrategyOptions({ runTime
: { median
: false } })
248 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
249 runTime
: { median
: false }
251 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
252 .workerChoiceStrategies
) {
253 expect(workerChoiceStrategy
.opts
).toStrictEqual({
254 runTime
: { median
: false }
258 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
279 it('Verify that tasks queue can be enabled/disabled', async () => {
280 const pool
= new FixedThreadPool(
282 './tests/worker-files/thread/testWorker.js'
284 expect(pool
.opts
.enableTasksQueue
).toBe(false)
285 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
286 pool
.enableTasksQueue(true)
287 expect(pool
.opts
.enableTasksQueue
).toBe(true)
288 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
289 pool
.enableTasksQueue(true, { concurrency
: 2 })
290 expect(pool
.opts
.enableTasksQueue
).toBe(true)
291 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
292 pool
.enableTasksQueue(false)
293 expect(pool
.opts
.enableTasksQueue
).toBe(false)
294 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
298 it('Verify that tasks queue options can be set', async () => {
299 const pool
= new FixedThreadPool(
301 './tests/worker-files/thread/testWorker.js',
302 { enableTasksQueue
: true }
304 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
305 pool
.setTasksQueueOptions({ concurrency
: 2 })
306 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
307 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
308 "Invalid worker tasks concurrency '0'"
313 it('Verify that pool info is set', async () => {
314 let pool
= new FixedThreadPool(
316 './tests/worker-files/thread/testWorker.js'
318 expect(pool
.info
).toStrictEqual({
319 type
: PoolTypes
.fixed
,
320 worker
: WorkerTypes
.thread
,
321 minSize
: numberOfWorkers
,
322 maxSize
: numberOfWorkers
,
323 workerNodes
: numberOfWorkers
,
324 idleWorkerNodes
: numberOfWorkers
,
333 pool
= new DynamicClusterPool(
336 './tests/worker-files/thread/testWorker.js'
338 expect(pool
.info
).toStrictEqual({
339 type
: PoolTypes
.dynamic
,
340 worker
: WorkerTypes
.cluster
,
341 minSize
: numberOfWorkers
,
342 maxSize
: numberOfWorkers
* 2,
343 workerNodes
: numberOfWorkers
,
344 idleWorkerNodes
: numberOfWorkers
,
355 it('Simulate worker not found', async () => {
356 const pool
= new StubPoolWithRemoveAllWorker(
358 './tests/worker-files/cluster/testWorker.js',
360 errorHandler
: e
=> console
.error(e
)
363 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
364 // Simulate worker not found.
365 pool
.removeAllWorker()
366 expect(pool
.workerNodes
.length
).toBe(0)
370 it('Verify that worker pool tasks usage are initialized', async () => {
371 const pool
= new FixedClusterPool(
373 './tests/worker-files/cluster/testWorker.js'
375 for (const workerNode
of pool
.workerNodes
) {
376 expect(workerNode
.workerUsage
).toStrictEqual({
387 history
: expect
.any(CircularArray
)
393 history
: expect
.any(CircularArray
)
400 history
: expect
.any(CircularArray
)
406 history
: expect
.any(CircularArray
)
415 it('Verify that worker pool tasks queue are initialized', async () => {
416 const pool
= new FixedClusterPool(
418 './tests/worker-files/cluster/testWorker.js'
420 for (const workerNode
of pool
.workerNodes
) {
421 expect(workerNode
.tasksQueue
).toBeDefined()
422 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
423 expect(workerNode
.tasksQueue
.size
).toBe(0)
428 it('Verify that worker pool tasks usage are computed', async () => {
429 const pool
= new FixedClusterPool(
431 './tests/worker-files/cluster/testWorker.js'
433 const promises
= new Set()
434 const maxMultiplier
= 2
435 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
436 promises
.add(pool
.execute())
438 for (const workerNode
of pool
.workerNodes
) {
439 expect(workerNode
.workerUsage
).toStrictEqual({
442 executing
: maxMultiplier
,
450 history
: expect
.any(CircularArray
)
456 history
: expect
.any(CircularArray
)
463 history
: expect
.any(CircularArray
)
469 history
: expect
.any(CircularArray
)
475 await Promise
.all(promises
)
476 for (const workerNode
of pool
.workerNodes
) {
477 expect(workerNode
.workerUsage
).toStrictEqual({
479 executed
: maxMultiplier
,
488 history
: expect
.any(CircularArray
)
494 history
: expect
.any(CircularArray
)
501 history
: expect
.any(CircularArray
)
507 history
: expect
.any(CircularArray
)
516 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
517 const pool
= new DynamicThreadPool(
520 './tests/worker-files/thread/testWorker.js'
522 const promises
= new Set()
523 const maxMultiplier
= 2
524 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
525 promises
.add(pool
.execute())
527 await Promise
.all(promises
)
528 for (const workerNode
of pool
.workerNodes
) {
529 expect(workerNode
.workerUsage
).toStrictEqual({
531 executed
: expect
.any(Number
),
540 history
: expect
.any(CircularArray
)
546 history
: expect
.any(CircularArray
)
553 history
: expect
.any(CircularArray
)
559 history
: expect
.any(CircularArray
)
564 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
565 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
569 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
570 for (const workerNode
of pool
.workerNodes
) {
571 expect(workerNode
.workerUsage
).toStrictEqual({
582 history
: expect
.any(CircularArray
)
588 history
: expect
.any(CircularArray
)
595 history
: expect
.any(CircularArray
)
601 history
: expect
.any(CircularArray
)
606 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
607 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
612 it("Verify that pool event emitter 'full' event can register a callback", async () => {
613 const pool
= new DynamicThreadPool(
616 './tests/worker-files/thread/testWorker.js'
618 const promises
= new Set()
621 pool
.emitter
.on(PoolEvents
.full
, info
=> {
625 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
626 promises
.add(pool
.execute())
628 await Promise
.all(promises
)
629 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
630 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
631 expect(poolFull
).toBe(numberOfWorkers
* 2)
632 expect(poolInfo
).toStrictEqual({
633 type
: PoolTypes
.dynamic
,
634 worker
: WorkerTypes
.thread
,
635 minSize
: expect
.any(Number
),
636 maxSize
: expect
.any(Number
),
637 workerNodes
: expect
.any(Number
),
638 idleWorkerNodes
: expect
.any(Number
),
639 busyWorkerNodes
: expect
.any(Number
),
640 executedTasks
: expect
.any(Number
),
641 executingTasks
: expect
.any(Number
),
642 queuedTasks
: expect
.any(Number
),
643 maxQueuedTasks
: expect
.any(Number
),
644 failedTasks
: expect
.any(Number
)
649 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
650 const pool
= new FixedThreadPool(
652 './tests/worker-files/thread/testWorker.js'
654 const promises
= new Set()
657 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
661 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
662 promises
.add(pool
.execute())
664 await Promise
.all(promises
)
665 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
666 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
667 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
668 expect(poolInfo
).toStrictEqual({
669 type
: PoolTypes
.fixed
,
670 worker
: WorkerTypes
.thread
,
671 minSize
: expect
.any(Number
),
672 maxSize
: expect
.any(Number
),
673 workerNodes
: expect
.any(Number
),
674 idleWorkerNodes
: expect
.any(Number
),
675 busyWorkerNodes
: expect
.any(Number
),
676 executedTasks
: expect
.any(Number
),
677 executingTasks
: expect
.any(Number
),
678 queuedTasks
: expect
.any(Number
),
679 maxQueuedTasks
: expect
.any(Number
),
680 failedTasks
: expect
.any(Number
)
685 it('Verify that multiple tasks worker is working', async () => {
686 const pool
= new DynamicClusterPool(
689 './tests/worker-files/cluster/testMultiTasksWorker.js'
691 const data
= { n
: 10 }
692 const result0
= await pool
.execute(data
)
693 expect(result0
).toBe(false)
694 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
695 expect(result1
).toBe(false)
696 const result2
= await pool
.execute(data
, 'factorial')
697 expect(result2
).toBe(3628800)
698 const result3
= await pool
.execute(data
, 'fibonacci')
699 expect(result3
).toBe(89)