1 const { expect
} = require('expect')
9 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
55 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
56 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 it('Verify that numberOfWorkers is checked', () => {
60 expect(() => new FixedThreadPool()).toThrowError(
61 'Cannot instantiate a pool without specifying the number of workers'
65 it('Verify that a negative number of workers is checked', () => {
68 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 'Cannot instantiate a pool with a negative number of workers'
76 it('Verify that a non integer number of workers is checked', () => {
79 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 'Cannot instantiate a pool with a non safe integer number of workers'
87 it('Verify that dynamic pool sizing is checked', () => {
90 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
98 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
101 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
106 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
109 'Cannot instantiate a dynamic pool with a pool size equal to zero'
114 it('Verify that pool options are checked', async () => {
115 let pool
= new FixedThreadPool(
117 './tests/worker-files/thread/testWorker.js'
119 expect(pool
.emitter
).toBeDefined()
120 expect(pool
.opts
.enableEvents
).toBe(true)
121 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
122 expect(pool
.opts
.enableTasksQueue
).toBe(false)
123 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
124 expect(pool
.opts
.workerChoiceStrategy
).toBe(
125 WorkerChoiceStrategies
.ROUND_ROBIN
127 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
128 runTime
: { median
: false },
129 waitTime
: { median
: false },
130 elu
: { median
: false }
132 expect(pool
.opts
.messageHandler
).toBeUndefined()
133 expect(pool
.opts
.errorHandler
).toBeUndefined()
134 expect(pool
.opts
.onlineHandler
).toBeUndefined()
135 expect(pool
.opts
.exitHandler
).toBeUndefined()
137 const testHandler
= () => console
.log('test handler executed')
138 pool
= new FixedThreadPool(
140 './tests/worker-files/thread/testWorker.js',
142 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
143 workerChoiceStrategyOptions
: {
144 runTime
: { median
: true },
145 weights
: { 0: 300, 1: 200 }
148 restartWorkerOnError
: false,
149 enableTasksQueue
: true,
150 tasksQueueOptions
: { concurrency
: 2 },
151 messageHandler
: testHandler
,
152 errorHandler
: testHandler
,
153 onlineHandler
: testHandler
,
154 exitHandler
: testHandler
157 expect(pool
.emitter
).toBeUndefined()
158 expect(pool
.opts
.enableEvents
).toBe(false)
159 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
160 expect(pool
.opts
.enableTasksQueue
).toBe(true)
161 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
162 expect(pool
.opts
.workerChoiceStrategy
).toBe(
163 WorkerChoiceStrategies
.LEAST_USED
165 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
166 runTime
: { median
: true },
167 weights
: { 0: 300, 1: 200 }
169 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
170 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
171 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
172 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
176 it('Verify that pool options are validated', async () => {
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: 'invalidStrategy'
186 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
191 './tests/worker-files/thread/testWorker.js',
193 workerChoiceStrategyOptions
: 'invalidOptions'
197 'Invalid worker choice strategy options: must be a plain object'
203 './tests/worker-files/thread/testWorker.js',
205 workerChoiceStrategyOptions
: { weights
: {} }
209 'Invalid worker choice strategy options: must have a weight for each worker node'
215 './tests/worker-files/thread/testWorker.js',
217 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
221 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
227 './tests/worker-files/thread/testWorker.js',
229 enableTasksQueue
: true,
230 tasksQueueOptions
: { concurrency
: 0 }
233 ).toThrowError("Invalid worker tasks concurrency '0'")
238 './tests/worker-files/thread/testWorker.js',
240 enableTasksQueue
: true,
241 tasksQueueOptions
: 'invalidTasksQueueOptions'
244 ).toThrowError('Invalid tasks queue options: must be a plain object')
249 './tests/worker-files/thread/testWorker.js',
251 enableTasksQueue
: true,
252 tasksQueueOptions
: { concurrency
: 0.2 }
255 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
258 it('Verify that pool worker choice strategy options can be set', async () => {
259 const pool
= new FixedThreadPool(
261 './tests/worker-files/thread/testWorker.js',
262 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
264 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
265 runTime
: { median
: false },
266 waitTime
: { median
: false },
267 elu
: { median
: false }
269 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
270 .workerChoiceStrategies
) {
271 expect(workerChoiceStrategy
.opts
).toStrictEqual({
272 runTime
: { median
: false },
273 waitTime
: { median
: false },
274 elu
: { median
: false }
278 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 pool
.setWorkerChoiceStrategyOptions({
297 runTime
: { median
: true },
298 elu
: { median
: true }
300 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
301 runTime
: { median
: true },
302 elu
: { median
: true }
304 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
305 .workerChoiceStrategies
) {
306 expect(workerChoiceStrategy
.opts
).toStrictEqual({
307 runTime
: { median
: true },
308 elu
: { median
: true }
312 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
330 pool
.setWorkerChoiceStrategyOptions({
331 runTime
: { median
: false },
332 elu
: { median
: false }
334 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
335 runTime
: { median
: false },
336 elu
: { median
: false }
338 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
339 .workerChoiceStrategies
) {
340 expect(workerChoiceStrategy
.opts
).toStrictEqual({
341 runTime
: { median
: false },
342 elu
: { median
: false }
346 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
365 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
367 'Invalid worker choice strategy options: must be a plain object'
370 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
372 'Invalid worker choice strategy options: must have a weight for each worker node'
375 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
377 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
382 it('Verify that pool tasks queue can be enabled/disabled', async () => {
383 const pool
= new FixedThreadPool(
385 './tests/worker-files/thread/testWorker.js'
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
389 pool
.enableTasksQueue(true)
390 expect(pool
.opts
.enableTasksQueue
).toBe(true)
391 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
392 pool
.enableTasksQueue(true, { concurrency
: 2 })
393 expect(pool
.opts
.enableTasksQueue
).toBe(true)
394 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
395 pool
.enableTasksQueue(false)
396 expect(pool
.opts
.enableTasksQueue
).toBe(false)
397 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
401 it('Verify that pool tasks queue options can be set', async () => {
402 const pool
= new FixedThreadPool(
404 './tests/worker-files/thread/testWorker.js',
405 { enableTasksQueue
: true }
407 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
408 pool
.setTasksQueueOptions({ concurrency
: 2 })
409 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
411 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
412 ).toThrowError('Invalid tasks queue options: must be a plain object')
413 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
414 "Invalid worker tasks concurrency '0'"
416 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
417 'Invalid worker tasks concurrency: must be an integer'
422 it('Verify that pool info is set', async () => {
423 let pool
= new FixedThreadPool(
425 './tests/worker-files/thread/testWorker.js'
427 expect(pool
.info
).toStrictEqual({
429 type
: PoolTypes
.fixed
,
430 worker
: WorkerTypes
.thread
,
432 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
433 minSize
: numberOfWorkers
,
434 maxSize
: numberOfWorkers
,
435 workerNodes
: numberOfWorkers
,
436 idleWorkerNodes
: numberOfWorkers
,
445 pool
= new DynamicClusterPool(
446 Math
.floor(numberOfWorkers
/ 2),
448 './tests/worker-files/cluster/testWorker.js'
450 expect(pool
.info
).toStrictEqual({
452 type
: PoolTypes
.dynamic
,
453 worker
: WorkerTypes
.cluster
,
455 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
456 minSize
: Math
.floor(numberOfWorkers
/ 2),
457 maxSize
: numberOfWorkers
,
458 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
459 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
470 it('Verify that pool worker tasks usage are initialized', async () => {
471 const pool
= new FixedClusterPool(
473 './tests/worker-files/cluster/testWorker.js'
475 for (const workerNode
of pool
.workerNodes
) {
476 expect(workerNode
.usage
).toStrictEqual({
485 history
: expect
.any(CircularArray
)
488 history
: expect
.any(CircularArray
)
492 history
: expect
.any(CircularArray
)
495 history
: expect
.any(CircularArray
)
503 it('Verify that pool worker tasks queue are initialized', async () => {
504 let pool
= new FixedClusterPool(
506 './tests/worker-files/cluster/testWorker.js'
508 for (const workerNode
of pool
.workerNodes
) {
509 expect(workerNode
.tasksQueue
).toBeDefined()
510 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
511 expect(workerNode
.tasksQueue
.size
).toBe(0)
512 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
515 pool
= new DynamicThreadPool(
516 Math
.floor(numberOfWorkers
/ 2),
518 './tests/worker-files/thread/testWorker.js'
520 for (const workerNode
of pool
.workerNodes
) {
521 expect(workerNode
.tasksQueue
).toBeDefined()
522 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
523 expect(workerNode
.tasksQueue
.size
).toBe(0)
524 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
528 it('Verify that pool worker info are initialized', async () => {
529 let pool
= new FixedClusterPool(
531 './tests/worker-files/cluster/testWorker.js'
533 for (const workerNode
of pool
.workerNodes
) {
534 expect(workerNode
.info
).toStrictEqual({
535 id
: expect
.any(Number
),
536 type
: WorkerTypes
.cluster
,
542 pool
= new DynamicThreadPool(
543 Math
.floor(numberOfWorkers
/ 2),
545 './tests/worker-files/thread/testWorker.js'
547 for (const workerNode
of pool
.workerNodes
) {
548 expect(workerNode
.info
).toStrictEqual({
549 id
: expect
.any(Number
),
550 type
: WorkerTypes
.thread
,
557 it('Verify that pool worker tasks usage are computed', async () => {
558 const pool
= new FixedClusterPool(
560 './tests/worker-files/cluster/testWorker.js'
562 const promises
= new Set()
563 const maxMultiplier
= 2
564 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
565 promises
.add(pool
.execute())
567 for (const workerNode
of pool
.workerNodes
) {
568 expect(workerNode
.usage
).toStrictEqual({
571 executing
: maxMultiplier
,
577 history
: expect
.any(CircularArray
)
580 history
: expect
.any(CircularArray
)
584 history
: expect
.any(CircularArray
)
587 history
: expect
.any(CircularArray
)
592 await Promise
.all(promises
)
593 for (const workerNode
of pool
.workerNodes
) {
594 expect(workerNode
.usage
).toStrictEqual({
596 executed
: maxMultiplier
,
603 history
: expect
.any(CircularArray
)
606 history
: expect
.any(CircularArray
)
610 history
: expect
.any(CircularArray
)
613 history
: expect
.any(CircularArray
)
621 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
622 const pool
= new DynamicThreadPool(
623 Math
.floor(numberOfWorkers
/ 2),
625 './tests/worker-files/thread/testWorker.js'
627 const promises
= new Set()
628 const maxMultiplier
= 2
629 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
630 promises
.add(pool
.execute())
632 await Promise
.all(promises
)
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.usage
).toStrictEqual({
636 executed
: expect
.any(Number
),
643 history
: expect
.any(CircularArray
)
646 history
: expect
.any(CircularArray
)
650 history
: expect
.any(CircularArray
)
653 history
: expect
.any(CircularArray
)
657 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
658 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
659 numberOfWorkers
* maxMultiplier
661 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
662 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
663 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
664 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
666 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
667 for (const workerNode
of pool
.workerNodes
) {
668 expect(workerNode
.usage
).toStrictEqual({
677 history
: expect
.any(CircularArray
)
680 history
: expect
.any(CircularArray
)
684 history
: expect
.any(CircularArray
)
687 history
: expect
.any(CircularArray
)
691 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
692 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
693 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
694 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
699 it("Verify that pool event emitter 'full' event can register a callback", async () => {
700 const pool
= new DynamicThreadPool(
701 Math
.floor(numberOfWorkers
/ 2),
703 './tests/worker-files/thread/testWorker.js'
705 const promises
= new Set()
708 pool
.emitter
.on(PoolEvents
.full
, info
=> {
712 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
713 promises
.add(pool
.execute())
715 await Promise
.all(promises
)
716 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
717 // So it is emitted numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
718 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
719 expect(poolInfo
).toStrictEqual({
721 type
: PoolTypes
.dynamic
,
722 worker
: WorkerTypes
.thread
,
723 ready
: expect
.any(Boolean
),
724 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
725 minSize
: expect
.any(Number
),
726 maxSize
: expect
.any(Number
),
727 workerNodes
: expect
.any(Number
),
728 idleWorkerNodes
: expect
.any(Number
),
729 busyWorkerNodes
: expect
.any(Number
),
730 executedTasks
: expect
.any(Number
),
731 executingTasks
: expect
.any(Number
),
732 queuedTasks
: expect
.any(Number
),
733 maxQueuedTasks
: expect
.any(Number
),
734 failedTasks
: expect
.any(Number
)
739 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
740 const pool
= new DynamicClusterPool(
741 Math
.floor(numberOfWorkers
/ 2),
743 './tests/worker-files/cluster/testWorker.js'
747 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
751 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
752 expect(poolReady
).toBe(1)
753 expect(poolInfo
).toStrictEqual({
755 type
: PoolTypes
.dynamic
,
756 worker
: WorkerTypes
.cluster
,
758 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
759 minSize
: expect
.any(Number
),
760 maxSize
: expect
.any(Number
),
761 workerNodes
: expect
.any(Number
),
762 idleWorkerNodes
: expect
.any(Number
),
763 busyWorkerNodes
: expect
.any(Number
),
764 executedTasks
: expect
.any(Number
),
765 executingTasks
: expect
.any(Number
),
766 queuedTasks
: expect
.any(Number
),
767 maxQueuedTasks
: expect
.any(Number
),
768 failedTasks
: expect
.any(Number
)
773 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
774 const pool
= new FixedThreadPool(
776 './tests/worker-files/thread/testWorker.js'
778 const promises
= new Set()
781 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
785 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
786 promises
.add(pool
.execute())
788 await Promise
.all(promises
)
789 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
790 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
791 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
792 expect(poolInfo
).toStrictEqual({
794 type
: PoolTypes
.fixed
,
795 worker
: WorkerTypes
.thread
,
796 ready
: expect
.any(Boolean
),
797 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
798 minSize
: expect
.any(Number
),
799 maxSize
: expect
.any(Number
),
800 workerNodes
: expect
.any(Number
),
801 idleWorkerNodes
: expect
.any(Number
),
802 busyWorkerNodes
: expect
.any(Number
),
803 executedTasks
: expect
.any(Number
),
804 executingTasks
: expect
.any(Number
),
805 queuedTasks
: expect
.any(Number
),
806 maxQueuedTasks
: expect
.any(Number
),
807 failedTasks
: expect
.any(Number
)
812 it('Verify that multiple tasks worker is working', async () => {
813 const pool
= new DynamicClusterPool(
814 Math
.floor(numberOfWorkers
/ 2),
816 './tests/worker-files/cluster/testMultiTasksWorker.js'
818 const data
= { n
: 10 }
819 const result0
= await pool
.execute(data
)
820 expect(result0
).toStrictEqual({ ok
: 1 })
821 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
822 expect(result1
).toStrictEqual({ ok
: 1 })
823 const result2
= await pool
.execute(data
, 'factorial')
824 expect(result2
).toBe(3628800)
825 const result3
= await pool
.execute(data
, 'fibonacci')
826 expect(result3
).toBe(55)