1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
196 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
205 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
213 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
222 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
223 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
226 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
227 .workerChoiceStrategies
) {
228 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
230 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
242 it('Verify that tasks queue can be enabled/disabled', async () => {
243 const pool
= new FixedThreadPool(
245 './tests/worker-files/thread/testWorker.js'
247 expect(pool
.opts
.enableTasksQueue
).toBe(false)
248 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
249 pool
.enableTasksQueue(true)
250 expect(pool
.opts
.enableTasksQueue
).toBe(true)
251 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
252 pool
.enableTasksQueue(true, { concurrency
: 2 })
253 expect(pool
.opts
.enableTasksQueue
).toBe(true)
254 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
255 pool
.enableTasksQueue(false)
256 expect(pool
.opts
.enableTasksQueue
).toBe(false)
257 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
261 it('Verify that tasks queue options can be set', async () => {
262 const pool
= new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.js',
265 { enableTasksQueue
: true }
267 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
268 pool
.setTasksQueueOptions({ concurrency
: 2 })
269 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
270 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
271 "Invalid worker tasks concurrency '0'"
276 it('Verify that pool info is set', async () => {
277 let pool
= new FixedThreadPool(
279 './tests/worker-files/thread/testWorker.js'
281 expect(pool
.info
).toStrictEqual({
282 type
: PoolTypes
.fixed
,
283 worker
: WorkerTypes
.thread
,
284 minSize
: numberOfWorkers
,
285 maxSize
: numberOfWorkers
,
286 workerNodes
: numberOfWorkers
,
287 idleWorkerNodes
: numberOfWorkers
,
294 pool
= new DynamicClusterPool(
297 './tests/worker-files/thread/testWorker.js'
299 expect(pool
.info
).toStrictEqual({
300 type
: PoolTypes
.dynamic
,
301 worker
: WorkerTypes
.cluster
,
302 minSize
: numberOfWorkers
,
303 maxSize
: numberOfWorkers
* 2,
304 workerNodes
: numberOfWorkers
,
305 idleWorkerNodes
: numberOfWorkers
,
314 it('Simulate worker not found', async () => {
315 const pool
= new StubPoolWithRemoveAllWorker(
317 './tests/worker-files/cluster/testWorker.js',
319 errorHandler
: e
=> console
.error(e
)
322 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
323 // Simulate worker not found.
324 pool
.removeAllWorker()
325 expect(pool
.workerNodes
.length
).toBe(0)
329 it('Verify that worker pool tasks usage are initialized', async () => {
330 const pool
= new FixedClusterPool(
332 './tests/worker-files/cluster/testWorker.js'
334 for (const workerNode
of pool
.workerNodes
) {
335 expect(workerNode
.tasksUsage
).toStrictEqual({
339 runTimeHistory
: expect
.any(CircularArray
),
343 waitTimeHistory
: expect
.any(CircularArray
),
353 it('Verify that worker pool tasks queue are initialized', async () => {
354 const pool
= new FixedClusterPool(
356 './tests/worker-files/cluster/testWorker.js'
358 for (const workerNode
of pool
.workerNodes
) {
359 expect(workerNode
.tasksQueue
).toBeDefined()
360 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
361 expect(workerNode
.tasksQueue
.size
).toBe(0)
366 it('Verify that worker pool tasks usage are computed', async () => {
367 const pool
= new FixedClusterPool(
369 './tests/worker-files/cluster/testWorker.js'
371 const promises
= new Set()
372 const maxMultiplier
= 2
373 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
374 promises
.add(pool
.execute())
376 for (const workerNode
of pool
.workerNodes
) {
377 expect(workerNode
.tasksUsage
).toStrictEqual({
379 running
: maxMultiplier
,
381 runTimeHistory
: expect
.any(CircularArray
),
385 waitTimeHistory
: expect
.any(CircularArray
),
392 await Promise
.all(promises
)
393 for (const workerNode
of pool
.workerNodes
) {
394 expect(workerNode
.tasksUsage
).toStrictEqual({
398 runTimeHistory
: expect
.any(CircularArray
),
402 waitTimeHistory
: expect
.any(CircularArray
),
412 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
413 const pool
= new DynamicThreadPool(
416 './tests/worker-files/thread/testWorker.js'
418 const promises
= new Set()
419 const maxMultiplier
= 2
420 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
421 promises
.add(pool
.execute())
423 await Promise
.all(promises
)
424 for (const workerNode
of pool
.workerNodes
) {
425 expect(workerNode
.tasksUsage
).toStrictEqual({
426 ran
: expect
.any(Number
),
429 runTimeHistory
: expect
.any(CircularArray
),
433 waitTimeHistory
: expect
.any(CircularArray
),
439 expect(workerNode
.tasksUsage
.ran
).toBeGreaterThan(0)
440 expect(workerNode
.tasksUsage
.ran
).toBeLessThanOrEqual(maxMultiplier
)
442 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
443 for (const workerNode
of pool
.workerNodes
) {
444 expect(workerNode
.tasksUsage
).toStrictEqual({
448 runTimeHistory
: expect
.any(CircularArray
),
452 waitTimeHistory
: expect
.any(CircularArray
),
458 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
459 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
464 it("Verify that pool event emitter 'full' event can register a callback", async () => {
465 const pool
= new DynamicThreadPool(
468 './tests/worker-files/thread/testWorker.js'
470 const promises
= new Set()
473 pool
.emitter
.on(PoolEvents
.full
, info
=> {
477 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
478 promises
.add(pool
.execute())
480 await Promise
.all(promises
)
481 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
482 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
483 expect(poolFull
).toBe(numberOfWorkers
* 2)
484 expect(poolInfo
).toStrictEqual({
485 type
: PoolTypes
.dynamic
,
486 worker
: WorkerTypes
.thread
,
487 minSize
: expect
.any(Number
),
488 maxSize
: expect
.any(Number
),
489 workerNodes
: expect
.any(Number
),
490 idleWorkerNodes
: expect
.any(Number
),
491 busyWorkerNodes
: expect
.any(Number
),
492 runningTasks
: expect
.any(Number
),
493 queuedTasks
: expect
.any(Number
),
494 maxQueuedTasks
: expect
.any(Number
)
499 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
500 const pool
= new FixedThreadPool(
502 './tests/worker-files/thread/testWorker.js'
504 const promises
= new Set()
507 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
511 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
512 promises
.add(pool
.execute())
514 await Promise
.all(promises
)
515 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
516 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
517 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
518 expect(poolInfo
).toStrictEqual({
519 type
: PoolTypes
.fixed
,
520 worker
: WorkerTypes
.thread
,
521 minSize
: expect
.any(Number
),
522 maxSize
: expect
.any(Number
),
523 workerNodes
: expect
.any(Number
),
524 idleWorkerNodes
: expect
.any(Number
),
525 busyWorkerNodes
: expect
.any(Number
),
526 runningTasks
: expect
.any(Number
),
527 queuedTasks
: expect
.any(Number
),
528 maxQueuedTasks
: expect
.any(Number
)
533 it('Verify that multiple tasks worker is working', async () => {
534 const pool
= new DynamicClusterPool(
537 './tests/worker-files/cluster/testMultiTasksWorker.js'
539 const data
= { n
: 10 }
540 const result0
= await pool
.execute(data
)
541 expect(result0
).toBe(false)
542 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
543 expect(result1
).toBe(false)
544 const result2
= await pool
.execute(data
, 'factorial')
545 expect(result2
).toBe(3628800)
546 const result3
= await pool
.execute(data
, 'fibonacci')
547 expect(result3
).toBe(89)