1 const { expect
} = require('expect')
9 } = require('../../../lib')
10 const { CircularArray
} = require('../../../lib/circular-array')
11 const { Queue
} = require('../../../lib/queue')
13 describe('Abstract pool test suite', () => {
14 const numberOfWorkers
= 1
15 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
18 this.promiseResponseMap
.clear()
21 class StubPoolWithIsMain
extends FixedThreadPool
{
27 it('Simulate pool creation from a non main thread/process', () => {
30 new StubPoolWithIsMain(
32 './tests/worker-files/thread/testWorker.js',
34 errorHandler
: e
=> console
.error(e
)
37 ).toThrowError('Cannot start a pool from a worker!')
40 it('Verify that filePath is checked', () => {
41 const expectedError
= new Error(
42 'Please specify a file with a worker implementation'
44 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
47 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
54 'Cannot instantiate a pool without specifying the number of workers'
58 it('Verify that a negative number of workers is checked', () => {
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 'Cannot instantiate a pool with a negative number of workers'
69 it('Verify that a non integer number of workers is checked', () => {
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 'Cannot instantiate a pool with a non safe integer number of workers'
80 it('Verify that pool options are checked', async () => {
81 let pool
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js'
85 expect(pool
.opts
.enableEvents
).toBe(true)
86 expect(pool
.emitter
).toBeDefined()
87 expect(pool
.opts
.enableTasksQueue
).toBe(false)
88 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
89 expect(pool
.opts
.workerChoiceStrategy
).toBe(
90 WorkerChoiceStrategies
.ROUND_ROBIN
92 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
95 expect(pool
.opts
.messageHandler
).toBeUndefined()
96 expect(pool
.opts
.errorHandler
).toBeUndefined()
97 expect(pool
.opts
.onlineHandler
).toBeUndefined()
98 expect(pool
.opts
.exitHandler
).toBeUndefined()
100 const testHandler
= () => console
.log('test handler executed')
101 pool
= new FixedThreadPool(
103 './tests/worker-files/thread/testWorker.js',
105 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
106 workerChoiceStrategyOptions
: { medRunTime
: true },
108 enableTasksQueue
: true,
109 tasksQueueOptions
: { concurrency
: 2 },
110 messageHandler
: testHandler
,
111 errorHandler
: testHandler
,
112 onlineHandler
: testHandler
,
113 exitHandler
: testHandler
116 expect(pool
.opts
.enableEvents
).toBe(false)
117 expect(pool
.emitter
).toBeUndefined()
118 expect(pool
.opts
.enableTasksQueue
).toBe(true)
119 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
120 expect(pool
.opts
.workerChoiceStrategy
).toBe(
121 WorkerChoiceStrategies
.LESS_USED
123 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
126 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
127 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
128 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
129 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
133 it('Verify that pool options are validated', async () => {
138 './tests/worker-files/thread/testWorker.js',
140 enableTasksQueue
: true,
141 tasksQueueOptions
: { concurrency
: 0 }
144 ).toThrowError("Invalid worker tasks concurrency '0'")
149 './tests/worker-files/thread/testWorker.js',
151 workerChoiceStrategy
: 'invalidStrategy'
154 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
157 it('Verify that worker choice strategy options can be set', async () => {
158 const pool
= new FixedThreadPool(
160 './tests/worker-files/thread/testWorker.js',
161 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
163 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
166 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
167 .workerChoiceStrategies
) {
168 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
171 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
174 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
176 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
177 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
180 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
181 .workerChoiceStrategies
) {
182 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
185 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
188 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
190 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
191 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
194 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
195 .workerChoiceStrategies
) {
196 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
199 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
202 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
207 it('Verify that tasks queue can be enabled/disabled', async () => {
208 const pool
= new FixedThreadPool(
210 './tests/worker-files/thread/testWorker.js'
212 expect(pool
.opts
.enableTasksQueue
).toBe(false)
213 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
214 pool
.enableTasksQueue(true)
215 expect(pool
.opts
.enableTasksQueue
).toBe(true)
216 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
217 pool
.enableTasksQueue(true, { concurrency
: 2 })
218 expect(pool
.opts
.enableTasksQueue
).toBe(true)
219 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
220 pool
.enableTasksQueue(false)
221 expect(pool
.opts
.enableTasksQueue
).toBe(false)
222 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
226 it('Verify that tasks queue options can be set', async () => {
227 const pool
= new FixedThreadPool(
229 './tests/worker-files/thread/testWorker.js',
230 { enableTasksQueue
: true }
232 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
233 pool
.setTasksQueueOptions({ concurrency
: 2 })
234 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
235 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
236 "Invalid worker tasks concurrency '0'"
241 it('Simulate worker not found', async () => {
242 const pool
= new StubPoolWithRemoveAllWorker(
244 './tests/worker-files/cluster/testWorker.js',
246 errorHandler
: e
=> console
.error(e
)
249 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
250 // Simulate worker not found.
251 pool
.removeAllWorker()
252 expect(pool
.workerNodes
.length
).toBe(0)
256 it('Verify that worker pool tasks usage are initialized', async () => {
257 const pool
= new FixedClusterPool(
259 './tests/worker-files/cluster/testWorker.js'
261 for (const workerNode
of pool
.workerNodes
) {
262 expect(workerNode
.tasksUsage
).toBeDefined()
263 expect(workerNode
.tasksUsage
.run
).toBe(0)
264 expect(workerNode
.tasksUsage
.running
).toBe(0)
265 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
266 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
267 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
268 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
269 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
270 expect(workerNode
.tasksUsage
.error
).toBe(0)
275 it('Verify that worker pool tasks queue are initialized', async () => {
276 const pool
= new FixedClusterPool(
278 './tests/worker-files/cluster/testWorker.js'
280 for (const workerNode
of pool
.workerNodes
) {
281 expect(workerNode
.tasksQueue
).toBeDefined()
282 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
283 expect(workerNode
.tasksQueue
.size
).toBe(0)
288 it('Verify that worker pool tasks usage are computed', async () => {
289 const pool
= new FixedClusterPool(
291 './tests/worker-files/cluster/testWorker.js'
294 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
295 promises
.push(pool
.execute())
297 for (const workerNode
of pool
.workerNodes
) {
298 expect(workerNode
.tasksUsage
).toBeDefined()
299 expect(workerNode
.tasksUsage
.run
).toBe(0)
300 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
301 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
302 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
303 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
304 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
305 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
306 expect(workerNode
.tasksUsage
.error
).toBe(0)
308 await Promise
.all(promises
)
309 for (const workerNode
of pool
.workerNodes
) {
310 expect(workerNode
.tasksUsage
).toBeDefined()
311 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
312 expect(workerNode
.tasksUsage
.running
).toBe(0)
313 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
314 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
315 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
316 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
317 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
318 expect(workerNode
.tasksUsage
.error
).toBe(0)
323 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
324 const pool
= new DynamicThreadPool(
327 './tests/worker-files/thread/testWorker.js'
330 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
331 promises
.push(pool
.execute())
333 await Promise
.all(promises
)
334 for (const workerNode
of pool
.workerNodes
) {
335 expect(workerNode
.tasksUsage
).toBeDefined()
336 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
337 expect(workerNode
.tasksUsage
.running
).toBe(0)
338 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
339 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
340 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
341 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
342 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
343 expect(workerNode
.tasksUsage
.error
).toBe(0)
345 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
346 for (const workerNode
of pool
.workerNodes
) {
347 expect(workerNode
.tasksUsage
).toBeDefined()
348 expect(workerNode
.tasksUsage
.run
).toBe(0)
349 expect(workerNode
.tasksUsage
.running
).toBe(0)
350 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
351 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
352 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
353 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
354 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
355 expect(workerNode
.tasksUsage
.error
).toBe(0)
360 it("Verify that pool event emitter 'full' event can register a callback", async () => {
361 const pool
= new DynamicThreadPool(
364 './tests/worker-files/thread/testWorker.js'
368 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
369 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
370 promises
.push(pool
.execute())
372 await Promise
.all(promises
)
373 // The `full` event is triggered each time the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
374 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
375 expect(poolFull
).toBe(numberOfWorkers
+ 1)
379 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
380 const pool
= new FixedThreadPool(
382 './tests/worker-files/thread/testWorker.js'
386 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
387 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
388 promises
.push(pool
.execute())
390 await Promise
.all(promises
)
391 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
392 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
393 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
397 it('Verify that multiple tasks worker is working', async () => {
398 const pool
= new DynamicClusterPool(
401 './tests/worker-files/cluster/testMultiTasksWorker.js'
403 const data
= { n
: 10 }
404 const result0
= await pool
.execute(data
)
405 expect(result0
).toBe(false)
406 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
407 expect(result1
).toBe(false)
408 const result2
= await pool
.execute(data
, 'factorial')
409 expect(result2
).toBe(3628800)
410 const result3
= await pool
.execute(data
, 'fibonacci')
411 expect(result3
).toBe(89)