1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
11 describe('Abstract pool test suite', () => {
12 const numberOfWorkers
= 1
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool worker nodes'
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
22 class StubPoolWithIsMain
extends FixedThreadPool
{
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError('Cannot start a pool from a worker!')
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
55 'Cannot instantiate a pool without specifying the number of workers'
59 it('Verify that a negative number of workers is checked', () => {
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
65 'Cannot instantiate a pool with a negative number of workers'
70 it('Verify that a non integer number of workers is checked', () => {
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
76 'Cannot instantiate a pool with a non integer number of workers'
81 it('Verify that pool options are checked', async () => {
82 let pool
= new FixedThreadPool(
84 './tests/worker-files/thread/testWorker.js'
86 expect(pool
.opts
.enableEvents
).toBe(true)
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableTasksQueue
).toBe(false)
89 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
90 expect(pool
.opts
.workerChoiceStrategy
).toBe(
91 WorkerChoiceStrategies
.ROUND_ROBIN
93 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 expect(pool
.opts
.messageHandler
).toBeUndefined()
97 expect(pool
.opts
.errorHandler
).toBeUndefined()
98 expect(pool
.opts
.onlineHandler
).toBeUndefined()
99 expect(pool
.opts
.exitHandler
).toBeUndefined()
101 const testHandler
= () => console
.log('test handler executed')
102 pool
= new FixedThreadPool(
104 './tests/worker-files/thread/testWorker.js',
106 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
107 workerChoiceStrategyOptions
: { medRunTime
: true },
109 enableTasksQueue
: true,
110 tasksQueueOptions
: { concurrency
: 2 },
111 messageHandler
: testHandler
,
112 errorHandler
: testHandler
,
113 onlineHandler
: testHandler
,
114 exitHandler
: testHandler
117 expect(pool
.opts
.enableEvents
).toBe(false)
118 expect(pool
.emitter
).toBeUndefined()
119 expect(pool
.opts
.enableTasksQueue
).toBe(true)
120 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
121 expect(pool
.opts
.workerChoiceStrategy
).toBe(
122 WorkerChoiceStrategies
.LESS_USED
124 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
127 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
128 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
129 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
130 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
134 it('Verify that pool options are valid', async () => {
139 './tests/worker-files/thread/testWorker.js',
141 enableTasksQueue
: true,
142 tasksQueueOptions
: { concurrency
: 0 }
145 ).toThrowError("Invalid worker tasks concurrency '0'")
150 './tests/worker-files/thread/testWorker.js',
152 workerChoiceStrategy
: 'invalidStrategy'
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
158 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
159 const pool
= new StubPoolWithRemoveAllWorker(
161 './tests/worker-files/cluster/testWorker.js',
163 errorHandler
: e
=> console
.error(e
)
166 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
167 // Simulate worker not found.
168 pool
.removeAllWorker()
169 expect(pool
.workerNodes
.length
).toBe(0)
170 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
171 workerNotFoundInPoolError
176 it('Verify that worker pool tasks usage are initialized', async () => {
177 const pool
= new FixedClusterPool(
179 './tests/worker-files/cluster/testWorker.js'
181 for (const workerNode
of pool
.workerNodes
) {
182 expect(workerNode
.tasksUsage
).toBeDefined()
183 expect(workerNode
.tasksUsage
.run
).toBe(0)
184 expect(workerNode
.tasksUsage
.running
).toBe(0)
185 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
186 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
187 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
188 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
189 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
190 expect(workerNode
.tasksUsage
.error
).toBe(0)
195 it('Verify that worker pool tasks queue are initialized', async () => {
196 const pool
= new FixedClusterPool(
198 './tests/worker-files/cluster/testWorker.js'
200 for (const workerNode
of pool
.workerNodes
) {
201 expect(workerNode
.tasksQueue
).toBeDefined()
202 expect(workerNode
.tasksQueue
).toBeInstanceOf(Array
)
203 expect(workerNode
.tasksQueue
.length
).toBe(0)
208 it('Verify that worker pool tasks usage are computed', async () => {
209 const pool
= new FixedClusterPool(
211 './tests/worker-files/cluster/testWorker.js'
214 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
215 promises
.push(pool
.execute())
217 for (const workerNode
of pool
.workerNodes
) {
218 expect(workerNode
.tasksUsage
).toBeDefined()
219 expect(workerNode
.tasksUsage
.run
).toBe(0)
220 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
221 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
222 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
223 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
224 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
225 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
226 expect(workerNode
.tasksUsage
.error
).toBe(0)
228 await Promise
.all(promises
)
229 for (const workerNode
of pool
.workerNodes
) {
230 expect(workerNode
.tasksUsage
).toBeDefined()
231 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
232 expect(workerNode
.tasksUsage
.running
).toBe(0)
233 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
234 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
235 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
236 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
237 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
238 expect(workerNode
.tasksUsage
.error
).toBe(0)
243 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
244 const pool
= new DynamicThreadPool(
247 './tests/worker-files/thread/testWorker.js'
250 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
251 promises
.push(pool
.execute())
253 await Promise
.all(promises
)
254 for (const workerNode
of pool
.workerNodes
) {
255 expect(workerNode
.tasksUsage
).toBeDefined()
256 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
257 expect(workerNode
.tasksUsage
.running
).toBe(0)
258 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
259 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
260 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
261 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
262 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
263 expect(workerNode
.tasksUsage
.error
).toBe(0)
265 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
266 for (const workerNode
of pool
.workerNodes
) {
267 expect(workerNode
.tasksUsage
).toBeDefined()
268 expect(workerNode
.tasksUsage
.run
).toBe(0)
269 expect(workerNode
.tasksUsage
.running
).toBe(0)
270 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
271 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
272 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
273 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
274 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
275 expect(workerNode
.tasksUsage
.error
).toBe(0)
280 it("Verify that pool event emitter 'full' event can register a callback", async () => {
281 const pool
= new DynamicThreadPool(
284 './tests/worker-files/thread/testWorker.js'
288 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
289 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
290 promises
.push(pool
.execute())
292 await Promise
.all(promises
)
293 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
294 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
295 expect(poolFull
).toBe(numberOfWorkers
+ 1)
299 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
300 const pool
= new FixedThreadPool(
302 './tests/worker-files/thread/testWorker.js'
306 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
307 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
308 promises
.push(pool
.execute())
310 await Promise
.all(promises
)
311 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
312 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
313 expect(poolBusy
).toBe(numberOfWorkers
+ 1)