1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
196 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
205 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
213 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
222 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
223 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
226 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
227 .workerChoiceStrategies
) {
228 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
230 expect(pool
.workerChoiceStrategyContext
.getTaskStatistics()).toStrictEqual({
242 it('Verify that tasks queue can be enabled/disabled', async () => {
243 const pool
= new FixedThreadPool(
245 './tests/worker-files/thread/testWorker.js'
247 expect(pool
.opts
.enableTasksQueue
).toBe(false)
248 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
249 pool
.enableTasksQueue(true)
250 expect(pool
.opts
.enableTasksQueue
).toBe(true)
251 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
252 pool
.enableTasksQueue(true, { concurrency
: 2 })
253 expect(pool
.opts
.enableTasksQueue
).toBe(true)
254 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
255 pool
.enableTasksQueue(false)
256 expect(pool
.opts
.enableTasksQueue
).toBe(false)
257 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
261 it('Verify that tasks queue options can be set', async () => {
262 const pool
= new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.js',
265 { enableTasksQueue
: true }
267 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
268 pool
.setTasksQueueOptions({ concurrency
: 2 })
269 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
270 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
271 "Invalid worker tasks concurrency '0'"
276 it('Verify that pool info is set', async () => {
277 let pool
= new FixedThreadPool(
279 './tests/worker-files/thread/testWorker.js'
281 expect(pool
.info
).toStrictEqual({
282 type
: PoolTypes
.fixed
,
283 worker
: WorkerTypes
.thread
,
284 minSize
: numberOfWorkers
,
285 maxSize
: numberOfWorkers
,
286 workerNodes
: numberOfWorkers
,
287 idleWorkerNodes
: numberOfWorkers
,
296 pool
= new DynamicClusterPool(
299 './tests/worker-files/thread/testWorker.js'
301 expect(pool
.info
).toStrictEqual({
302 type
: PoolTypes
.dynamic
,
303 worker
: WorkerTypes
.cluster
,
304 minSize
: numberOfWorkers
,
305 maxSize
: numberOfWorkers
* 2,
306 workerNodes
: numberOfWorkers
,
307 idleWorkerNodes
: numberOfWorkers
,
318 it('Simulate worker not found', async () => {
319 const pool
= new StubPoolWithRemoveAllWorker(
321 './tests/worker-files/cluster/testWorker.js',
323 errorHandler
: e
=> console
.error(e
)
326 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
327 // Simulate worker not found.
328 pool
.removeAllWorker()
329 expect(pool
.workerNodes
.length
).toBe(0)
333 it('Verify that worker pool tasks usage are initialized', async () => {
334 const pool
= new FixedClusterPool(
336 './tests/worker-files/cluster/testWorker.js'
338 for (const workerNode
of pool
.workerNodes
) {
339 expect(workerNode
.workerUsage
).toStrictEqual({
350 history
: expect
.any(CircularArray
)
356 history
: expect
.any(CircularArray
)
364 it('Verify that worker pool tasks queue are initialized', async () => {
365 const pool
= new FixedClusterPool(
367 './tests/worker-files/cluster/testWorker.js'
369 for (const workerNode
of pool
.workerNodes
) {
370 expect(workerNode
.tasksQueue
).toBeDefined()
371 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
372 expect(workerNode
.tasksQueue
.size
).toBe(0)
377 it('Verify that worker pool tasks usage are computed', async () => {
378 const pool
= new FixedClusterPool(
380 './tests/worker-files/cluster/testWorker.js'
382 const promises
= new Set()
383 const maxMultiplier
= 2
384 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
385 promises
.add(pool
.execute())
387 for (const workerNode
of pool
.workerNodes
) {
388 expect(workerNode
.workerUsage
).toStrictEqual({
391 executing
: maxMultiplier
,
399 history
: expect
.any(CircularArray
)
405 history
: expect
.any(CircularArray
)
410 await Promise
.all(promises
)
411 for (const workerNode
of pool
.workerNodes
) {
412 expect(workerNode
.workerUsage
).toStrictEqual({
414 executed
: maxMultiplier
,
423 history
: expect
.any(CircularArray
)
429 history
: expect
.any(CircularArray
)
437 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
438 const pool
= new DynamicThreadPool(
441 './tests/worker-files/thread/testWorker.js'
443 const promises
= new Set()
444 const maxMultiplier
= 2
445 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
446 promises
.add(pool
.execute())
448 await Promise
.all(promises
)
449 for (const workerNode
of pool
.workerNodes
) {
450 expect(workerNode
.workerUsage
).toStrictEqual({
452 executed
: expect
.any(Number
),
461 history
: expect
.any(CircularArray
)
467 history
: expect
.any(CircularArray
)
471 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
472 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
476 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
477 for (const workerNode
of pool
.workerNodes
) {
478 expect(workerNode
.workerUsage
).toStrictEqual({
489 history
: expect
.any(CircularArray
)
495 history
: expect
.any(CircularArray
)
499 expect(workerNode
.workerUsage
.runTime
.history
.length
).toBe(0)
500 expect(workerNode
.workerUsage
.waitTime
.history
.length
).toBe(0)
505 it("Verify that pool event emitter 'full' event can register a callback", async () => {
506 const pool
= new DynamicThreadPool(
509 './tests/worker-files/thread/testWorker.js'
511 const promises
= new Set()
514 pool
.emitter
.on(PoolEvents
.full
, info
=> {
518 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
519 promises
.add(pool
.execute())
521 await Promise
.all(promises
)
522 // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
523 // So it is triggered numberOfWorkers * 2 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
524 expect(poolFull
).toBe(numberOfWorkers
* 2)
525 expect(poolInfo
).toStrictEqual({
526 type
: PoolTypes
.dynamic
,
527 worker
: WorkerTypes
.thread
,
528 minSize
: expect
.any(Number
),
529 maxSize
: expect
.any(Number
),
530 workerNodes
: expect
.any(Number
),
531 idleWorkerNodes
: expect
.any(Number
),
532 busyWorkerNodes
: expect
.any(Number
),
533 executedTasks
: expect
.any(Number
),
534 executingTasks
: expect
.any(Number
),
535 queuedTasks
: expect
.any(Number
),
536 maxQueuedTasks
: expect
.any(Number
),
537 failedTasks
: expect
.any(Number
)
542 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
543 const pool
= new FixedThreadPool(
545 './tests/worker-files/thread/testWorker.js'
547 const promises
= new Set()
550 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
554 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
555 promises
.add(pool
.execute())
557 await Promise
.all(promises
)
558 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
559 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
560 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
561 expect(poolInfo
).toStrictEqual({
562 type
: PoolTypes
.fixed
,
563 worker
: WorkerTypes
.thread
,
564 minSize
: expect
.any(Number
),
565 maxSize
: expect
.any(Number
),
566 workerNodes
: expect
.any(Number
),
567 idleWorkerNodes
: expect
.any(Number
),
568 busyWorkerNodes
: expect
.any(Number
),
569 executedTasks
: expect
.any(Number
),
570 executingTasks
: expect
.any(Number
),
571 queuedTasks
: expect
.any(Number
),
572 maxQueuedTasks
: expect
.any(Number
),
573 failedTasks
: expect
.any(Number
)
578 it('Verify that multiple tasks worker is working', async () => {
579 const pool
= new DynamicClusterPool(
582 './tests/worker-files/cluster/testMultiTasksWorker.js'
584 const data
= { n
: 10 }
585 const result0
= await pool
.execute(data
)
586 expect(result0
).toBe(false)
587 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
588 expect(result1
).toBe(false)
589 const result2
= await pool
.execute(data
, 'factorial')
590 expect(result2
).toBe(3628800)
591 const result3
= await pool
.execute(data
, 'fibonacci')
592 expect(result3
).toBe(89)