1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
10 } = require('../../../lib')
11 const { CircularArray
} = require('../../../lib/circular-array')
12 const { Queue
} = require('../../../lib/queue')
14 describe('Abstract pool test suite', () => {
15 const numberOfWorkers
= 2
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
22 class StubPoolWithIsMain
extends FixedThreadPool
{
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError('Cannot start a pool from a worker!')
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
55 'Cannot instantiate a pool without specifying the number of workers'
59 it('Verify that a negative number of workers is checked', () => {
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
65 'Cannot instantiate a pool with a negative number of workers'
70 it('Verify that a non integer number of workers is checked', () => {
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
76 'Cannot instantiate a pool with a non safe integer number of workers'
81 it('Verify that pool options are checked', async () => {
82 let pool
= new FixedThreadPool(
84 './tests/worker-files/thread/testWorker.js'
86 expect(pool
.emitter
).toBeDefined()
87 expect(pool
.opts
.enableEvents
).toBe(true)
88 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
89 expect(pool
.opts
.enableTasksQueue
).toBe(false)
90 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
91 expect(pool
.opts
.workerChoiceStrategy
).toBe(
92 WorkerChoiceStrategies
.ROUND_ROBIN
94 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
98 expect(pool
.opts
.messageHandler
).toBeUndefined()
99 expect(pool
.opts
.errorHandler
).toBeUndefined()
100 expect(pool
.opts
.onlineHandler
).toBeUndefined()
101 expect(pool
.opts
.exitHandler
).toBeUndefined()
103 const testHandler
= () => console
.log('test handler executed')
104 pool
= new FixedThreadPool(
106 './tests/worker-files/thread/testWorker.js',
108 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
109 workerChoiceStrategyOptions
: {
111 weights
: { 0: 300, 1: 200 }
114 restartWorkerOnError
: false,
115 enableTasksQueue
: true,
116 tasksQueueOptions
: { concurrency
: 2 },
117 messageHandler
: testHandler
,
118 errorHandler
: testHandler
,
119 onlineHandler
: testHandler
,
120 exitHandler
: testHandler
123 expect(pool
.emitter
).toBeUndefined()
124 expect(pool
.opts
.enableEvents
).toBe(false)
125 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
126 expect(pool
.opts
.enableTasksQueue
).toBe(true)
127 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
128 expect(pool
.opts
.workerChoiceStrategy
).toBe(
129 WorkerChoiceStrategies
.LEAST_USED
131 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
133 weights
: { 0: 300, 1: 200 }
135 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
136 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
142 it('Verify that pool options are validated', async () => {
147 './tests/worker-files/thread/testWorker.js',
149 enableTasksQueue
: true,
150 tasksQueueOptions
: { concurrency
: 0 }
153 ).toThrowError("Invalid worker tasks concurrency '0'")
158 './tests/worker-files/thread/testWorker.js',
160 workerChoiceStrategy
: 'invalidStrategy'
163 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
168 './tests/worker-files/thread/testWorker.js',
170 workerChoiceStrategyOptions
: { weights
: {} }
174 'Invalid worker choice strategy options: must have a weight for each worker node'
178 it('Verify that worker choice strategy options can be set', async () => {
179 const pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js',
182 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
184 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
188 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
189 .workerChoiceStrategies
) {
190 expect(workerChoiceStrategy
.opts
).toStrictEqual({
196 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
205 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
214 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
223 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
224 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
227 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
228 .workerChoiceStrategies
) {
229 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
232 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
244 it('Verify that tasks queue can be enabled/disabled', async () => {
245 const pool
= new FixedThreadPool(
247 './tests/worker-files/thread/testWorker.js'
249 expect(pool
.opts
.enableTasksQueue
).toBe(false)
250 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
251 pool
.enableTasksQueue(true)
252 expect(pool
.opts
.enableTasksQueue
).toBe(true)
253 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
254 pool
.enableTasksQueue(true, { concurrency
: 2 })
255 expect(pool
.opts
.enableTasksQueue
).toBe(true)
256 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
257 pool
.enableTasksQueue(false)
258 expect(pool
.opts
.enableTasksQueue
).toBe(false)
259 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
263 it('Verify that tasks queue options can be set', async () => {
264 const pool
= new FixedThreadPool(
266 './tests/worker-files/thread/testWorker.js',
267 { enableTasksQueue
: true }
269 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
270 pool
.setTasksQueueOptions({ concurrency
: 2 })
271 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
272 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
273 "Invalid worker tasks concurrency '0'"
278 it('Verify that pool info is set', async () => {
279 let pool
= new FixedThreadPool(
281 './tests/worker-files/thread/testWorker.js'
283 expect(pool
.info
).toStrictEqual({
284 type
: PoolTypes
.fixed
,
285 minSize
: numberOfWorkers
,
286 maxSize
: numberOfWorkers
,
287 workerNodes
: numberOfWorkers
,
288 idleWorkerNodes
: numberOfWorkers
,
295 pool
= new DynamicClusterPool(
298 './tests/worker-files/thread/testWorker.js'
300 expect(pool
.info
).toStrictEqual({
301 type
: PoolTypes
.dynamic
,
302 minSize
: numberOfWorkers
,
303 maxSize
: numberOfWorkers
* 2,
304 workerNodes
: numberOfWorkers
,
305 idleWorkerNodes
: numberOfWorkers
,
314 it('Simulate worker not found', async () => {
315 const pool
= new StubPoolWithRemoveAllWorker(
317 './tests/worker-files/cluster/testWorker.js',
319 errorHandler
: e
=> console
.error(e
)
322 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
323 // Simulate worker not found.
324 pool
.removeAllWorker()
325 expect(pool
.workerNodes
.length
).toBe(0)
329 it('Verify that worker pool tasks usage are initialized', async () => {
330 const pool
= new FixedClusterPool(
332 './tests/worker-files/cluster/testWorker.js'
334 for (const workerNode
of pool
.workerNodes
) {
335 expect(workerNode
.tasksUsage
).toStrictEqual({
339 runTimeHistory
: expect
.any(CircularArray
),
343 waitTimeHistory
: expect
.any(CircularArray
),
352 it('Verify that worker pool tasks queue are initialized', async () => {
353 const pool
= new FixedClusterPool(
355 './tests/worker-files/cluster/testWorker.js'
357 for (const workerNode
of pool
.workerNodes
) {
358 expect(workerNode
.tasksQueue
).toBeDefined()
359 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
360 expect(workerNode
.tasksQueue
.size
).toBe(0)
365 it('Verify that worker pool tasks usage are computed', async () => {
366 const pool
= new FixedClusterPool(
368 './tests/worker-files/cluster/testWorker.js'
370 const promises
= new Set()
371 const maxMultiplier
= 2
372 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
373 promises
.add(pool
.execute())
375 for (const workerNode
of pool
.workerNodes
) {
376 expect(workerNode
.tasksUsage
).toStrictEqual({
378 running
: maxMultiplier
,
380 runTimeHistory
: expect
.any(CircularArray
),
384 waitTimeHistory
: expect
.any(CircularArray
),
390 await Promise
.all(promises
)
391 for (const workerNode
of pool
.workerNodes
) {
392 expect(workerNode
.tasksUsage
).toStrictEqual({
396 runTimeHistory
: expect
.any(CircularArray
),
400 waitTimeHistory
: expect
.any(CircularArray
),
409 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
410 const pool
= new DynamicThreadPool(
413 './tests/worker-files/thread/testWorker.js'
415 const promises
= new Set()
416 const maxMultiplier
= 2
417 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
418 promises
.add(pool
.execute())
420 await Promise
.all(promises
)
421 for (const workerNode
of pool
.workerNodes
) {
422 expect(workerNode
.tasksUsage
).toStrictEqual({
423 run
: expect
.any(Number
),
426 runTimeHistory
: expect
.any(CircularArray
),
430 waitTimeHistory
: expect
.any(CircularArray
),
435 expect(workerNode
.tasksUsage
.run
).toBeGreaterThan(0)
436 expect(workerNode
.tasksUsage
.run
).toBeLessThanOrEqual(maxMultiplier
)
438 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
439 for (const workerNode
of pool
.workerNodes
) {
440 expect(workerNode
.tasksUsage
).toStrictEqual({
444 runTimeHistory
: expect
.any(CircularArray
),
448 waitTimeHistory
: expect
.any(CircularArray
),
453 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
454 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
459 it("Verify that pool event emitter 'full' event can register a callback", async () => {
460 const pool
= new DynamicThreadPool(
463 './tests/worker-files/thread/testWorker.js'
465 const promises
= new Set()
467 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
468 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
469 promises
.add(pool
.execute())
471 await Promise
.all(promises
)
472 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
473 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
474 expect(poolFull
).toBe(numberOfWorkers
* 2)
478 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
479 const pool
= new FixedThreadPool(
481 './tests/worker-files/thread/testWorker.js'
483 const promises
= new Set()
485 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
486 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
487 promises
.add(pool
.execute())
489 await Promise
.all(promises
)
490 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
491 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
492 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
496 it('Verify that multiple tasks worker is working', async () => {
497 const pool
= new DynamicClusterPool(
500 './tests/worker-files/cluster/testMultiTasksWorker.js'
502 const data
= { n
: 10 }
503 const result0
= await pool
.execute(data
)
504 expect(result0
).toBe(false)
505 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
506 expect(result1
).toBe(false)
507 const result2
= await pool
.execute(data
, 'factorial')
508 expect(result2
).toBe(3628800)
509 const result3
= await pool
.execute(data
, 'fibonacci')
510 expect(result3
).toBe(89)