1 const { expect
} = require('expect')
8 } = require('../../../lib')
9 const { CircularArray
} = require('../../../lib/circular-array')
10 const { Queue
} = require('../../../lib/queue')
12 describe('Abstract pool test suite', () => {
13 const numberOfWorkers
= 1
14 const workerNotFoundInPoolError
= new Error(
15 'Worker could not be found in the pool worker nodes'
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.opts
.enableEvents
).toBe(true)
88 expect(pool
.emitter
).toBeDefined()
89 expect(pool
.opts
.enableTasksQueue
).toBe(false)
90 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
91 expect(pool
.opts
.workerChoiceStrategy
).toBe(
92 WorkerChoiceStrategies
.ROUND_ROBIN
94 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 expect(pool
.opts
.messageHandler
).toBeUndefined()
98 expect(pool
.opts
.errorHandler
).toBeUndefined()
99 expect(pool
.opts
.onlineHandler
).toBeUndefined()
100 expect(pool
.opts
.exitHandler
).toBeUndefined()
102 const testHandler
= () => console
.log('test handler executed')
103 pool
= new FixedThreadPool(
105 './tests/worker-files/thread/testWorker.js',
107 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
108 workerChoiceStrategyOptions
: { medRunTime
: true },
110 enableTasksQueue
: true,
111 tasksQueueOptions
: { concurrency
: 2 },
112 messageHandler
: testHandler
,
113 errorHandler
: testHandler
,
114 onlineHandler
: testHandler
,
115 exitHandler
: testHandler
118 expect(pool
.opts
.enableEvents
).toBe(false)
119 expect(pool
.emitter
).toBeUndefined()
120 expect(pool
.opts
.enableTasksQueue
).toBe(true)
121 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
122 expect(pool
.opts
.workerChoiceStrategy
).toBe(
123 WorkerChoiceStrategies
.LESS_USED
125 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
128 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
129 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
130 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
131 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
135 it('Verify that pool options are validated', async () => {
140 './tests/worker-files/thread/testWorker.js',
142 enableTasksQueue
: true,
143 tasksQueueOptions
: { concurrency
: 0 }
146 ).toThrowError("Invalid worker tasks concurrency '0'")
151 './tests/worker-files/thread/testWorker.js',
153 workerChoiceStrategy
: 'invalidStrategy'
156 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
159 it('Verify that worker choice strategy options can be set', async () => {
160 const pool
= new FixedThreadPool(
162 './tests/worker-files/thread/testWorker.js',
163 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
165 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
168 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
169 .workerChoiceStrategies
) {
170 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
173 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
176 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
178 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
179 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
182 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
183 .workerChoiceStrategies
) {
184 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
187 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
190 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
192 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
193 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
196 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
197 .workerChoiceStrategies
) {
198 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
201 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
204 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
209 it('Verify that tasks queue can be enabled/disabled', async () => {
210 const pool
= new FixedThreadPool(
212 './tests/worker-files/thread/testWorker.js'
214 expect(pool
.opts
.enableTasksQueue
).toBe(false)
215 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
216 pool
.enableTasksQueue(true)
217 expect(pool
.opts
.enableTasksQueue
).toBe(true)
218 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
219 pool
.enableTasksQueue(true, { concurrency
: 2 })
220 expect(pool
.opts
.enableTasksQueue
).toBe(true)
221 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
222 pool
.enableTasksQueue(false)
223 expect(pool
.opts
.enableTasksQueue
).toBe(false)
224 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
228 it('Verify that tasks queue options can be set', async () => {
229 const pool
= new FixedThreadPool(
231 './tests/worker-files/thread/testWorker.js',
232 { enableTasksQueue
: true }
234 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
235 pool
.setTasksQueueOptions({ concurrency
: 2 })
236 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
237 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
238 "Invalid worker tasks concurrency '0'"
243 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
244 const pool
= new StubPoolWithRemoveAllWorker(
246 './tests/worker-files/cluster/testWorker.js',
248 errorHandler
: e
=> console
.error(e
)
251 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
252 // Simulate worker not found.
253 pool
.removeAllWorker()
254 expect(pool
.workerNodes
.length
).toBe(0)
255 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
256 workerNotFoundInPoolError
261 it('Verify that worker pool tasks usage are initialized', async () => {
262 const pool
= new FixedClusterPool(
264 './tests/worker-files/cluster/testWorker.js'
266 for (const workerNode
of pool
.workerNodes
) {
267 expect(workerNode
.tasksUsage
).toBeDefined()
268 expect(workerNode
.tasksUsage
.run
).toBe(0)
269 expect(workerNode
.tasksUsage
.running
).toBe(0)
270 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
271 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
272 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
273 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
274 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
275 expect(workerNode
.tasksUsage
.error
).toBe(0)
280 it('Verify that worker pool tasks queue are initialized', async () => {
281 const pool
= new FixedClusterPool(
283 './tests/worker-files/cluster/testWorker.js'
285 for (const workerNode
of pool
.workerNodes
) {
286 expect(workerNode
.tasksQueue
).toBeDefined()
287 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
288 expect(workerNode
.tasksQueue
.size
).toBe(0)
293 it('Verify that worker pool tasks usage are computed', async () => {
294 const pool
= new FixedClusterPool(
296 './tests/worker-files/cluster/testWorker.js'
299 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
300 promises
.push(pool
.execute())
302 for (const workerNode
of pool
.workerNodes
) {
303 expect(workerNode
.tasksUsage
).toBeDefined()
304 expect(workerNode
.tasksUsage
.run
).toBe(0)
305 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
306 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
307 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
308 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
309 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
310 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
311 expect(workerNode
.tasksUsage
.error
).toBe(0)
313 await Promise
.all(promises
)
314 for (const workerNode
of pool
.workerNodes
) {
315 expect(workerNode
.tasksUsage
).toBeDefined()
316 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
317 expect(workerNode
.tasksUsage
.running
).toBe(0)
318 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
319 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
320 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
321 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
322 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
323 expect(workerNode
.tasksUsage
.error
).toBe(0)
328 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
329 const pool
= new DynamicThreadPool(
332 './tests/worker-files/thread/testWorker.js'
335 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
336 promises
.push(pool
.execute())
338 await Promise
.all(promises
)
339 for (const workerNode
of pool
.workerNodes
) {
340 expect(workerNode
.tasksUsage
).toBeDefined()
341 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
342 expect(workerNode
.tasksUsage
.running
).toBe(0)
343 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
344 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
345 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
346 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
347 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
348 expect(workerNode
.tasksUsage
.error
).toBe(0)
350 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
351 for (const workerNode
of pool
.workerNodes
) {
352 expect(workerNode
.tasksUsage
).toBeDefined()
353 expect(workerNode
.tasksUsage
.run
).toBe(0)
354 expect(workerNode
.tasksUsage
.running
).toBe(0)
355 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
356 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
357 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
358 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
359 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
360 expect(workerNode
.tasksUsage
.error
).toBe(0)
365 it("Verify that pool event emitter 'full' event can register a callback", async () => {
366 const pool
= new DynamicThreadPool(
369 './tests/worker-files/thread/testWorker.js'
373 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
374 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
375 promises
.push(pool
.execute())
377 await Promise
.all(promises
)
378 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
379 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
380 expect(poolFull
).toBe(numberOfWorkers
+ 1)
384 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
385 const pool
= new FixedThreadPool(
387 './tests/worker-files/thread/testWorker.js'
391 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
392 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
393 promises
.push(pool
.execute())
395 await Promise
.all(promises
)
396 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
397 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
398 expect(poolBusy
).toBe(numberOfWorkers
+ 1)