1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
197 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
207 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
208 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
211 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
212 .workerChoiceStrategies
) {
213 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
216 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
226 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
227 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
230 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
231 .workerChoiceStrategies
) {
232 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
235 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
248 it('Verify that tasks queue can be enabled/disabled', async () => {
249 const pool
= new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.js'
253 expect(pool
.opts
.enableTasksQueue
).toBe(false)
254 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
255 pool
.enableTasksQueue(true)
256 expect(pool
.opts
.enableTasksQueue
).toBe(true)
257 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
258 pool
.enableTasksQueue(true, { concurrency
: 2 })
259 expect(pool
.opts
.enableTasksQueue
).toBe(true)
260 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
261 pool
.enableTasksQueue(false)
262 expect(pool
.opts
.enableTasksQueue
).toBe(false)
263 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
267 it('Verify that tasks queue options can be set', async () => {
268 const pool
= new FixedThreadPool(
270 './tests/worker-files/thread/testWorker.js',
271 { enableTasksQueue
: true }
273 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
274 pool
.setTasksQueueOptions({ concurrency
: 2 })
275 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
276 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
277 "Invalid worker tasks concurrency '0'"
282 it('Verify that pool info is set', async () => {
283 let pool
= new FixedThreadPool(
285 './tests/worker-files/thread/testWorker.js'
287 expect(pool
.info
).toStrictEqual({
288 type
: PoolTypes
.fixed
,
289 worker
: WorkerTypes
.thread
,
290 minSize
: numberOfWorkers
,
291 maxSize
: numberOfWorkers
,
292 workerNodes
: numberOfWorkers
,
293 idleWorkerNodes
: numberOfWorkers
,
302 pool
= new DynamicClusterPool(
305 './tests/worker-files/thread/testWorker.js'
307 expect(pool
.info
).toStrictEqual({
308 type
: PoolTypes
.dynamic
,
309 worker
: WorkerTypes
.cluster
,
310 minSize
: numberOfWorkers
,
311 maxSize
: numberOfWorkers
* 2,
312 workerNodes
: numberOfWorkers
,
313 idleWorkerNodes
: numberOfWorkers
,
324 it('Simulate worker not found', async () => {
325 const pool
= new StubPoolWithRemoveAllWorker(
327 './tests/worker-files/cluster/testWorker.js',
329 errorHandler
: e
=> console
.error(e
)
332 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
333 // Simulate worker not found.
334 pool
.removeAllWorker()
335 expect(pool
.workerNodes
.length
).toBe(0)
339 it('Verify that worker pool tasks usage are initialized', async () => {
340 const pool
= new FixedClusterPool(
342 './tests/worker-files/cluster/testWorker.js'
344 for (const workerNode
of pool
.workerNodes
) {
345 expect(workerNode
.workerUsage
).toStrictEqual({
356 history
: expect
.any(CircularArray
)
362 history
: expect
.any(CircularArray
)
370 it('Verify that worker pool tasks queue are initialized', async () => {
371 const pool
= new FixedClusterPool(
373 './tests/worker-files/cluster/testWorker.js'
375 for (const workerNode
of pool
.workerNodes
) {
376 expect(workerNode
.tasksQueue
).toBeDefined()
377 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
378 expect(workerNode
.tasksQueue
.size
).toBe(0)
383 it('Verify that worker pool tasks usage are computed', async () => {
384 const pool
= new FixedClusterPool(
386 './tests/worker-files/cluster/testWorker.js'
388 const promises
= new Set()
389 const maxMultiplier
= 2
390 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
391 promises
.add(pool
.execute())
393 for (const workerNode
of pool
.workerNodes
) {
394 expect(workerNode
.workerUsage
).toStrictEqual({
397 executing
: maxMultiplier
,
405 history
: expect
.any(CircularArray
)
411 history
: expect
.any(CircularArray
)
416 await Promise
.all(promises
)
417 for (const workerNode
of pool
.workerNodes
) {
418 expect(workerNode
.workerUsage
).toStrictEqual({
420 executed
: maxMultiplier
,
429 history
: expect
.any(CircularArray
)
435 history
: expect
.any(CircularArray
)
443 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
444 const pool
= new DynamicThreadPool(
447 './tests/worker-files/thread/testWorker.js'
449 const promises
= new Set()
450 const maxMultiplier
= 2
451 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
452 promises
.add(pool
.execute())
454 await Promise
.all(promises
)
455 for (const workerNode
of pool
.workerNodes
) {
456 expect(workerNode
.workerUsage
).toStrictEqual({
458 executed
: expect
.any(Number
),
467 history
: expect
.any(CircularArray
)
473 history
: expect
.any(CircularArray
)
477 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
478 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
482 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
483 for (const workerNode
of pool
.workerNodes
) {
484 expect(workerNode
.workerUsage
).toStrictEqual({
495 history
: expect
.any(CircularArray
)
501 history
: expect
.any(CircularArray
)
505 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
506 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
511 it("Verify that pool event emitter 'full' event can register a callback", async () => {
512 const pool
= new DynamicThreadPool(
515 './tests/worker-files/thread/testWorker.js'
517 const promises
= new Set()
520 pool
.emitter
.on(PoolEvents
.full
, info
=> {
524 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
525 promises
.add(pool
.execute())
527 await Promise
.all(promises
)
528 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
529 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
530 expect(poolFull
).toBe(numberOfWorkers
* 2)
531 expect(poolInfo
).toStrictEqual({
532 type
: PoolTypes
.dynamic
,
533 worker
: WorkerTypes
.thread
,
534 minSize
: expect
.any(Number
),
535 maxSize
: expect
.any(Number
),
536 workerNodes
: expect
.any(Number
),
537 idleWorkerNodes
: expect
.any(Number
),
538 busyWorkerNodes
: expect
.any(Number
),
539 executedTasks
: expect
.any(Number
),
540 executingTasks
: expect
.any(Number
),
541 queuedTasks
: expect
.any(Number
),
542 maxQueuedTasks
: expect
.any(Number
),
543 failedTasks
: expect
.any(Number
)
548 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
549 const pool
= new FixedThreadPool(
551 './tests/worker-files/thread/testWorker.js'
553 const promises
= new Set()
556 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
560 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
561 promises
.add(pool
.execute())
563 await Promise
.all(promises
)
564 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
565 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
566 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
567 expect(poolInfo
).toStrictEqual({
568 type
: PoolTypes
.fixed
,
569 worker
: WorkerTypes
.thread
,
570 minSize
: expect
.any(Number
),
571 maxSize
: expect
.any(Number
),
572 workerNodes
: expect
.any(Number
),
573 idleWorkerNodes
: expect
.any(Number
),
574 busyWorkerNodes
: expect
.any(Number
),
575 executedTasks
: expect
.any(Number
),
576 executingTasks
: expect
.any(Number
),
577 queuedTasks
: expect
.any(Number
),
578 maxQueuedTasks
: expect
.any(Number
),
579 failedTasks
: expect
.any(Number
)
584 it('Verify that multiple tasks worker is working', async () => {
585 const pool
= new DynamicClusterPool(
588 './tests/worker-files/cluster/testMultiTasksWorker.js'
590 const data
= { n
: 10 }
591 const result0
= await pool
.execute(data
)
592 expect(result0
).toBe(false)
593 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
594 expect(result1
).toBe(false)
595 const result2
= await pool
.execute(data
, 'factorial')
596 expect(result2
).toBe(3628800)
597 const result3
= await pool
.execute(data
, 'fibonacci')
598 expect(result3
).toBe(89)