1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
11 describe('Abstract pool test suite', () => {
12 const numberOfWorkers
= 1
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool worker nodes'
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
22 class StubPoolWithIsMain
extends FixedThreadPool
{
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError('Cannot start a pool from a worker!')
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
55 'Cannot instantiate a pool without specifying the number of workers'
59 it('Verify that a negative number of workers is checked', () => {
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
65 'Cannot instantiate a pool with a negative number of workers'
70 it('Verify that a non integer number of workers is checked', () => {
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
76 'Cannot instantiate a pool with a non integer number of workers'
81 it('Verify that pool options are checked', async () => {
82 let pool
= new FixedThreadPool(
84 './tests/worker-files/thread/testWorker.js'
86 expect(pool
.opts
.enableEvents
).toBe(true)
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableTasksQueue
).toBe(false)
89 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
90 expect(pool
.opts
.workerChoiceStrategy
).toBe(
91 WorkerChoiceStrategies
.ROUND_ROBIN
93 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 expect(pool
.opts
.messageHandler
).toBeUndefined()
97 expect(pool
.opts
.errorHandler
).toBeUndefined()
98 expect(pool
.opts
.onlineHandler
).toBeUndefined()
99 expect(pool
.opts
.exitHandler
).toBeUndefined()
101 const testHandler
= () => console
.log('test handler executed')
102 pool
= new FixedThreadPool(
104 './tests/worker-files/thread/testWorker.js',
106 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
107 workerChoiceStrategyOptions
: { medRunTime
: true },
109 enableTasksQueue
: true,
110 tasksQueueOptions
: { concurrency
: 2 },
111 messageHandler
: testHandler
,
112 errorHandler
: testHandler
,
113 onlineHandler
: testHandler
,
114 exitHandler
: testHandler
117 expect(pool
.opts
.enableEvents
).toBe(false)
118 expect(pool
.emitter
).toBeUndefined()
119 expect(pool
.opts
.enableTasksQueue
).toBe(true)
120 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
121 expect(pool
.opts
.workerChoiceStrategy
).toBe(
122 WorkerChoiceStrategies
.LESS_USED
124 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
127 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
128 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
129 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
130 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
134 it('Verify that pool options are validated', async () => {
139 './tests/worker-files/thread/testWorker.js',
141 enableTasksQueue
: true,
142 tasksQueueOptions
: { concurrency
: 0 }
145 ).toThrowError("Invalid worker tasks concurrency '0'")
150 './tests/worker-files/thread/testWorker.js',
152 workerChoiceStrategy
: 'invalidStrategy'
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
158 it('Verify that worker choice strategy options can be set', async () => {
159 const pool
= new FixedThreadPool(
161 './tests/worker-files/thread/testWorker.js',
162 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
164 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
167 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
168 .workerChoiceStrategies
) {
169 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
172 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
175 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
177 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
178 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
181 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
182 .workerChoiceStrategies
) {
183 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
186 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
189 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
191 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
192 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
195 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
196 .workerChoiceStrategies
) {
197 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
200 pool
.workerChoiceStrategyContext
.getRequiredStatistics().avgRunTime
203 pool
.workerChoiceStrategyContext
.getRequiredStatistics().medRunTime
208 it('Verify that tasks queue can be enabled/disabled', async () => {
209 const pool
= new FixedThreadPool(
211 './tests/worker-files/thread/testWorker.js'
213 expect(pool
.opts
.enableTasksQueue
).toBe(false)
214 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
215 pool
.enableTasksQueue(true)
216 expect(pool
.opts
.enableTasksQueue
).toBe(true)
217 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
218 pool
.enableTasksQueue(true, { concurrency
: 2 })
219 expect(pool
.opts
.enableTasksQueue
).toBe(true)
220 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
221 pool
.enableTasksQueue(false)
222 expect(pool
.opts
.enableTasksQueue
).toBe(false)
223 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
227 it('Verify that tasks queue options can be set', async () => {
228 const pool
= new FixedThreadPool(
230 './tests/worker-files/thread/testWorker.js',
231 { enableTasksQueue
: true }
233 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
234 pool
.setTasksQueueOptions({ concurrency
: 2 })
235 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
236 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
237 "Invalid worker tasks concurrency '0'"
242 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
243 const pool
= new StubPoolWithRemoveAllWorker(
245 './tests/worker-files/cluster/testWorker.js',
247 errorHandler
: e
=> console
.error(e
)
250 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
251 // Simulate worker not found.
252 pool
.removeAllWorker()
253 expect(pool
.workerNodes
.length
).toBe(0)
254 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
255 workerNotFoundInPoolError
260 it('Verify that worker pool tasks usage are initialized', async () => {
261 const pool
= new FixedClusterPool(
263 './tests/worker-files/cluster/testWorker.js'
265 for (const workerNode
of pool
.workerNodes
) {
266 expect(workerNode
.tasksUsage
).toBeDefined()
267 expect(workerNode
.tasksUsage
.run
).toBe(0)
268 expect(workerNode
.tasksUsage
.running
).toBe(0)
269 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
270 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
271 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
272 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
273 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
274 expect(workerNode
.tasksUsage
.error
).toBe(0)
279 it('Verify that worker pool tasks queue are initialized', async () => {
280 const pool
= new FixedClusterPool(
282 './tests/worker-files/cluster/testWorker.js'
284 for (const workerNode
of pool
.workerNodes
) {
285 expect(workerNode
.tasksQueue
).toBeDefined()
286 expect(workerNode
.tasksQueue
).toBeInstanceOf(Array
)
287 expect(workerNode
.tasksQueue
.length
).toBe(0)
292 it('Verify that worker pool tasks usage are computed', async () => {
293 const pool
= new FixedClusterPool(
295 './tests/worker-files/cluster/testWorker.js'
298 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
299 promises
.push(pool
.execute())
301 for (const workerNode
of pool
.workerNodes
) {
302 expect(workerNode
.tasksUsage
).toBeDefined()
303 expect(workerNode
.tasksUsage
.run
).toBe(0)
304 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
305 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
306 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
307 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
308 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
309 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
310 expect(workerNode
.tasksUsage
.error
).toBe(0)
312 await Promise
.all(promises
)
313 for (const workerNode
of pool
.workerNodes
) {
314 expect(workerNode
.tasksUsage
).toBeDefined()
315 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
316 expect(workerNode
.tasksUsage
.running
).toBe(0)
317 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
318 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
319 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
320 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
321 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
322 expect(workerNode
.tasksUsage
.error
).toBe(0)
327 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
328 const pool
= new DynamicThreadPool(
331 './tests/worker-files/thread/testWorker.js'
334 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
335 promises
.push(pool
.execute())
337 await Promise
.all(promises
)
338 for (const workerNode
of pool
.workerNodes
) {
339 expect(workerNode
.tasksUsage
).toBeDefined()
340 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
341 expect(workerNode
.tasksUsage
.running
).toBe(0)
342 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
343 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
344 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
345 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
346 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
347 expect(workerNode
.tasksUsage
.error
).toBe(0)
349 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
350 for (const workerNode
of pool
.workerNodes
) {
351 expect(workerNode
.tasksUsage
).toBeDefined()
352 expect(workerNode
.tasksUsage
.run
).toBe(0)
353 expect(workerNode
.tasksUsage
.running
).toBe(0)
354 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
355 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
356 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
357 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
358 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
359 expect(workerNode
.tasksUsage
.error
).toBe(0)
364 it("Verify that pool event emitter 'full' event can register a callback", async () => {
365 const pool
= new DynamicThreadPool(
368 './tests/worker-files/thread/testWorker.js'
372 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
373 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
374 promises
.push(pool
.execute())
376 await Promise
.all(promises
)
377 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
378 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
379 expect(poolFull
).toBe(numberOfWorkers
+ 1)
383 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
384 const pool
= new FixedThreadPool(
386 './tests/worker-files/thread/testWorker.js'
390 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
391 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
392 promises
.push(pool
.execute())
394 await Promise
.all(promises
)
395 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
396 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
397 expect(poolBusy
).toBe(numberOfWorkers
+ 1)