1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
196 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
205 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
213 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
222 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
223 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
226 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
227 .workerChoiceStrategies
) {
228 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
230 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
242 it('Verify that tasks queue can be enabled/disabled', async () => {
243 const pool
= new FixedThreadPool(
245 './tests/worker-files/thread/testWorker.js'
247 expect(pool
.opts
.enableTasksQueue
).toBe(false)
248 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
249 pool
.enableTasksQueue(true)
250 expect(pool
.opts
.enableTasksQueue
).toBe(true)
251 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
252 pool
.enableTasksQueue(true, { concurrency
: 2 })
253 expect(pool
.opts
.enableTasksQueue
).toBe(true)
254 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
255 pool
.enableTasksQueue(false)
256 expect(pool
.opts
.enableTasksQueue
).toBe(false)
257 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
261 it('Verify that tasks queue options can be set', async () => {
262 const pool
= new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.js',
265 { enableTasksQueue
: true }
267 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
268 pool
.setTasksQueueOptions({ concurrency
: 2 })
269 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
270 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
271 "Invalid worker tasks concurrency '0'"
276 it('Verify that pool info is set', async () => {
277 let pool
= new FixedThreadPool(
279 './tests/worker-files/thread/testWorker.js'
281 expect(pool
.info
).toStrictEqual({
282 type
: PoolTypes
.fixed
,
283 worker
: WorkerTypes
.thread
,
284 minSize
: numberOfWorkers
,
285 maxSize
: numberOfWorkers
,
286 workerNodes
: numberOfWorkers
,
287 idleWorkerNodes
: numberOfWorkers
,
294 pool
= new DynamicClusterPool(
297 './tests/worker-files/thread/testWorker.js'
299 expect(pool
.info
).toStrictEqual({
300 type
: PoolTypes
.dynamic
,
301 worker
: WorkerTypes
.cluster
,
302 minSize
: numberOfWorkers
,
303 maxSize
: numberOfWorkers
* 2,
304 workerNodes
: numberOfWorkers
,
305 idleWorkerNodes
: numberOfWorkers
,
314 it('Simulate worker not found', async () => {
315 const pool
= new StubPoolWithRemoveAllWorker(
317 './tests/worker-files/cluster/testWorker.js',
319 errorHandler
: e
=> console
.error(e
)
322 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
323 // Simulate worker not found.
324 pool
.removeAllWorker()
325 expect(pool
.workerNodes
.length
).toBe(0)
329 it('Verify that worker pool tasks usage are initialized', async () => {
330 const pool
= new FixedClusterPool(
332 './tests/worker-files/cluster/testWorker.js'
334 for (const workerNode
of pool
.workerNodes
) {
335 expect(workerNode
.tasksUsage
).toStrictEqual({
339 runTimeHistory
: expect
.any(CircularArray
),
343 waitTimeHistory
: expect
.any(CircularArray
),
353 it('Verify that worker pool tasks queue are initialized', async () => {
354 const pool
= new FixedClusterPool(
356 './tests/worker-files/cluster/testWorker.js'
358 for (const workerNode
of pool
.workerNodes
) {
359 expect(workerNode
.tasksQueue
).toBeDefined()
360 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
361 expect(workerNode
.tasksQueue
.size
).toBe(0)
366 it('Verify that worker pool tasks usage are computed', async () => {
367 const pool
= new FixedClusterPool(
369 './tests/worker-files/cluster/testWorker.js'
371 const promises
= new Set()
372 const maxMultiplier
= 2
373 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
374 promises
.add(pool
.execute())
376 for (const workerNode
of pool
.workerNodes
) {
377 expect(workerNode
.tasksUsage
).toStrictEqual({
379 running
: maxMultiplier
,
381 runTimeHistory
: expect
.any(CircularArray
),
385 waitTimeHistory
: expect
.any(CircularArray
),
392 await Promise
.all(promises
)
393 for (const workerNode
of pool
.workerNodes
) {
394 expect(workerNode
.tasksUsage
).toStrictEqual({
398 runTimeHistory
: expect
.any(CircularArray
),
402 waitTimeHistory
: expect
.any(CircularArray
),
412 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
413 const pool
= new DynamicThreadPool(
416 './tests/worker-files/thread/testWorker.js'
418 const promises
= new Set()
419 const maxMultiplier
= 2
420 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
421 promises
.add(pool
.execute())
423 await Promise
.all(promises
)
424 for (const workerNode
of pool
.workerNodes
) {
425 expect(workerNode
.tasksUsage
).toStrictEqual({
426 ran
: expect
.any(Number
),
429 runTimeHistory
: expect
.any(CircularArray
),
433 waitTimeHistory
: expect
.any(CircularArray
),
439 expect(workerNode
.tasksUsage
.ran
).toBeGreaterThan(0)
440 expect(workerNode
.tasksUsage
.ran
).toBeLessThanOrEqual(maxMultiplier
)
442 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
443 for (const workerNode
of pool
.workerNodes
) {
444 expect(workerNode
.tasksUsage
).toStrictEqual({
448 runTimeHistory
: expect
.any(CircularArray
),
452 waitTimeHistory
: expect
.any(CircularArray
),
458 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
459 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
464 it("Verify that pool event emitter 'full' event can register a callback", async () => {
465 const pool
= new DynamicThreadPool(
468 './tests/worker-files/thread/testWorker.js'
470 const promises
= new Set()
472 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
473 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
474 promises
.add(pool
.execute())
476 await Promise
.all(promises
)
477 // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
478 // So it fires numberOfWorkers * 2 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
479 expect(poolFull
).toBe(numberOfWorkers
* 2)
483 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
484 const pool
= new FixedThreadPool(
486 './tests/worker-files/thread/testWorker.js'
488 const promises
= new Set()
490 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
491 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
492 promises
.add(pool
.execute())
494 await Promise
.all(promises
)
495 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
496 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
497 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
501 it('Verify that multiple tasks worker is working', async () => {
502 const pool
= new DynamicClusterPool(
505 './tests/worker-files/cluster/testMultiTasksWorker.js'
507 const data
= { n
: 10 }
508 const result0
= await pool
.execute(data
)
509 expect(result0
).toBe(false)
510 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
511 expect(result1
).toBe(false)
512 const result2
= await pool
.execute(data
, 'factorial')
513 expect(result2
).toBe(3628800)
514 const result3
= await pool
.execute(data
, 'fibonacci')
515 expect(result3
).toBe(89)