1 const { expect
} = require('expect')
9 } = require('../../../lib')
10 const { CircularArray
} = require('../../../lib/circular-array')
11 const { Queue
} = require('../../../lib/queue')
13 describe('Abstract pool test suite', () => {
14 const numberOfWorkers
= 1
15 const workerNotFoundInPoolError
= new Error(
16 'Worker could not be found in the pool worker nodes'
18 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
21 this.promiseResponseMap
.clear()
24 class StubPoolWithIsMain
extends FixedThreadPool
{
30 it('Simulate pool creation from a non main thread/process', () => {
33 new StubPoolWithIsMain(
35 './tests/worker-files/thread/testWorker.js',
37 errorHandler
: e
=> console
.error(e
)
40 ).toThrowError('Cannot start a pool from a worker!')
43 it('Verify that filePath is checked', () => {
44 const expectedError
= new Error(
45 'Please specify a file with a worker implementation'
47 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
57 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.emitter
).toBeDefined()
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
98 expect(pool
.opts
.messageHandler
).toBeUndefined()
99 expect(pool
.opts
.errorHandler
).toBeUndefined()
100 expect(pool
.opts
.onlineHandler
).toBeUndefined()
101 expect(pool
.opts
.exitHandler
).toBeUndefined()
103 const testHandler
= () => console
.log('test handler executed')
104 pool
= new FixedThreadPool(
106 './tests/worker-files/thread/testWorker.js',
108 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
109 workerChoiceStrategyOptions
: { medRunTime
: true },
111 enableTasksQueue
: true,
112 tasksQueueOptions
: { concurrency
: 2 },
113 messageHandler
: testHandler
,
114 errorHandler
: testHandler
,
115 onlineHandler
: testHandler
,
116 exitHandler
: testHandler
119 expect(pool
.opts
.enableEvents
).toBe(false)
120 expect(pool
.emitter
).toBeUndefined()
121 expect(pool
.opts
.enableTasksQueue
).toBe(true)
122 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
123 expect(pool
.opts
.workerChoiceStrategy
).toBe(
124 WorkerChoiceStrategies
.LESS_USED
126 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
129 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
130 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
131 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
132 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
136 it('Verify that pool options are validated', async () => {
141 './tests/worker-files/thread/testWorker.js',
143 enableTasksQueue
: true,
144 tasksQueueOptions
: { concurrency
: 0 }
147 ).toThrowError("Invalid worker tasks concurrency '0'")
152 './tests/worker-files/thread/testWorker.js',
154 workerChoiceStrategy
: 'invalidStrategy'
157 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
160 it('Verify that worker choice strategy options can be set', async () => {
161 const pool
= new FixedThreadPool(
163 './tests/worker-files/thread/testWorker.js',
164 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
166 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
169 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
170 .workerChoiceStrategies
) {
171 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
174 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
177 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
179 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
180 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
183 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
184 .workerChoiceStrategies
) {
185 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
188 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
191 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
193 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
194 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
197 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
198 .workerChoiceStrategies
) {
199 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
202 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
205 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
210 it('Verify that tasks queue can be enabled/disabled', async () => {
211 const pool
= new FixedThreadPool(
213 './tests/worker-files/thread/testWorker.js'
215 expect(pool
.opts
.enableTasksQueue
).toBe(false)
216 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
217 pool
.enableTasksQueue(true)
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
220 pool
.enableTasksQueue(true, { concurrency
: 2 })
221 expect(pool
.opts
.enableTasksQueue
).toBe(true)
222 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
223 pool
.enableTasksQueue(false)
224 expect(pool
.opts
.enableTasksQueue
).toBe(false)
225 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
229 it('Verify that tasks queue options can be set', async () => {
230 const pool
= new FixedThreadPool(
232 './tests/worker-files/thread/testWorker.js',
233 { enableTasksQueue
: true }
235 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
236 pool
.setTasksQueueOptions({ concurrency
: 2 })
237 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
238 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
239 "Invalid worker tasks concurrency '0'"
244 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
245 const pool
= new StubPoolWithRemoveAllWorker(
247 './tests/worker-files/cluster/testWorker.js',
249 errorHandler
: e
=> console
.error(e
)
252 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
253 // Simulate worker not found.
254 pool
.removeAllWorker()
255 expect(pool
.workerNodes
.length
).toBe(0)
256 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
257 workerNotFoundInPoolError
262 it('Verify that worker pool tasks usage are initialized', async () => {
263 const pool
= new FixedClusterPool(
265 './tests/worker-files/cluster/testWorker.js'
267 for (const workerNode
of pool
.workerNodes
) {
268 expect(workerNode
.tasksUsage
).toBeDefined()
269 expect(workerNode
.tasksUsage
.run
).toBe(0)
270 expect(workerNode
.tasksUsage
.running
).toBe(0)
271 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
272 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
273 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
274 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
275 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
276 expect(workerNode
.tasksUsage
.error
).toBe(0)
281 it('Verify that worker pool tasks queue are initialized', async () => {
282 const pool
= new FixedClusterPool(
284 './tests/worker-files/cluster/testWorker.js'
286 for (const workerNode
of pool
.workerNodes
) {
287 expect(workerNode
.tasksQueue
).toBeDefined()
288 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
289 expect(workerNode
.tasksQueue
.size
).toBe(0)
294 it('Verify that worker pool tasks usage are computed', async () => {
295 const pool
= new FixedClusterPool(
297 './tests/worker-files/cluster/testWorker.js'
300 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
301 promises
.push(pool
.execute())
303 for (const workerNode
of pool
.workerNodes
) {
304 expect(workerNode
.tasksUsage
).toBeDefined()
305 expect(workerNode
.tasksUsage
.run
).toBe(0)
306 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
307 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
308 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
309 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
310 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
311 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
312 expect(workerNode
.tasksUsage
.error
).toBe(0)
314 await Promise
.all(promises
)
315 for (const workerNode
of pool
.workerNodes
) {
316 expect(workerNode
.tasksUsage
).toBeDefined()
317 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
318 expect(workerNode
.tasksUsage
.running
).toBe(0)
319 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
320 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
321 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
322 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
323 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
324 expect(workerNode
.tasksUsage
.error
).toBe(0)
329 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
330 const pool
= new DynamicThreadPool(
333 './tests/worker-files/thread/testWorker.js'
336 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
337 promises
.push(pool
.execute())
339 await Promise
.all(promises
)
340 for (const workerNode
of pool
.workerNodes
) {
341 expect(workerNode
.tasksUsage
).toBeDefined()
342 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
343 expect(workerNode
.tasksUsage
.running
).toBe(0)
344 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
345 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
346 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
347 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
348 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
349 expect(workerNode
.tasksUsage
.error
).toBe(0)
351 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
352 for (const workerNode
of pool
.workerNodes
) {
353 expect(workerNode
.tasksUsage
).toBeDefined()
354 expect(workerNode
.tasksUsage
.run
).toBe(0)
355 expect(workerNode
.tasksUsage
.running
).toBe(0)
356 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
357 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
358 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
359 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
360 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
361 expect(workerNode
.tasksUsage
.error
).toBe(0)
366 it("Verify that pool event emitter 'full' event can register a callback", async () => {
367 const pool
= new DynamicThreadPool(
370 './tests/worker-files/thread/testWorker.js'
374 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
375 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
376 promises
.push(pool
.execute())
378 await Promise
.all(promises
)
379 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
380 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
381 expect(poolFull
).toBe(numberOfWorkers
+ 1)
385 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
386 const pool
= new FixedThreadPool(
388 './tests/worker-files/thread/testWorker.js'
392 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
393 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
394 promises
.push(pool
.execute())
396 await Promise
.all(promises
)
397 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
398 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
399 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
403 it('Verify that multiple tasks worker is working', async () => {
404 const pool
= new DynamicClusterPool(
407 './tests/worker-files/cluster/testMultiTasksWorker.js'
409 const data
= { n
: 10 }
410 const result0
= await pool
.execute(data
)
411 expect(result0
).toBe(false)
412 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
413 expect(result1
).toBe(false)
414 const result2
= await pool
.execute(data
, 'factorial')
415 expect(result2
).toBe(3628800)
416 const result3
= await pool
.execute(data
, 'fibonacci')
417 expect(result3
).toBe(89)