1 const { expect
} = require('expect')
9 } = require('../../../lib')
10 const { CircularArray
} = require('../../../lib/circular-array')
11 const { Queue
} = require('../../../lib/queue')
13 describe('Abstract pool test suite', () => {
14 const numberOfWorkers
= 2
15 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
18 this.promiseResponseMap
.clear()
21 class StubPoolWithIsMain
extends FixedThreadPool
{
27 it('Simulate pool creation from a non main thread/process', () => {
30 new StubPoolWithIsMain(
32 './tests/worker-files/thread/testWorker.js',
34 errorHandler
: e
=> console
.error(e
)
37 ).toThrowError('Cannot start a pool from a worker!')
40 it('Verify that filePath is checked', () => {
41 const expectedError
= new Error(
42 'Please specify a file with a worker implementation'
44 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
47 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
54 'Cannot instantiate a pool without specifying the number of workers'
58 it('Verify that a negative number of workers is checked', () => {
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 'Cannot instantiate a pool with a negative number of workers'
69 it('Verify that a non integer number of workers is checked', () => {
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 'Cannot instantiate a pool with a non safe integer number of workers'
80 it('Verify that pool options are checked', async () => {
81 let pool
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js'
85 expect(pool
.opts
.enableEvents
).toBe(true)
86 expect(pool
.emitter
).toBeDefined()
87 expect(pool
.opts
.enableTasksQueue
).toBe(false)
88 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
89 expect(pool
.opts
.workerChoiceStrategy
).toBe(
90 WorkerChoiceStrategies
.ROUND_ROBIN
92 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 expect(pool
.opts
.messageHandler
).toBeUndefined()
97 expect(pool
.opts
.errorHandler
).toBeUndefined()
98 expect(pool
.opts
.onlineHandler
).toBeUndefined()
99 expect(pool
.opts
.exitHandler
).toBeUndefined()
101 const testHandler
= () => console
.log('test handler executed')
102 pool
= new FixedThreadPool(
104 './tests/worker-files/thread/testWorker.js',
106 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
107 workerChoiceStrategyOptions
: {
109 weights
: { 0: 300, 1: 200 }
112 enableTasksQueue
: true,
113 tasksQueueOptions
: { concurrency
: 2 },
114 messageHandler
: testHandler
,
115 errorHandler
: testHandler
,
116 onlineHandler
: testHandler
,
117 exitHandler
: testHandler
120 expect(pool
.opts
.enableEvents
).toBe(false)
121 expect(pool
.emitter
).toBeUndefined()
122 expect(pool
.opts
.enableTasksQueue
).toBe(true)
123 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
124 expect(pool
.opts
.workerChoiceStrategy
).toBe(
125 WorkerChoiceStrategies
.LEAST_USED
127 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
129 weights
: { 0: 300, 1: 200 }
131 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
132 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
133 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
134 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
138 it('Verify that pool options are validated', async () => {
143 './tests/worker-files/thread/testWorker.js',
145 enableTasksQueue
: true,
146 tasksQueueOptions
: { concurrency
: 0 }
149 ).toThrowError("Invalid worker tasks concurrency '0'")
154 './tests/worker-files/thread/testWorker.js',
156 workerChoiceStrategy
: 'invalidStrategy'
159 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
164 './tests/worker-files/thread/testWorker.js',
166 workerChoiceStrategyOptions
: { weights
: {} }
170 'Invalid worker choice strategy options: must have a weight for each worker node'
174 it('Verify that worker choice strategy options can be set', async () => {
175 const pool
= new FixedThreadPool(
177 './tests/worker-files/thread/testWorker.js',
178 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
180 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
184 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
185 .workerChoiceStrategies
) {
186 expect(workerChoiceStrategy
.opts
).toStrictEqual({
192 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
201 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
202 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
205 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
206 .workerChoiceStrategies
) {
207 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
210 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
219 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
220 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
223 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
224 .workerChoiceStrategies
) {
225 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
228 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
240 it('Verify that tasks queue can be enabled/disabled', async () => {
241 const pool
= new FixedThreadPool(
243 './tests/worker-files/thread/testWorker.js'
245 expect(pool
.opts
.enableTasksQueue
).toBe(false)
246 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
247 pool
.enableTasksQueue(true)
248 expect(pool
.opts
.enableTasksQueue
).toBe(true)
249 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
250 pool
.enableTasksQueue(true, { concurrency
: 2 })
251 expect(pool
.opts
.enableTasksQueue
).toBe(true)
252 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
253 pool
.enableTasksQueue(false)
254 expect(pool
.opts
.enableTasksQueue
).toBe(false)
255 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
259 it('Verify that tasks queue options can be set', async () => {
260 const pool
= new FixedThreadPool(
262 './tests/worker-files/thread/testWorker.js',
263 { enableTasksQueue
: true }
265 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
266 pool
.setTasksQueueOptions({ concurrency
: 2 })
267 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
268 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
269 "Invalid worker tasks concurrency '0'"
274 it('Simulate worker not found', async () => {
275 const pool
= new StubPoolWithRemoveAllWorker(
277 './tests/worker-files/cluster/testWorker.js',
279 errorHandler
: e
=> console
.error(e
)
282 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
283 // Simulate worker not found.
284 pool
.removeAllWorker()
285 expect(pool
.workerNodes
.length
).toBe(0)
289 it('Verify that worker pool tasks usage are initialized', async () => {
290 const pool
= new FixedClusterPool(
292 './tests/worker-files/cluster/testWorker.js'
294 for (const workerNode
of pool
.workerNodes
) {
295 expect(workerNode
.tasksUsage
).toStrictEqual({
299 runTimeHistory
: expect
.any(CircularArray
),
303 waitTimeHistory
: expect
.any(CircularArray
),
312 it('Verify that worker pool tasks queue are initialized', async () => {
313 const pool
= new FixedClusterPool(
315 './tests/worker-files/cluster/testWorker.js'
317 for (const workerNode
of pool
.workerNodes
) {
318 expect(workerNode
.tasksQueue
).toBeDefined()
319 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
320 expect(workerNode
.tasksQueue
.size
).toBe(0)
325 it('Verify that worker pool tasks usage are computed', async () => {
326 const pool
= new FixedClusterPool(
328 './tests/worker-files/cluster/testWorker.js'
330 const promises
= new Set()
331 const maxMultiplier
= 2
332 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
333 promises
.add(pool
.execute())
335 for (const workerNode
of pool
.workerNodes
) {
336 expect(workerNode
.tasksUsage
).toStrictEqual({
338 running
: maxMultiplier
,
340 runTimeHistory
: expect
.any(CircularArray
),
344 waitTimeHistory
: expect
.any(CircularArray
),
350 await Promise
.all(promises
)
351 for (const workerNode
of pool
.workerNodes
) {
352 expect(workerNode
.tasksUsage
).toStrictEqual({
356 runTimeHistory
: expect
.any(CircularArray
),
360 waitTimeHistory
: expect
.any(CircularArray
),
369 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
370 const pool
= new DynamicThreadPool(
373 './tests/worker-files/thread/testWorker.js'
375 const promises
= new Set()
376 const maxMultiplier
= 2
377 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
378 promises
.add(pool
.execute())
380 await Promise
.all(promises
)
381 for (const workerNode
of pool
.workerNodes
) {
382 expect(workerNode
.tasksUsage
).toStrictEqual({
383 run
: expect
.any(Number
),
386 runTimeHistory
: expect
.any(CircularArray
),
390 waitTimeHistory
: expect
.any(CircularArray
),
395 expect(workerNode
.tasksUsage
.run
).toBeGreaterThan(0)
396 expect(workerNode
.tasksUsage
.run
).toBeLessThanOrEqual(maxMultiplier
)
398 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
399 for (const workerNode
of pool
.workerNodes
) {
400 expect(workerNode
.tasksUsage
).toStrictEqual({
404 runTimeHistory
: expect
.any(CircularArray
),
408 waitTimeHistory
: expect
.any(CircularArray
),
413 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
414 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
419 it("Verify that pool event emitter 'full' event can register a callback", async () => {
420 const pool
= new DynamicThreadPool(
423 './tests/worker-files/thread/testWorker.js'
425 const promises
= new Set()
427 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
428 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
429 promises
.add(pool
.execute())
431 await Promise
.all(promises
)
432 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
433 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
434 expect(poolFull
).toBe(numberOfWorkers
* 2)
438 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
439 const pool
= new FixedThreadPool(
441 './tests/worker-files/thread/testWorker.js'
443 const promises
= new Set()
445 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
446 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
447 promises
.add(pool
.execute())
449 await Promise
.all(promises
)
450 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
451 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
452 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
456 it('Verify that multiple tasks worker is working', async () => {
457 const pool
= new DynamicClusterPool(
460 './tests/worker-files/cluster/testMultiTasksWorker.js'
462 const data
= { n
: 10 }
463 const result0
= await pool
.execute(data
)
464 expect(result0
).toBe(false)
465 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
466 expect(result1
).toBe(false)
467 const result2
= await pool
.execute(data
, 'factorial')
468 expect(result2
).toBe(3628800)
469 const result3
= await pool
.execute(data
, 'fibonacci')
470 expect(result3
).toBe(89)