1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
16 describe('Abstract pool test suite', () => {
17 const numberOfWorkers
= 2
18 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
21 this.promiseResponseMap
.clear()
24 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: e
=> console
.error(e
)
40 ).toThrowError('Cannot start a pool from a worker!')
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
57 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non safe integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.emitter
).toBeDefined()
89 expect(pool
.opts
.enableEvents
).toBe(true)
90 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
91 expect(pool
.opts
.enableTasksQueue
).toBe(false)
92 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
93 expect(pool
.opts
.workerChoiceStrategy
).toBe(
94 WorkerChoiceStrategies
.ROUND_ROBIN
96 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 runTime
: { median
: false },
98 waitTime
: { median
: false },
99 elu
: { median
: false }
101 expect(pool
.opts
.messageHandler
).toBeUndefined()
102 expect(pool
.opts
.errorHandler
).toBeUndefined()
103 expect(pool
.opts
.onlineHandler
).toBeUndefined()
104 expect(pool
.opts
.exitHandler
).toBeUndefined()
106 const testHandler
= () => console
.log('test handler executed')
107 pool
= new FixedThreadPool(
109 './tests/worker-files/thread/testWorker.js',
111 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
112 workerChoiceStrategyOptions
: {
113 runTime
: { median
: true },
114 weights
: { 0: 300, 1: 200 }
117 restartWorkerOnError
: false,
118 enableTasksQueue
: true,
119 tasksQueueOptions
: { concurrency
: 2 },
120 messageHandler
: testHandler
,
121 errorHandler
: testHandler
,
122 onlineHandler
: testHandler
,
123 exitHandler
: testHandler
126 expect(pool
.emitter
).toBeUndefined()
127 expect(pool
.opts
.enableEvents
).toBe(false)
128 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
129 expect(pool
.opts
.enableTasksQueue
).toBe(true)
130 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
131 expect(pool
.opts
.workerChoiceStrategy
).toBe(
132 WorkerChoiceStrategies
.LEAST_USED
134 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
135 runTime
: { median
: true },
136 weights
: { 0: 300, 1: 200 }
138 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
141 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
145 it('Verify that pool options are validated', async () => {
150 './tests/worker-files/thread/testWorker.js',
152 workerChoiceStrategy
: 'invalidStrategy'
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
160 './tests/worker-files/thread/testWorker.js',
162 workerChoiceStrategyOptions
: 'invalidOptions'
166 'Invalid worker choice strategy options: must be a plain object'
172 './tests/worker-files/thread/testWorker.js',
174 workerChoiceStrategyOptions
: { weights
: {} }
178 'Invalid worker choice strategy options: must have a weight for each worker node'
184 './tests/worker-files/thread/testWorker.js',
186 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
190 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
196 './tests/worker-files/thread/testWorker.js',
198 enableTasksQueue
: true,
199 tasksQueueOptions
: { concurrency
: 0 }
202 ).toThrowError("Invalid worker tasks concurrency '0'")
207 './tests/worker-files/thread/testWorker.js',
209 enableTasksQueue
: true,
210 tasksQueueOptions
: 'invalidTasksQueueOptions'
213 ).toThrowError('Invalid tasks queue options: must be a plain object')
218 './tests/worker-files/thread/testWorker.js',
220 enableTasksQueue
: true,
221 tasksQueueOptions
: { concurrency
: 0.2 }
224 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
227 it('Verify that worker choice strategy options can be set', async () => {
228 const pool
= new FixedThreadPool(
230 './tests/worker-files/thread/testWorker.js',
231 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
233 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
234 runTime
: { median
: false },
235 waitTime
: { median
: false },
236 elu
: { median
: false }
238 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
239 .workerChoiceStrategies
) {
240 expect(workerChoiceStrategy
.opts
).toStrictEqual({
241 runTime
: { median
: false },
242 waitTime
: { median
: false },
243 elu
: { median
: false }
247 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
265 pool
.setWorkerChoiceStrategyOptions({
266 runTime
: { median
: true },
267 elu
: { median
: true }
269 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
270 runTime
: { median
: true },
271 elu
: { median
: true }
273 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
274 .workerChoiceStrategies
) {
275 expect(workerChoiceStrategy
.opts
).toStrictEqual({
276 runTime
: { median
: true },
277 elu
: { median
: true }
281 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
299 pool
.setWorkerChoiceStrategyOptions({
300 runTime
: { median
: false },
301 elu
: { median
: false }
303 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
304 runTime
: { median
: false },
305 elu
: { median
: false }
307 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
308 .workerChoiceStrategies
) {
309 expect(workerChoiceStrategy
.opts
).toStrictEqual({
310 runTime
: { median
: false },
311 elu
: { median
: false }
315 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
334 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
336 'Invalid worker choice strategy options: must be a plain object'
339 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
341 'Invalid worker choice strategy options: must have a weight for each worker node'
344 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
351 it('Verify that tasks queue can be enabled/disabled', async () => {
352 const pool
= new FixedThreadPool(
354 './tests/worker-files/thread/testWorker.js'
356 expect(pool
.opts
.enableTasksQueue
).toBe(false)
357 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
358 pool
.enableTasksQueue(true)
359 expect(pool
.opts
.enableTasksQueue
).toBe(true)
360 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
361 pool
.enableTasksQueue(true, { concurrency
: 2 })
362 expect(pool
.opts
.enableTasksQueue
).toBe(true)
363 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
364 pool
.enableTasksQueue(false)
365 expect(pool
.opts
.enableTasksQueue
).toBe(false)
366 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
370 it('Verify that tasks queue options can be set', async () => {
371 const pool
= new FixedThreadPool(
373 './tests/worker-files/thread/testWorker.js',
374 { enableTasksQueue
: true }
376 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
377 pool
.setTasksQueueOptions({ concurrency
: 2 })
378 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
380 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
381 ).toThrowError('Invalid tasks queue options: must be a plain object')
382 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
383 "Invalid worker tasks concurrency '0'"
385 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
386 'Invalid worker tasks concurrency: must be an integer'
391 it('Verify that pool info is set', async () => {
392 let pool
= new FixedThreadPool(
394 './tests/worker-files/thread/testWorker.js'
396 expect(pool
.info
).toStrictEqual({
398 type
: PoolTypes
.fixed
,
399 worker
: WorkerTypes
.thread
,
400 minSize
: numberOfWorkers
,
401 maxSize
: numberOfWorkers
,
402 workerNodes
: numberOfWorkers
,
403 idleWorkerNodes
: numberOfWorkers
,
412 pool
= new DynamicClusterPool(
415 './tests/worker-files/cluster/testWorker.js'
417 expect(pool
.info
).toStrictEqual({
419 type
: PoolTypes
.dynamic
,
420 worker
: WorkerTypes
.cluster
,
421 minSize
: numberOfWorkers
,
422 maxSize
: numberOfWorkers
* 2,
423 workerNodes
: numberOfWorkers
,
424 idleWorkerNodes
: numberOfWorkers
,
435 it('Simulate worker not found', async () => {
436 const pool
= new StubPoolWithRemoveAllWorker(
438 './tests/worker-files/thread/testWorker.js',
440 errorHandler
: e
=> console
.error(e
)
443 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
444 // Simulate worker not found.
445 pool
.removeAllWorker()
446 expect(pool
.workerNodes
.length
).toBe(0)
450 it('Verify that worker pool tasks usage are initialized', async () => {
451 const pool
= new FixedClusterPool(
453 './tests/worker-files/cluster/testWorker.js'
455 for (const workerNode
of pool
.workerNodes
) {
456 expect(workerNode
.usage
).toStrictEqual({
465 history
: expect
.any(CircularArray
)
468 history
: expect
.any(CircularArray
)
472 history
: expect
.any(CircularArray
)
475 history
: expect
.any(CircularArray
)
483 it('Verify that worker pool tasks queue are initialized', async () => {
484 const pool
= new FixedClusterPool(
486 './tests/worker-files/cluster/testWorker.js'
488 for (const workerNode
of pool
.workerNodes
) {
489 expect(workerNode
.tasksQueue
).toBeDefined()
490 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
491 expect(workerNode
.tasksQueue
.size
).toBe(0)
492 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
497 it('Verify that worker pool tasks usage are computed', async () => {
498 const pool
= new FixedClusterPool(
500 './tests/worker-files/cluster/testWorker.js'
502 const promises
= new Set()
503 const maxMultiplier
= 2
504 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
505 promises
.add(pool
.execute())
507 for (const workerNode
of pool
.workerNodes
) {
508 expect(workerNode
.usage
).toStrictEqual({
511 executing
: maxMultiplier
,
517 history
: expect
.any(CircularArray
)
520 history
: expect
.any(CircularArray
)
524 history
: expect
.any(CircularArray
)
527 history
: expect
.any(CircularArray
)
532 await Promise
.all(promises
)
533 for (const workerNode
of pool
.workerNodes
) {
534 expect(workerNode
.usage
).toStrictEqual({
536 executed
: maxMultiplier
,
543 history
: expect
.any(CircularArray
)
546 history
: expect
.any(CircularArray
)
550 history
: expect
.any(CircularArray
)
553 history
: expect
.any(CircularArray
)
561 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
562 const pool
= new DynamicThreadPool(
565 './tests/worker-files/thread/testWorker.js'
567 const promises
= new Set()
568 const maxMultiplier
= 2
569 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
570 promises
.add(pool
.execute())
572 await Promise
.all(promises
)
573 for (const workerNode
of pool
.workerNodes
) {
574 expect(workerNode
.usage
).toStrictEqual({
576 executed
: expect
.any(Number
),
583 history
: expect
.any(CircularArray
)
586 history
: expect
.any(CircularArray
)
590 history
: expect
.any(CircularArray
)
593 history
: expect
.any(CircularArray
)
597 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
598 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
600 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
601 for (const workerNode
of pool
.workerNodes
) {
602 expect(workerNode
.usage
).toStrictEqual({
611 history
: expect
.any(CircularArray
)
614 history
: expect
.any(CircularArray
)
618 history
: expect
.any(CircularArray
)
621 history
: expect
.any(CircularArray
)
625 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
626 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
631 it("Verify that pool event emitter 'full' event can register a callback", async () => {
632 const pool
= new DynamicThreadPool(
635 './tests/worker-files/thread/testWorker.js'
637 const promises
= new Set()
640 pool
.emitter
.on(PoolEvents
.full
, info
=> {
644 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
645 promises
.add(pool
.execute())
647 await Promise
.all(promises
)
648 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
649 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
650 expect(poolFull
).toBe(numberOfWorkers
* 2)
651 expect(poolInfo
).toStrictEqual({
653 type
: PoolTypes
.dynamic
,
654 worker
: WorkerTypes
.thread
,
655 minSize
: expect
.any(Number
),
656 maxSize
: expect
.any(Number
),
657 workerNodes
: expect
.any(Number
),
658 idleWorkerNodes
: expect
.any(Number
),
659 busyWorkerNodes
: expect
.any(Number
),
660 executedTasks
: expect
.any(Number
),
661 executingTasks
: expect
.any(Number
),
662 queuedTasks
: expect
.any(Number
),
663 maxQueuedTasks
: expect
.any(Number
),
664 failedTasks
: expect
.any(Number
)
669 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
670 const pool
= new FixedThreadPool(
672 './tests/worker-files/thread/testWorker.js'
674 const promises
= new Set()
677 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
681 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
682 promises
.add(pool
.execute())
684 await Promise
.all(promises
)
685 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
686 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
687 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
688 expect(poolInfo
).toStrictEqual({
690 type
: PoolTypes
.fixed
,
691 worker
: WorkerTypes
.thread
,
692 minSize
: expect
.any(Number
),
693 maxSize
: expect
.any(Number
),
694 workerNodes
: expect
.any(Number
),
695 idleWorkerNodes
: expect
.any(Number
),
696 busyWorkerNodes
: expect
.any(Number
),
697 executedTasks
: expect
.any(Number
),
698 executingTasks
: expect
.any(Number
),
699 queuedTasks
: expect
.any(Number
),
700 maxQueuedTasks
: expect
.any(Number
),
701 failedTasks
: expect
.any(Number
)
706 it('Verify that multiple tasks worker is working', async () => {
707 const pool
= new DynamicClusterPool(
710 './tests/worker-files/cluster/testMultiTasksWorker.js'
712 const data
= { n
: 10 }
713 const result0
= await pool
.execute(data
)
714 expect(result0
).toBe(false)
715 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
716 expect(result1
).toBe(false)
717 const result2
= await pool
.execute(data
, 'factorial')
718 expect(result2
).toBe(3628800)
719 const result3
= await pool
.execute(data
, 'fibonacci')
720 expect(result3
).toBe(55)