1 const { expect
} = require('expect')
9 } = require('../../../lib')
10 const { CircularArray
} = require('../../../lib/circular-array')
11 const { Queue
} = require('../../../lib/queue')
13 describe('Abstract pool test suite', () => {
14 const numberOfWorkers
= 2
15 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
18 this.promiseResponseMap
.clear()
21 class StubPoolWithIsMain
extends FixedThreadPool
{
27 it('Simulate pool creation from a non main thread/process', () => {
30 new StubPoolWithIsMain(
32 './tests/worker-files/thread/testWorker.js',
34 errorHandler
: e
=> console
.error(e
)
37 ).toThrowError('Cannot start a pool from a worker!')
40 it('Verify that filePath is checked', () => {
41 const expectedError
= new Error(
42 'Please specify a file with a worker implementation'
44 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
47 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
52 it('Verify that numberOfWorkers is checked', () => {
53 expect(() => new FixedThreadPool()).toThrowError(
54 'Cannot instantiate a pool without specifying the number of workers'
58 it('Verify that a negative number of workers is checked', () => {
61 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
64 'Cannot instantiate a pool with a negative number of workers'
69 it('Verify that a non integer number of workers is checked', () => {
72 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
75 'Cannot instantiate a pool with a non safe integer number of workers'
80 it('Verify that pool options are checked', async () => {
81 let pool
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js'
85 expect(pool
.emitter
).toBeDefined()
86 expect(pool
.opts
.enableEvents
).toBe(true)
87 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
88 expect(pool
.opts
.enableTasksQueue
).toBe(false)
89 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
90 expect(pool
.opts
.workerChoiceStrategy
).toBe(
91 WorkerChoiceStrategies
.ROUND_ROBIN
93 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 expect(pool
.opts
.messageHandler
).toBeUndefined()
98 expect(pool
.opts
.errorHandler
).toBeUndefined()
99 expect(pool
.opts
.onlineHandler
).toBeUndefined()
100 expect(pool
.opts
.exitHandler
).toBeUndefined()
102 const testHandler
= () => console
.log('test handler executed')
103 pool
= new FixedThreadPool(
105 './tests/worker-files/thread/testWorker.js',
107 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
108 workerChoiceStrategyOptions
: {
110 weights
: { 0: 300, 1: 200 }
113 restartWorkerOnError
: false,
114 enableTasksQueue
: true,
115 tasksQueueOptions
: { concurrency
: 2 },
116 messageHandler
: testHandler
,
117 errorHandler
: testHandler
,
118 onlineHandler
: testHandler
,
119 exitHandler
: testHandler
122 expect(pool
.emitter
).toBeUndefined()
123 expect(pool
.opts
.enableEvents
).toBe(false)
124 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
125 expect(pool
.opts
.enableTasksQueue
).toBe(true)
126 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
127 expect(pool
.opts
.workerChoiceStrategy
).toBe(
128 WorkerChoiceStrategies
.LEAST_USED
130 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
132 weights
: { 0: 300, 1: 200 }
134 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
135 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
136 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
141 it('Verify that pool options are validated', async () => {
146 './tests/worker-files/thread/testWorker.js',
148 enableTasksQueue
: true,
149 tasksQueueOptions
: { concurrency
: 0 }
152 ).toThrowError("Invalid worker tasks concurrency '0'")
157 './tests/worker-files/thread/testWorker.js',
159 workerChoiceStrategy
: 'invalidStrategy'
162 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
167 './tests/worker-files/thread/testWorker.js',
169 workerChoiceStrategyOptions
: { weights
: {} }
173 'Invalid worker choice strategy options: must have a weight for each worker node'
177 it('Verify that worker choice strategy options can be set', async () => {
178 const pool
= new FixedThreadPool(
180 './tests/worker-files/thread/testWorker.js',
181 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
183 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
187 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
188 .workerChoiceStrategies
) {
189 expect(workerChoiceStrategy
.opts
).toStrictEqual({
195 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
204 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
205 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
208 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
209 .workerChoiceStrategies
) {
210 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
213 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
222 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
223 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
226 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
227 .workerChoiceStrategies
) {
228 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
231 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
243 it('Verify that tasks queue can be enabled/disabled', async () => {
244 const pool
= new FixedThreadPool(
246 './tests/worker-files/thread/testWorker.js'
248 expect(pool
.opts
.enableTasksQueue
).toBe(false)
249 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
250 pool
.enableTasksQueue(true)
251 expect(pool
.opts
.enableTasksQueue
).toBe(true)
252 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
253 pool
.enableTasksQueue(true, { concurrency
: 2 })
254 expect(pool
.opts
.enableTasksQueue
).toBe(true)
255 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
256 pool
.enableTasksQueue(false)
257 expect(pool
.opts
.enableTasksQueue
).toBe(false)
258 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
262 it('Verify that tasks queue options can be set', async () => {
263 const pool
= new FixedThreadPool(
265 './tests/worker-files/thread/testWorker.js',
266 { enableTasksQueue
: true }
268 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
269 pool
.setTasksQueueOptions({ concurrency
: 2 })
270 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
271 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
272 "Invalid worker tasks concurrency '0'"
277 it('Simulate worker not found', async () => {
278 const pool
= new StubPoolWithRemoveAllWorker(
280 './tests/worker-files/cluster/testWorker.js',
282 errorHandler
: e
=> console
.error(e
)
285 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
286 // Simulate worker not found.
287 pool
.removeAllWorker()
288 expect(pool
.workerNodes
.length
).toBe(0)
292 it('Verify that worker pool tasks usage are initialized', async () => {
293 const pool
= new FixedClusterPool(
295 './tests/worker-files/cluster/testWorker.js'
297 for (const workerNode
of pool
.workerNodes
) {
298 expect(workerNode
.tasksUsage
).toStrictEqual({
302 runTimeHistory
: expect
.any(CircularArray
),
306 waitTimeHistory
: expect
.any(CircularArray
),
315 it('Verify that worker pool tasks queue are initialized', async () => {
316 const pool
= new FixedClusterPool(
318 './tests/worker-files/cluster/testWorker.js'
320 for (const workerNode
of pool
.workerNodes
) {
321 expect(workerNode
.tasksQueue
).toBeDefined()
322 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
323 expect(workerNode
.tasksQueue
.size
).toBe(0)
328 it('Verify that worker pool tasks usage are computed', async () => {
329 const pool
= new FixedClusterPool(
331 './tests/worker-files/cluster/testWorker.js'
333 const promises
= new Set()
334 const maxMultiplier
= 2
335 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
336 promises
.add(pool
.execute())
338 for (const workerNode
of pool
.workerNodes
) {
339 expect(workerNode
.tasksUsage
).toStrictEqual({
341 running
: maxMultiplier
,
343 runTimeHistory
: expect
.any(CircularArray
),
347 waitTimeHistory
: expect
.any(CircularArray
),
353 await Promise
.all(promises
)
354 for (const workerNode
of pool
.workerNodes
) {
355 expect(workerNode
.tasksUsage
).toStrictEqual({
359 runTimeHistory
: expect
.any(CircularArray
),
363 waitTimeHistory
: expect
.any(CircularArray
),
372 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
373 const pool
= new DynamicThreadPool(
376 './tests/worker-files/thread/testWorker.js'
378 const promises
= new Set()
379 const maxMultiplier
= 2
380 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
381 promises
.add(pool
.execute())
383 await Promise
.all(promises
)
384 for (const workerNode
of pool
.workerNodes
) {
385 expect(workerNode
.tasksUsage
).toStrictEqual({
386 run
: expect
.any(Number
),
389 runTimeHistory
: expect
.any(CircularArray
),
393 waitTimeHistory
: expect
.any(CircularArray
),
398 expect(workerNode
.tasksUsage
.run
).toBeGreaterThan(0)
399 expect(workerNode
.tasksUsage
.run
).toBeLessThanOrEqual(maxMultiplier
)
401 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
402 for (const workerNode
of pool
.workerNodes
) {
403 expect(workerNode
.tasksUsage
).toStrictEqual({
407 runTimeHistory
: expect
.any(CircularArray
),
411 waitTimeHistory
: expect
.any(CircularArray
),
416 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
417 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
422 it("Verify that pool event emitter 'full' event can register a callback", async () => {
423 const pool
= new DynamicThreadPool(
426 './tests/worker-files/thread/testWorker.js'
428 const promises
= new Set()
430 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
431 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
432 promises
.add(pool
.execute())
434 await Promise
.all(promises
)
435 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
436 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
437 expect(poolFull
).toBe(numberOfWorkers
* 2)
441 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
442 const pool
= new FixedThreadPool(
444 './tests/worker-files/thread/testWorker.js'
446 const promises
= new Set()
448 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
449 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
450 promises
.add(pool
.execute())
452 await Promise
.all(promises
)
453 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
454 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
455 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
459 it('Verify that multiple tasks worker is working', async () => {
460 const pool
= new DynamicClusterPool(
463 './tests/worker-files/cluster/testMultiTasksWorker.js'
465 const data
= { n
: 10 }
466 const result0
= await pool
.execute(data
)
467 expect(result0
).toBe(false)
468 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
469 expect(result1
).toBe(false)
470 const result2
= await pool
.execute(data
, 'factorial')
471 expect(result2
).toBe(3628800)
472 const result3
= await pool
.execute(data
, 'fibonacci')
473 expect(result3
).toBe(89)