1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 runTime
: { median
: false },
97 waitTime
: { median
: false }
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
111 runTime
: { median
: true },
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
133 runTime
: { median
: true },
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
186 runTime
: { median
: false },
187 waitTime
: { median
: false }
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
192 runTime
: { median
: false },
193 waitTime
: { median
: false }
197 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
211 pool
.setWorkerChoiceStrategyOptions({ runTime
: { median
: true } })
212 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
213 runTime
: { median
: true }
215 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
216 .workerChoiceStrategies
) {
217 expect(workerChoiceStrategy
.opts
).toStrictEqual({
218 runTime
: { median
: true }
222 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
236 pool
.setWorkerChoiceStrategyOptions({ runTime
: { median
: false } })
237 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
238 runTime
: { median
: false }
240 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
241 .workerChoiceStrategies
) {
242 expect(workerChoiceStrategy
.opts
).toStrictEqual({
243 runTime
: { median
: false }
247 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
264 it('Verify that tasks queue can be enabled/disabled', async () => {
265 const pool
= new FixedThreadPool(
267 './tests/worker-files/thread/testWorker.js'
269 expect(pool
.opts
.enableTasksQueue
).toBe(false)
270 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
271 pool
.enableTasksQueue(true)
272 expect(pool
.opts
.enableTasksQueue
).toBe(true)
273 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
274 pool
.enableTasksQueue(true, { concurrency
: 2 })
275 expect(pool
.opts
.enableTasksQueue
).toBe(true)
276 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
277 pool
.enableTasksQueue(false)
278 expect(pool
.opts
.enableTasksQueue
).toBe(false)
279 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
283 it('Verify that tasks queue options can be set', async () => {
284 const pool
= new FixedThreadPool(
286 './tests/worker-files/thread/testWorker.js',
287 { enableTasksQueue
: true }
289 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
290 pool
.setTasksQueueOptions({ concurrency
: 2 })
291 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
292 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
293 "Invalid worker tasks concurrency '0'"
298 it('Verify that pool info is set', async () => {
299 let pool
= new FixedThreadPool(
301 './tests/worker-files/thread/testWorker.js'
303 expect(pool
.info
).toStrictEqual({
304 type
: PoolTypes
.fixed
,
305 worker
: WorkerTypes
.thread
,
306 minSize
: numberOfWorkers
,
307 maxSize
: numberOfWorkers
,
308 workerNodes
: numberOfWorkers
,
309 idleWorkerNodes
: numberOfWorkers
,
318 pool
= new DynamicClusterPool(
321 './tests/worker-files/thread/testWorker.js'
323 expect(pool
.info
).toStrictEqual({
324 type
: PoolTypes
.dynamic
,
325 worker
: WorkerTypes
.cluster
,
326 minSize
: numberOfWorkers
,
327 maxSize
: numberOfWorkers
* 2,
328 workerNodes
: numberOfWorkers
,
329 idleWorkerNodes
: numberOfWorkers
,
340 it('Simulate worker not found', async () => {
341 const pool
= new StubPoolWithRemoveAllWorker(
343 './tests/worker-files/cluster/testWorker.js',
345 errorHandler
: e
=> console
.error(e
)
348 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
349 // Simulate worker not found.
350 pool
.removeAllWorker()
351 expect(pool
.workerNodes
.length
).toBe(0)
355 it('Verify that worker pool tasks usage are initialized', async () => {
356 const pool
= new FixedClusterPool(
358 './tests/worker-files/cluster/testWorker.js'
360 for (const workerNode
of pool
.workerNodes
) {
361 expect(workerNode
.workerUsage
).toStrictEqual({
372 history
: expect
.any(CircularArray
)
378 history
: expect
.any(CircularArray
)
386 it('Verify that worker pool tasks queue are initialized', async () => {
387 const pool
= new FixedClusterPool(
389 './tests/worker-files/cluster/testWorker.js'
391 for (const workerNode
of pool
.workerNodes
) {
392 expect(workerNode
.tasksQueue
).toBeDefined()
393 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
394 expect(workerNode
.tasksQueue
.size
).toBe(0)
399 it('Verify that worker pool tasks usage are computed', async () => {
400 const pool
= new FixedClusterPool(
402 './tests/worker-files/cluster/testWorker.js'
404 const promises
= new Set()
405 const maxMultiplier
= 2
406 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
407 promises
.add(pool
.execute())
409 for (const workerNode
of pool
.workerNodes
) {
410 expect(workerNode
.workerUsage
).toStrictEqual({
413 executing
: maxMultiplier
,
421 history
: expect
.any(CircularArray
)
427 history
: expect
.any(CircularArray
)
432 await Promise
.all(promises
)
433 for (const workerNode
of pool
.workerNodes
) {
434 expect(workerNode
.workerUsage
).toStrictEqual({
436 executed
: maxMultiplier
,
445 history
: expect
.any(CircularArray
)
451 history
: expect
.any(CircularArray
)
459 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
460 const pool
= new DynamicThreadPool(
463 './tests/worker-files/thread/testWorker.js'
465 const promises
= new Set()
466 const maxMultiplier
= 2
467 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
468 promises
.add(pool
.execute())
470 await Promise
.all(promises
)
471 for (const workerNode
of pool
.workerNodes
) {
472 expect(workerNode
.workerUsage
).toStrictEqual({
474 executed
: expect
.any(Number
),
483 history
: expect
.any(CircularArray
)
489 history
: expect
.any(CircularArray
)
493 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
494 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
498 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
499 for (const workerNode
of pool
.workerNodes
) {
500 expect(workerNode
.workerUsage
).toStrictEqual({
511 history
: expect
.any(CircularArray
)
517 history
: expect
.any(CircularArray
)
521 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
522 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
527 it("Verify that pool event emitter 'full' event can register a callback", async () => {
528 const pool
= new DynamicThreadPool(
531 './tests/worker-files/thread/testWorker.js'
533 const promises
= new Set()
536 pool
.emitter
.on(PoolEvents
.full
, info
=> {
540 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
541 promises
.add(pool
.execute())
543 await Promise
.all(promises
)
544 // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
545 // So it is emitted numberOfWorkers * 2 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
546 expect(poolFull
).toBe(numberOfWorkers
* 2)
547 expect(poolInfo
).toStrictEqual({
548 type
: PoolTypes
.dynamic
,
549 worker
: WorkerTypes
.thread
,
550 minSize
: expect
.any(Number
),
551 maxSize
: expect
.any(Number
),
552 workerNodes
: expect
.any(Number
),
553 idleWorkerNodes
: expect
.any(Number
),
554 busyWorkerNodes
: expect
.any(Number
),
555 executedTasks
: expect
.any(Number
),
556 executingTasks
: expect
.any(Number
),
557 queuedTasks
: expect
.any(Number
),
558 maxQueuedTasks
: expect
.any(Number
),
559 failedTasks
: expect
.any(Number
)
564 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
565 const pool
= new FixedThreadPool(
567 './tests/worker-files/thread/testWorker.js'
569 const promises
= new Set()
572 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
576 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
577 promises
.add(pool
.execute())
579 await Promise
.all(promises
)
580 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
581 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
582 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
583 expect(poolInfo
).toStrictEqual({
584 type
: PoolTypes
.fixed
,
585 worker
: WorkerTypes
.thread
,
586 minSize
: expect
.any(Number
),
587 maxSize
: expect
.any(Number
),
588 workerNodes
: expect
.any(Number
),
589 idleWorkerNodes
: expect
.any(Number
),
590 busyWorkerNodes
: expect
.any(Number
),
591 executedTasks
: expect
.any(Number
),
592 executingTasks
: expect
.any(Number
),
593 queuedTasks
: expect
.any(Number
),
594 maxQueuedTasks
: expect
.any(Number
),
595 failedTasks
: expect
.any(Number
)
600 it('Verify that multiple tasks worker is working', async () => {
601 const pool
= new DynamicClusterPool(
604 './tests/worker-files/cluster/testMultiTasksWorker.js'
606 const data
= { n
: 10 }
607 const result0
= await pool
.execute(data
)
608 expect(result0
).toBe(false)
609 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
610 expect(result1
).toBe(false)
611 const result2
= await pool
.execute(data
, 'factorial')
612 expect(result2
).toBe(3628800)
613 const result3
= await pool
.execute(data
, 'fibonacci')
614 expect(result3
).toBe(89)