1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
96 runTime
: { median
: false },
97 waitTime
: { median
: false },
98 elu
: { median
: false }
100 expect(pool
.opts
.messageHandler
).toBeUndefined()
101 expect(pool
.opts
.errorHandler
).toBeUndefined()
102 expect(pool
.opts
.onlineHandler
).toBeUndefined()
103 expect(pool
.opts
.exitHandler
).toBeUndefined()
105 const testHandler
= () => console
.log('test handler executed')
106 pool
= new FixedThreadPool(
108 './tests/worker-files/thread/testWorker.js',
110 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
111 workerChoiceStrategyOptions
: {
112 runTime
: { median
: true },
113 weights
: { 0: 300, 1: 200 }
116 restartWorkerOnError
: false,
117 enableTasksQueue
: true,
118 tasksQueueOptions
: { concurrency
: 2 },
119 messageHandler
: testHandler
,
120 errorHandler
: testHandler
,
121 onlineHandler
: testHandler
,
122 exitHandler
: testHandler
125 expect(pool
.emitter
).toBeUndefined()
126 expect(pool
.opts
.enableEvents
).toBe(false)
127 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
128 expect(pool
.opts
.enableTasksQueue
).toBe(true)
129 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
130 expect(pool
.opts
.workerChoiceStrategy
).toBe(
131 WorkerChoiceStrategies
.LEAST_USED
133 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 runTime
: { median
: true },
135 weights
: { 0: 300, 1: 200 }
137 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
140 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
144 it('Verify that pool options are validated', async () => {
149 './tests/worker-files/thread/testWorker.js',
151 workerChoiceStrategy
: 'invalidStrategy'
154 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategyOptions
: 'invalidOptions'
165 'Invalid worker choice strategy options: must be a plain object'
171 './tests/worker-files/thread/testWorker.js',
173 workerChoiceStrategyOptions
: { weights
: {} }
177 'Invalid worker choice strategy options: must have a weight for each worker node'
183 './tests/worker-files/thread/testWorker.js',
185 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
189 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
195 './tests/worker-files/thread/testWorker.js',
197 enableTasksQueue
: true,
198 tasksQueueOptions
: { concurrency
: 0 }
201 ).toThrowError("Invalid worker tasks concurrency '0'")
206 './tests/worker-files/thread/testWorker.js',
208 enableTasksQueue
: true,
209 tasksQueueOptions
: 'invalidTasksQueueOptions'
212 ).toThrowError('Invalid tasks queue options: must be a plain object')
217 './tests/worker-files/thread/testWorker.js',
219 enableTasksQueue
: true,
220 tasksQueueOptions
: { concurrency
: 0.2 }
223 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
226 it('Verify that worker choice strategy options can be set', async () => {
227 const pool
= new FixedThreadPool(
229 './tests/worker-files/thread/testWorker.js',
230 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
232 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
233 runTime
: { median
: false },
234 waitTime
: { median
: false },
235 elu
: { median
: false }
237 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
238 .workerChoiceStrategies
) {
239 expect(workerChoiceStrategy
.opts
).toStrictEqual({
240 runTime
: { median
: false },
241 waitTime
: { median
: false },
242 elu
: { median
: false }
246 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
264 pool
.setWorkerChoiceStrategyOptions({
265 runTime
: { median
: true },
266 elu
: { median
: true }
268 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
269 runTime
: { median
: true },
270 elu
: { median
: true }
272 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
273 .workerChoiceStrategies
) {
274 expect(workerChoiceStrategy
.opts
).toStrictEqual({
275 runTime
: { median
: true },
276 elu
: { median
: true }
280 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 pool
.setWorkerChoiceStrategyOptions({
299 runTime
: { median
: false },
300 elu
: { median
: false }
302 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
303 runTime
: { median
: false },
304 elu
: { median
: false }
306 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
307 .workerChoiceStrategies
) {
308 expect(workerChoiceStrategy
.opts
).toStrictEqual({
309 runTime
: { median
: false },
310 elu
: { median
: false }
314 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
333 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
335 'Invalid worker choice strategy options: must be a plain object'
338 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
340 'Invalid worker choice strategy options: must have a weight for each worker node'
343 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
345 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
350 it('Verify that tasks queue can be enabled/disabled', async () => {
351 const pool
= new FixedThreadPool(
353 './tests/worker-files/thread/testWorker.js'
355 expect(pool
.opts
.enableTasksQueue
).toBe(false)
356 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
357 pool
.enableTasksQueue(true)
358 expect(pool
.opts
.enableTasksQueue
).toBe(true)
359 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
360 pool
.enableTasksQueue(true, { concurrency
: 2 })
361 expect(pool
.opts
.enableTasksQueue
).toBe(true)
362 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
363 pool
.enableTasksQueue(false)
364 expect(pool
.opts
.enableTasksQueue
).toBe(false)
365 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
369 it('Verify that tasks queue options can be set', async () => {
370 const pool
= new FixedThreadPool(
372 './tests/worker-files/thread/testWorker.js',
373 { enableTasksQueue
: true }
375 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
376 pool
.setTasksQueueOptions({ concurrency
: 2 })
377 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
379 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
380 ).toThrowError('Invalid tasks queue options: must be a plain object')
381 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
382 "Invalid worker tasks concurrency '0'"
384 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
385 'Invalid worker tasks concurrency: must be an integer'
390 it('Verify that pool info is set', async () => {
391 let pool
= new FixedThreadPool(
393 './tests/worker-files/thread/testWorker.js'
395 expect(pool
.info
).toStrictEqual({
396 type
: PoolTypes
.fixed
,
397 worker
: WorkerTypes
.thread
,
398 minSize
: numberOfWorkers
,
399 maxSize
: numberOfWorkers
,
400 workerNodes
: numberOfWorkers
,
401 idleWorkerNodes
: numberOfWorkers
,
409 for (const workerNode
of pool
.workerNodes
) {
410 console
.log('thread:workerNode.info', workerNode
.info
)
413 pool
= new DynamicClusterPool(
416 './tests/worker-files/cluster/testWorker.js'
418 expect(pool
.info
).toStrictEqual({
419 type
: PoolTypes
.dynamic
,
420 worker
: WorkerTypes
.cluster
,
421 minSize
: numberOfWorkers
,
422 maxSize
: numberOfWorkers
* 2,
423 workerNodes
: numberOfWorkers
,
424 idleWorkerNodes
: numberOfWorkers
,
432 for (const workerNode
of pool
.workerNodes
) {
433 console
.log('cluster:workerNode.info', workerNode
.info
)
438 it('Simulate worker not found', async () => {
439 const pool
= new StubPoolWithRemoveAllWorker(
441 './tests/worker-files/thread/testWorker.js',
443 errorHandler
: e
=> console
.error(e
)
446 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
447 // Simulate worker not found.
448 pool
.removeAllWorker()
449 expect(pool
.workerNodes
.length
).toBe(0)
453 it('Verify that worker pool tasks usage are initialized', async () => {
454 const pool
= new FixedClusterPool(
456 './tests/worker-files/cluster/testWorker.js'
458 for (const workerNode
of pool
.workerNodes
) {
459 expect(workerNode
.usage
).toStrictEqual({
471 history
: expect
.any(CircularArray
)
477 history
: expect
.any(CircularArray
)
484 history
: expect
.any(CircularArray
)
490 history
: expect
.any(CircularArray
)
499 it('Verify that worker pool tasks queue are initialized', async () => {
500 const pool
= new FixedClusterPool(
502 './tests/worker-files/cluster/testWorker.js'
504 for (const workerNode
of pool
.workerNodes
) {
505 expect(workerNode
.tasksQueue
).toBeDefined()
506 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
507 expect(workerNode
.tasksQueue
.size
).toBe(0)
508 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
513 it('Verify that worker pool tasks usage are computed', async () => {
514 const pool
= new FixedClusterPool(
516 './tests/worker-files/cluster/testWorker.js'
518 const promises
= new Set()
519 const maxMultiplier
= 2
520 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
521 promises
.add(pool
.execute())
523 for (const workerNode
of pool
.workerNodes
) {
524 expect(workerNode
.usage
).toStrictEqual({
527 executing
: maxMultiplier
,
536 history
: expect
.any(CircularArray
)
542 history
: expect
.any(CircularArray
)
549 history
: expect
.any(CircularArray
)
555 history
: expect
.any(CircularArray
)
561 await Promise
.all(promises
)
562 for (const workerNode
of pool
.workerNodes
) {
563 expect(workerNode
.usage
).toStrictEqual({
565 executed
: maxMultiplier
,
575 history
: expect
.any(CircularArray
)
581 history
: expect
.any(CircularArray
)
588 history
: expect
.any(CircularArray
)
594 history
: expect
.any(CircularArray
)
603 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
604 const pool
= new DynamicThreadPool(
607 './tests/worker-files/thread/testWorker.js'
609 const promises
= new Set()
610 const maxMultiplier
= 2
611 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
612 promises
.add(pool
.execute())
614 await Promise
.all(promises
)
615 for (const workerNode
of pool
.workerNodes
) {
616 expect(workerNode
.usage
).toStrictEqual({
618 executed
: expect
.any(Number
),
628 history
: expect
.any(CircularArray
)
634 history
: expect
.any(CircularArray
)
641 history
: expect
.any(CircularArray
)
647 history
: expect
.any(CircularArray
)
652 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
653 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
655 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
656 for (const workerNode
of pool
.workerNodes
) {
657 expect(workerNode
.usage
).toStrictEqual({
669 history
: expect
.any(CircularArray
)
675 history
: expect
.any(CircularArray
)
682 history
: expect
.any(CircularArray
)
688 history
: expect
.any(CircularArray
)
693 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
694 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
699 it("Verify that pool event emitter 'full' event can register a callback", async () => {
700 const pool
= new DynamicThreadPool(
703 './tests/worker-files/thread/testWorker.js'
705 const promises
= new Set()
708 pool
.emitter
.on(PoolEvents
.full
, info
=> {
712 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
713 promises
.add(pool
.execute())
715 await Promise
.all(promises
)
716 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
717 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
718 expect(poolFull
).toBe(numberOfWorkers
* 2)
719 expect(poolInfo
).toStrictEqual({
720 type
: PoolTypes
.dynamic
,
721 worker
: WorkerTypes
.thread
,
722 minSize
: expect
.any(Number
),
723 maxSize
: expect
.any(Number
),
724 workerNodes
: expect
.any(Number
),
725 idleWorkerNodes
: expect
.any(Number
),
726 busyWorkerNodes
: expect
.any(Number
),
727 executedTasks
: expect
.any(Number
),
728 executingTasks
: expect
.any(Number
),
729 queuedTasks
: expect
.any(Number
),
730 maxQueuedTasks
: expect
.any(Number
),
731 failedTasks
: expect
.any(Number
)
736 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
737 const pool
= new FixedThreadPool(
739 './tests/worker-files/thread/testWorker.js'
741 const promises
= new Set()
744 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
748 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
749 promises
.add(pool
.execute())
751 await Promise
.all(promises
)
752 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
753 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
754 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
755 expect(poolInfo
).toStrictEqual({
756 type
: PoolTypes
.fixed
,
757 worker
: WorkerTypes
.thread
,
758 minSize
: expect
.any(Number
),
759 maxSize
: expect
.any(Number
),
760 workerNodes
: expect
.any(Number
),
761 idleWorkerNodes
: expect
.any(Number
),
762 busyWorkerNodes
: expect
.any(Number
),
763 executedTasks
: expect
.any(Number
),
764 executingTasks
: expect
.any(Number
),
765 queuedTasks
: expect
.any(Number
),
766 maxQueuedTasks
: expect
.any(Number
),
767 failedTasks
: expect
.any(Number
)
772 it('Verify that multiple tasks worker is working', async () => {
773 const pool
= new DynamicClusterPool(
776 './tests/worker-files/cluster/testMultiTasksWorker.js'
778 const data
= { n
: 10 }
779 const result0
= await pool
.execute(data
)
780 expect(result0
).toBe(false)
781 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
782 expect(result1
).toBe(false)
783 const result2
= await pool
.execute(data
, 'factorial')
784 expect(result2
).toBe(3628800)
785 const result3
= await pool
.execute(data
, 'fibonacci')
786 expect(result3
).toBe(55)