1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
197 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
206 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
207 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
210 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
211 .workerChoiceStrategies
) {
212 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
215 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
224 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
225 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
228 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
229 .workerChoiceStrategies
) {
230 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
233 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
245 it('Verify that tasks queue can be enabled/disabled', async () => {
246 const pool
= new FixedThreadPool(
248 './tests/worker-files/thread/testWorker.js'
250 expect(pool
.opts
.enableTasksQueue
).toBe(false)
251 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
252 pool
.enableTasksQueue(true)
253 expect(pool
.opts
.enableTasksQueue
).toBe(true)
254 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
255 pool
.enableTasksQueue(true, { concurrency
: 2 })
256 expect(pool
.opts
.enableTasksQueue
).toBe(true)
257 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
258 pool
.enableTasksQueue(false)
259 expect(pool
.opts
.enableTasksQueue
).toBe(false)
260 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
264 it('Verify that tasks queue options can be set', async () => {
265 const pool
= new FixedThreadPool(
267 './tests/worker-files/thread/testWorker.js',
268 { enableTasksQueue
: true }
270 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
271 pool
.setTasksQueueOptions({ concurrency
: 2 })
272 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
273 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
274 "Invalid worker tasks concurrency '0'"
279 it('Verify that pool info is set', async () => {
280 let pool
= new FixedThreadPool(
282 './tests/worker-files/thread/testWorker.js'
284 expect(pool
.info
).toStrictEqual({
285 type
: PoolTypes
.fixed
,
286 worker
: WorkerTypes
.thread
,
287 minSize
: numberOfWorkers
,
288 maxSize
: numberOfWorkers
,
289 workerNodes
: numberOfWorkers
,
290 idleWorkerNodes
: numberOfWorkers
,
297 pool
= new DynamicClusterPool(
300 './tests/worker-files/thread/testWorker.js'
302 expect(pool
.info
).toStrictEqual({
303 type
: PoolTypes
.dynamic
,
304 worker
: WorkerTypes
.cluster
,
305 minSize
: numberOfWorkers
,
306 maxSize
: numberOfWorkers
* 2,
307 workerNodes
: numberOfWorkers
,
308 idleWorkerNodes
: numberOfWorkers
,
317 it('Simulate worker not found', async () => {
318 const pool
= new StubPoolWithRemoveAllWorker(
320 './tests/worker-files/cluster/testWorker.js',
322 errorHandler
: e
=> console
.error(e
)
325 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
326 // Simulate worker not found.
327 pool
.removeAllWorker()
328 expect(pool
.workerNodes
.length
).toBe(0)
332 it('Verify that worker pool tasks usage are initialized', async () => {
333 const pool
= new FixedClusterPool(
335 './tests/worker-files/cluster/testWorker.js'
337 for (const workerNode
of pool
.workerNodes
) {
338 expect(workerNode
.tasksUsage
).toStrictEqual({
342 runTimeHistory
: expect
.any(CircularArray
),
346 waitTimeHistory
: expect
.any(CircularArray
),
355 it('Verify that worker pool tasks queue are initialized', async () => {
356 const pool
= new FixedClusterPool(
358 './tests/worker-files/cluster/testWorker.js'
360 for (const workerNode
of pool
.workerNodes
) {
361 expect(workerNode
.tasksQueue
).toBeDefined()
362 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
363 expect(workerNode
.tasksQueue
.size
).toBe(0)
368 it('Verify that worker pool tasks usage are computed', async () => {
369 const pool
= new FixedClusterPool(
371 './tests/worker-files/cluster/testWorker.js'
373 const promises
= new Set()
374 const maxMultiplier
= 2
375 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
376 promises
.add(pool
.execute())
378 for (const workerNode
of pool
.workerNodes
) {
379 expect(workerNode
.tasksUsage
).toStrictEqual({
381 running
: maxMultiplier
,
383 runTimeHistory
: expect
.any(CircularArray
),
387 waitTimeHistory
: expect
.any(CircularArray
),
393 await Promise
.all(promises
)
394 for (const workerNode
of pool
.workerNodes
) {
395 expect(workerNode
.tasksUsage
).toStrictEqual({
399 runTimeHistory
: expect
.any(CircularArray
),
403 waitTimeHistory
: expect
.any(CircularArray
),
412 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
413 const pool
= new DynamicThreadPool(
416 './tests/worker-files/thread/testWorker.js'
418 const promises
= new Set()
419 const maxMultiplier
= 2
420 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
421 promises
.add(pool
.execute())
423 await Promise
.all(promises
)
424 for (const workerNode
of pool
.workerNodes
) {
425 expect(workerNode
.tasksUsage
).toStrictEqual({
426 ran
: expect
.any(Number
),
429 runTimeHistory
: expect
.any(CircularArray
),
433 waitTimeHistory
: expect
.any(CircularArray
),
438 expect(workerNode
.tasksUsage
.ran
).toBeGreaterThan(0)
439 expect(workerNode
.tasksUsage
.ran
).toBeLessThanOrEqual(maxMultiplier
)
441 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
442 for (const workerNode
of pool
.workerNodes
) {
443 expect(workerNode
.tasksUsage
).toStrictEqual({
447 runTimeHistory
: expect
.any(CircularArray
),
451 waitTimeHistory
: expect
.any(CircularArray
),
456 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
457 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
462 it("Verify that pool event emitter 'full' event can register a callback", async () => {
463 const pool
= new DynamicThreadPool(
466 './tests/worker-files/thread/testWorker.js'
468 const promises
= new Set()
470 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
471 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
472 promises
.add(pool
.execute())
474 await Promise
.all(promises
)
475 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
476 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
477 expect(poolFull
).toBe(numberOfWorkers
* 2)
481 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
482 const pool
= new FixedThreadPool(
484 './tests/worker-files/thread/testWorker.js'
486 const promises
= new Set()
488 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
489 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
490 promises
.add(pool
.execute())
492 await Promise
.all(promises
)
493 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
494 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
495 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
499 it('Verify that multiple tasks worker is working', async () => {
500 const pool
= new DynamicClusterPool(
503 './tests/worker-files/cluster/testMultiTasksWorker.js'
505 const data
= { n
: 10 }
506 const result0
= await pool
.execute(data
)
507 expect(result0
).toBe(false)
508 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
509 expect(result1
).toBe(false)
510 const result2
= await pool
.execute(data
, 'factorial')
511 expect(result2
).toBe(3628800)
512 const result3
= await pool
.execute(data
, 'fibonacci')
513 expect(result3
).toBe(89)