1 const { expect
} = require('expect')
9 } = require('../../../lib')
10 const { CircularArray
} = require('../../../lib/circular-array')
11 const { Queue
} = require('../../../lib/queue')
13 describe('Abstract pool test suite', () => {
// Worker count shared across this suite: it is passed to pool constructors
// and used as the base for the task-submission loop bounds and expectations.
14 const numberOfWorkers
= 1
// FixedThreadPool subclass used by the 'Simulate worker not found' test,
// which calls removeAllWorker() on an instance of it.
// NOTE(review): the header of the enclosing method is not visible here; the
// only visible statement clears this.promiseResponseMap.
15 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
18 this.promiseResponseMap
.clear()
// FixedThreadPool subclass instantiated by the
// 'Simulate pool creation from a non main thread/process' test.
// NOTE(review): presumably overrides the main thread/process check (going by
// its name) — the class body is not visible here, confirm before relying on it.
21 class StubPoolWithIsMain
extends FixedThreadPool
{
// Constructing a StubPoolWithIsMain with the thread test worker file and an
// errorHandler that logs to console.error must throw
// 'Cannot start a pool from a worker!'.
27 it('Simulate pool creation from a non main thread/process', () => {
30 new StubPoolWithIsMain(
32 './tests/worker-files/thread/testWorker.js',
34 errorHandler
: e
=> console
.error(e
)
37 ).toThrowError('Cannot start a pool from a worker!')
// Two invalid file path cases are exercised: the file path argument omitted
// entirely, and an empty string. Both constructions are expected to throw.
// NOTE(review): the argument passed to toThrowError is not visible here;
// presumably it is expectedError — confirm.
40 it('Verify that filePath is checked', () => {
41 const expectedError
= new Error(
42 'Please specify a file with a worker implementation'
44 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
47 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
// Constructing a FixedThreadPool with no arguments at all must throw the
// "without specifying the number of workers" error.
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
54 'Cannot instantiate a pool without specifying the number of workers'
// A FixedClusterPool built with -1 workers and the cluster test worker file
// is paired with the "negative number of workers" error message.
58 it('Verify that a negative number of workers is checked', () => {
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 'Cannot instantiate a pool with a negative number of workers'
// A FixedThreadPool built with a fractional worker count (0.25) is paired
// with the "non safe integer number of workers" error message.
69 it('Verify that a non integer number of workers is checked', () => {
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 'Cannot instantiate a pool with a non safe integer number of workers'
// Checks pool.opts in two passes.
// Pass 1: a FixedThreadPool built with no visible options object. Expected:
//   enableEvents true, emitter defined, enableTasksQueue false,
//   tasksQueueOptions undefined, workerChoiceStrategy ROUND_ROBIN, and all
//   four handlers (message/error/online/exit) undefined.
// Pass 2: the pool is rebuilt with explicit options, which are expected to be
//   reflected verbatim in pool.opts.
// NOTE(review): the expected workerChoiceStrategyOptions objects are not
// visible in these lines.
80 it('Verify that pool options are checked', async () => {
81 let pool
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js'
85 expect(pool
.opts
.enableEvents
).toBe(true)
86 expect(pool
.emitter
).toBeDefined()
87 expect(pool
.opts
.enableTasksQueue
).toBe(false)
88 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
89 expect(pool
.opts
.workerChoiceStrategy
).toBe(
90 WorkerChoiceStrategies
.ROUND_ROBIN
92 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
95 expect(pool
.opts
.messageHandler
).toBeUndefined()
96 expect(pool
.opts
.errorHandler
).toBeUndefined()
97 expect(pool
.opts
.onlineHandler
).toBeUndefined()
98 expect(pool
.opts
.exitHandler
).toBeUndefined()
// Single handler function reused for every handler option in pass 2, so the
// assertions below can compare each stored handler against it.
100 const testHandler
= () => console
.log('test handler executed')
// Pass 2: rebuild the pool with explicit options.
// NOTE(review): the option that disables events is not visible in these
// lines, although enableEvents is asserted to be false below.
101 pool
= new FixedThreadPool(
103 './tests/worker-files/thread/testWorker.js',
105 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
106 workerChoiceStrategyOptions
: {
111 enableTasksQueue
: true,
112 tasksQueueOptions
: { concurrency
: 2 },
113 messageHandler
: testHandler
,
114 errorHandler
: testHandler
,
115 onlineHandler
: testHandler
,
116 exitHandler
: testHandler
119 expect(pool
.opts
.enableEvents
).toBe(false)
120 expect(pool
.emitter
).toBeUndefined()
121 expect(pool
.opts
.enableTasksQueue
).toBe(true)
122 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
123 expect(pool
.opts
.workerChoiceStrategy
).toBe(
124 WorkerChoiceStrategies
.LESS_USED
126 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
130 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
131 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
132 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
133 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
// Three invalid option sets, each paired with its expected error message:
//   1. tasks queue enabled with concurrency 0,
//   2. an unknown worker choice strategy name,
//   3. workerChoiceStrategyOptions with an empty weights object.
// NOTE(review): the pool constructor calls wrapping these arguments are not
// visible in these lines.
137 it('Verify that pool options are validated', async () => {
142 './tests/worker-files/thread/testWorker.js',
144 enableTasksQueue
: true,
145 tasksQueueOptions
: { concurrency
: 0 }
148 ).toThrowError("Invalid worker tasks concurrency '0'")
153 './tests/worker-files/thread/testWorker.js',
155 workerChoiceStrategy
: 'invalidStrategy'
158 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
163 './tests/worker-files/thread/testWorker.js',
165 workerChoiceStrategyOptions
: { weights
: {} }
169 'Invalid worker choice strategy options: must have a weight for each worker node'
// Builds a FixedThreadPool using the FAIR_SHARE strategy, then toggles the
// medRunTime option through setWorkerChoiceStrategyOptions (false -> true ->
// false). After each step, every strategy registered in
// pool.workerChoiceStrategyContext.workerChoiceStrategies must expose the
// current option object in its opts.
// NOTE(review): the assertions wrapping the getRequiredStatistics() reads
// and the expected pool.opts.workerChoiceStrategyOptions objects are not
// visible in these lines.
173 it('Verify that worker choice strategy options can be set', async () => {
174 const pool
= new FixedThreadPool(
176 './tests/worker-files/thread/testWorker.js',
177 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
179 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
// Initial state: every strategy carries { medRunTime: false }.
182 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
183 .workerChoiceStrategies
) {
184 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
187 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
190 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
// Enable medRunTime and check that it reaches every strategy.
192 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
193 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
196 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
197 .workerChoiceStrategies
) {
198 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
201 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
204 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
// Disable medRunTime again and check that the change reaches every strategy.
206 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
207 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
210 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
211 .workerChoiceStrategies
) {
212 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
215 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
218 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
// Walks pool.enableTasksQueue() through four states and checks pool.opts
// after each one:
//   default             -> enableTasksQueue false, tasksQueueOptions undefined
//   enable              -> enableTasksQueue true, tasksQueueOptions { concurrency: 1 }
//   enable with options -> tasksQueueOptions { concurrency: 2 }
//   disable             -> enableTasksQueue false, tasksQueueOptions undefined
223 it('Verify that tasks queue can be enabled/disabled', async () => {
224 const pool
= new FixedThreadPool(
226 './tests/worker-files/thread/testWorker.js'
228 expect(pool
.opts
.enableTasksQueue
).toBe(false)
229 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
230 pool
.enableTasksQueue(true)
231 expect(pool
.opts
.enableTasksQueue
).toBe(true)
232 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
233 pool
.enableTasksQueue(true, { concurrency
: 2 })
234 expect(pool
.opts
.enableTasksQueue
).toBe(true)
235 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
236 pool
.enableTasksQueue(false)
237 expect(pool
.opts
.enableTasksQueue
).toBe(false)
238 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
// With the tasks queue enabled at construction, tasksQueueOptions starts as
// { concurrency: 1 }. setTasksQueueOptions({ concurrency: 2 }) must be
// reflected in pool.opts, and a concurrency of 0 must be rejected with
// "Invalid worker tasks concurrency '0'".
242 it('Verify that tasks queue options can be set', async () => {
243 const pool
= new FixedThreadPool(
245 './tests/worker-files/thread/testWorker.js',
246 { enableTasksQueue
: true }
248 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
249 pool
.setTasksQueueOptions({ concurrency
: 2 })
250 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
251 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
252 "Invalid worker tasks concurrency '0'"
// Builds a StubPoolWithRemoveAllWorker with an errorHandler that logs to
// console.error, checks it starts with numberOfWorkers worker nodes, then
// calls removeAllWorker() and expects the workerNodes list to be empty.
257 it('Simulate worker not found', async () => {
258 const pool
= new StubPoolWithRemoveAllWorker(
260 './tests/worker-files/cluster/testWorker.js',
262 errorHandler
: e
=> console
.error(e
)
265 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
266 // Simulate worker not found.
267 pool
.removeAllWorker()
268 expect(pool
.workerNodes
.length
).toBe(0)
// On a freshly built FixedClusterPool, every worker node must expose a
// tasksUsage object with all counters at 0 (run, running, runTime,
// avgRunTime, medRunTime, error) and an empty CircularArray as
// runTimeHistory.
272 it('Verify that worker pool tasks usage are initialized', async () => {
273 const pool
= new FixedClusterPool(
275 './tests/worker-files/cluster/testWorker.js'
277 for (const workerNode
of pool
.workerNodes
) {
278 expect(workerNode
.tasksUsage
).toBeDefined()
279 expect(workerNode
.tasksUsage
.run
).toBe(0)
280 expect(workerNode
.tasksUsage
.running
).toBe(0)
281 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
282 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
283 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
284 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
285 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
286 expect(workerNode
.tasksUsage
.error
).toBe(0)
// On a freshly built FixedClusterPool, every worker node must expose a
// tasksQueue that is a Queue instance with size 0.
291 it('Verify that worker pool tasks queue are initialized', async () => {
292 const pool
= new FixedClusterPool(
294 './tests/worker-files/cluster/testWorker.js'
296 for (const workerNode
of pool
.workerNodes
) {
297 expect(workerNode
.tasksQueue
).toBeDefined()
298 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
299 expect(workerNode
.tasksQueue
.size
).toBe(0)
// Submits numberOfWorkers * 2 tasks to a FixedClusterPool and checks each
// worker node's tasksUsage twice: once while the tasks are still pending and
// once after all of them have resolved.
// NOTE(review): the declaration of `promises` is not visible in these lines.
304 it('Verify that worker pool tasks usage are computed', async () => {
305 const pool
= new FixedClusterPool(
307 './tests/worker-files/cluster/testWorker.js'
310 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
311 promises
.push(pool
.execute())
// Before awaiting: nothing has completed, so `running` equals the number of
// submitted tasks and every other counter is still 0.
313 for (const workerNode
of pool
.workerNodes
) {
314 expect(workerNode
.tasksUsage
).toBeDefined()
315 expect(workerNode
.tasksUsage
.run
).toBe(0)
316 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
317 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
318 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
319 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
320 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
321 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
322 expect(workerNode
.tasksUsage
.error
).toBe(0)
324 await Promise
.all(promises
)
// After completion: `run` equals the number of submitted tasks, `running` is
// back to 0, and the run time figures are only required to be non-negative.
325 for (const workerNode
of pool
.workerNodes
) {
326 expect(workerNode
.tasksUsage
).toBeDefined()
327 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
328 expect(workerNode
.tasksUsage
.running
).toBe(0)
329 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
330 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
331 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
332 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
333 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
334 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Runs numberOfWorkers * 2 tasks on a DynamicThreadPool, checks that the
// usage counters were populated, then switches the worker choice strategy to
// FAIR_SHARE and expects every counter on every worker node to be back at 0.
// NOTE(review): the declaration of `promises` and the pool size arguments are
// not visible in these lines.
339 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
340 const pool
= new DynamicThreadPool(
343 './tests/worker-files/thread/testWorker.js'
346 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
347 promises
.push(pool
.execute())
349 await Promise
.all(promises
)
// Usage after the tasks have completed, before the strategy change.
350 for (const workerNode
of pool
.workerNodes
) {
351 expect(workerNode
.tasksUsage
).toBeDefined()
352 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
353 expect(workerNode
.tasksUsage
.running
).toBe(0)
354 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
355 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
356 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
357 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
358 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
359 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Changing the strategy is expected to zero every usage counter.
361 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
362 for (const workerNode
of pool
.workerNodes
) {
363 expect(workerNode
.tasksUsage
).toBeDefined()
364 expect(workerNode
.tasksUsage
.run
).toBe(0)
365 expect(workerNode
.tasksUsage
.running
).toBe(0)
366 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
367 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
368 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
369 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
370 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
371 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Registers a listener for PoolEvents.full on a DynamicThreadPool emitter
// that increments a counter, submits numberOfWorkers * 2 tasks, waits for
// them all, and checks how many times the event fired.
// NOTE(review): the declarations of `promises` and `poolFull` are not visible
// in these lines.
376 it("Verify that pool event emitter 'full' event can register a callback", async () => {
377 const pool
= new DynamicThreadPool(
380 './tests/worker-files/thread/testWorker.js'
384 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
385 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
386 promises
.push(pool
.execute())
388 await Promise
.all(promises
)
389 // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
390 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
391 expect(poolFull
).toBe(numberOfWorkers
+ 1)
// Registers a listener for PoolEvents.busy on a FixedThreadPool emitter that
// increments a counter, submits numberOfWorkers * 2 tasks, waits for them
// all, and checks how many times the event fired.
// NOTE(review): the declarations of `promises` and `poolBusy` are not visible
// in these lines.
395 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
396 const pool
= new FixedThreadPool(
398 './tests/worker-files/thread/testWorker.js'
402 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
403 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
404 promises
.push(pool
.execute())
406 await Promise
.all(promises
)
407 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
408 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
409 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
// Runs the same payload ({ n: 10 }) through a DynamicClusterPool backed by
// the multi-tasks worker file, selecting the task by name via the second
// argument of execute():
//   no name / 'jsonIntegerSerialization' -> expected result false
//   'factorial'                          -> expected result 3628800 (10!)
//   'fibonacci'                          -> expected result 89
413 it('Verify that multiple tasks worker is working', async () => {
414 const pool
= new DynamicClusterPool(
417 './tests/worker-files/cluster/testMultiTasksWorker.js'
419 const data
= { n
: 10 }
420 const result0
= await pool
.execute(data
)
421 expect(result0
).toBe(false)
422 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
423 expect(result1
).toBe(false)
424 const result2
= await pool
.execute(data
, 'factorial')
425 expect(result2
).toBe(3628800)
426 const result3
= await pool
.execute(data
, 'fibonacci')
427 expect(result3
).toBe(89)