1 const { expect
} = require('expect')
9 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: e
=> console
.error(e
)
35 ).toThrowError('Cannot start a pool from a worker!')
38 it('Verify that filePath is checked', () => {
39 const expectedError
= new Error(
40 'Please specify a file with a worker implementation'
42 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
45 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
55 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
56 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 it('Verify that numberOfWorkers is checked', () => {
60 expect(() => new FixedThreadPool()).toThrowError(
61 'Cannot instantiate a pool without specifying the number of workers'
65 it('Verify that a negative number of workers is checked', () => {
68 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 'Cannot instantiate a pool with a negative number of workers'
76 it('Verify that a non integer number of workers is checked', () => {
79 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 'Cannot instantiate a pool with a non safe integer number of workers'
87 it('Verify that dynamic pool sizing is checked', () => {
90 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
93 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
98 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
101 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
106 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
109 'Cannot instantiate a dynamic pool with a pool size equal to zero'
114 it('Verify that pool options are checked', async () => {
115 let pool
= new FixedThreadPool(
117 './tests/worker-files/thread/testWorker.js'
119 expect(pool
.emitter
).toBeDefined()
120 expect(pool
.opts
.enableEvents
).toBe(true)
121 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
122 expect(pool
.opts
.enableTasksQueue
).toBe(false)
123 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
124 expect(pool
.opts
.workerChoiceStrategy
).toBe(
125 WorkerChoiceStrategies
.ROUND_ROBIN
127 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
128 runTime
: { median
: false },
129 waitTime
: { median
: false },
130 elu
: { median
: false }
132 expect(pool
.opts
.messageHandler
).toBeUndefined()
133 expect(pool
.opts
.errorHandler
).toBeUndefined()
134 expect(pool
.opts
.onlineHandler
).toBeUndefined()
135 expect(pool
.opts
.exitHandler
).toBeUndefined()
137 const testHandler
= () => console
.log('test handler executed')
138 pool
= new FixedThreadPool(
140 './tests/worker-files/thread/testWorker.js',
142 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
143 workerChoiceStrategyOptions
: {
144 runTime
: { median
: true },
145 weights
: { 0: 300, 1: 200 }
148 restartWorkerOnError
: false,
149 enableTasksQueue
: true,
150 tasksQueueOptions
: { concurrency
: 2 },
151 messageHandler
: testHandler
,
152 errorHandler
: testHandler
,
153 onlineHandler
: testHandler
,
154 exitHandler
: testHandler
157 expect(pool
.emitter
).toBeUndefined()
158 expect(pool
.opts
.enableEvents
).toBe(false)
159 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
160 expect(pool
.opts
.enableTasksQueue
).toBe(true)
161 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
162 expect(pool
.opts
.workerChoiceStrategy
).toBe(
163 WorkerChoiceStrategies
.LEAST_USED
165 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
166 runTime
: { median
: true },
167 weights
: { 0: 300, 1: 200 }
169 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
170 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
171 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
172 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
176 it('Verify that pool options are validated', async () => {
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: 'invalidStrategy'
186 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
191 './tests/worker-files/thread/testWorker.js',
193 workerChoiceStrategyOptions
: 'invalidOptions'
197 'Invalid worker choice strategy options: must be a plain object'
203 './tests/worker-files/thread/testWorker.js',
205 workerChoiceStrategyOptions
: { weights
: {} }
209 'Invalid worker choice strategy options: must have a weight for each worker node'
215 './tests/worker-files/thread/testWorker.js',
217 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
221 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
227 './tests/worker-files/thread/testWorker.js',
229 enableTasksQueue
: true,
230 tasksQueueOptions
: { concurrency
: 0 }
233 ).toThrowError("Invalid worker tasks concurrency '0'")
238 './tests/worker-files/thread/testWorker.js',
240 enableTasksQueue
: true,
241 tasksQueueOptions
: 'invalidTasksQueueOptions'
244 ).toThrowError('Invalid tasks queue options: must be a plain object')
249 './tests/worker-files/thread/testWorker.js',
251 enableTasksQueue
: true,
252 tasksQueueOptions
: { concurrency
: 0.2 }
255 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
258 it('Verify that pool worker choice strategy options can be set', async () => {
259 const pool
= new FixedThreadPool(
261 './tests/worker-files/thread/testWorker.js',
262 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
264 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
265 runTime
: { median
: false },
266 waitTime
: { median
: false },
267 elu
: { median
: false }
269 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
270 .workerChoiceStrategies
) {
271 expect(workerChoiceStrategy
.opts
).toStrictEqual({
272 runTime
: { median
: false },
273 waitTime
: { median
: false },
274 elu
: { median
: false }
278 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 pool
.setWorkerChoiceStrategyOptions({
297 runTime
: { median
: true },
298 elu
: { median
: true }
300 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
301 runTime
: { median
: true },
302 elu
: { median
: true }
304 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
305 .workerChoiceStrategies
) {
306 expect(workerChoiceStrategy
.opts
).toStrictEqual({
307 runTime
: { median
: true },
308 elu
: { median
: true }
312 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
330 pool
.setWorkerChoiceStrategyOptions({
331 runTime
: { median
: false },
332 elu
: { median
: false }
334 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
335 runTime
: { median
: false },
336 elu
: { median
: false }
338 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
339 .workerChoiceStrategies
) {
340 expect(workerChoiceStrategy
.opts
).toStrictEqual({
341 runTime
: { median
: false },
342 elu
: { median
: false }
346 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
365 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
367 'Invalid worker choice strategy options: must be a plain object'
370 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
372 'Invalid worker choice strategy options: must have a weight for each worker node'
375 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
377 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
382 it('Verify that pool tasks queue can be enabled/disabled', async () => {
383 const pool
= new FixedThreadPool(
385 './tests/worker-files/thread/testWorker.js'
387 expect(pool
.opts
.enableTasksQueue
).toBe(false)
388 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
389 pool
.enableTasksQueue(true)
390 expect(pool
.opts
.enableTasksQueue
).toBe(true)
391 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
392 pool
.enableTasksQueue(true, { concurrency
: 2 })
393 expect(pool
.opts
.enableTasksQueue
).toBe(true)
394 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
395 pool
.enableTasksQueue(false)
396 expect(pool
.opts
.enableTasksQueue
).toBe(false)
397 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
401 it('Verify that pool tasks queue options can be set', async () => {
402 const pool
= new FixedThreadPool(
404 './tests/worker-files/thread/testWorker.js',
405 { enableTasksQueue
: true }
407 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
408 pool
.setTasksQueueOptions({ concurrency
: 2 })
409 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
411 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
412 ).toThrowError('Invalid tasks queue options: must be a plain object')
413 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
414 "Invalid worker tasks concurrency '0'"
416 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
417 'Invalid worker tasks concurrency: must be an integer'
422 it('Verify that pool info is set', async () => {
423 let pool
= new FixedThreadPool(
425 './tests/worker-files/thread/testWorker.js'
427 expect(pool
.info
).toStrictEqual({
429 type
: PoolTypes
.fixed
,
430 worker
: WorkerTypes
.thread
,
432 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
433 minSize
: numberOfWorkers
,
434 maxSize
: numberOfWorkers
,
435 workerNodes
: numberOfWorkers
,
436 idleWorkerNodes
: numberOfWorkers
,
445 pool
= new DynamicClusterPool(
446 Math
.floor(numberOfWorkers
/ 2),
448 './tests/worker-files/cluster/testWorker.js'
450 expect(pool
.info
).toStrictEqual({
452 type
: PoolTypes
.dynamic
,
453 worker
: WorkerTypes
.cluster
,
455 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
456 minSize
: Math
.floor(numberOfWorkers
/ 2),
457 maxSize
: numberOfWorkers
,
458 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
459 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
470 it('Verify that pool worker tasks usage are initialized', async () => {
471 const pool
= new FixedClusterPool(
473 './tests/worker-files/cluster/testWorker.js'
475 for (const workerNode
of pool
.workerNodes
) {
476 expect(workerNode
.usage
).toStrictEqual({
485 history
: expect
.any(CircularArray
)
488 history
: expect
.any(CircularArray
)
492 history
: expect
.any(CircularArray
)
495 history
: expect
.any(CircularArray
)
503 it('Verify that pool worker tasks queue are initialized', async () => {
504 let pool
= new FixedClusterPool(
506 './tests/worker-files/cluster/testWorker.js'
508 for (const workerNode
of pool
.workerNodes
) {
509 expect(workerNode
.tasksQueue
).toBeDefined()
510 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
511 expect(workerNode
.tasksQueue
.size
).toBe(0)
512 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
515 pool
= new DynamicThreadPool(
516 Math
.floor(numberOfWorkers
/ 2),
518 './tests/worker-files/thread/testWorker.js'
520 for (const workerNode
of pool
.workerNodes
) {
521 expect(workerNode
.tasksQueue
).toBeDefined()
522 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
523 expect(workerNode
.tasksQueue
.size
).toBe(0)
524 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
528 it('Verify that pool worker info are initialized', async () => {
529 let pool
= new FixedClusterPool(
531 './tests/worker-files/cluster/testWorker.js'
533 for (const workerNode
of pool
.workerNodes
) {
534 expect(workerNode
.info
).toStrictEqual({
535 id
: expect
.any(Number
),
536 type
: WorkerTypes
.cluster
,
542 pool
= new DynamicThreadPool(
543 Math
.floor(numberOfWorkers
/ 2),
545 './tests/worker-files/thread/testWorker.js'
547 for (const workerNode
of pool
.workerNodes
) {
548 expect(workerNode
.info
).toStrictEqual({
549 id
: expect
.any(Number
),
550 type
: WorkerTypes
.thread
,
557 it('Verify that pool worker tasks usage are computed', async () => {
558 const pool
= new FixedClusterPool(
560 './tests/worker-files/cluster/testWorker.js'
562 const promises
= new Set()
563 const maxMultiplier
= 2
564 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
565 promises
.add(pool
.execute())
567 for (const workerNode
of pool
.workerNodes
) {
568 expect(workerNode
.usage
).toStrictEqual({
571 executing
: maxMultiplier
,
577 history
: expect
.any(CircularArray
)
580 history
: expect
.any(CircularArray
)
584 history
: expect
.any(CircularArray
)
587 history
: expect
.any(CircularArray
)
592 await Promise
.all(promises
)
593 for (const workerNode
of pool
.workerNodes
) {
594 expect(workerNode
.usage
).toStrictEqual({
596 executed
: maxMultiplier
,
603 history
: expect
.any(CircularArray
)
606 history
: expect
.any(CircularArray
)
610 history
: expect
.any(CircularArray
)
613 history
: expect
.any(CircularArray
)
621 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
622 const pool
= new DynamicThreadPool(
623 Math
.floor(numberOfWorkers
/ 2),
625 './tests/worker-files/thread/testWorker.js'
627 const promises
= new Set()
628 const maxMultiplier
= 2
629 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
630 promises
.add(pool
.execute())
632 await Promise
.all(promises
)
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.usage
).toStrictEqual({
636 executed
: expect
.any(Number
),
643 history
: expect
.any(CircularArray
)
646 history
: expect
.any(CircularArray
)
650 history
: expect
.any(CircularArray
)
653 history
: expect
.any(CircularArray
)
657 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
658 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
659 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
660 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
661 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
662 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
664 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
665 for (const workerNode
of pool
.workerNodes
) {
666 expect(workerNode
.usage
).toStrictEqual({
675 history
: expect
.any(CircularArray
)
678 history
: expect
.any(CircularArray
)
682 history
: expect
.any(CircularArray
)
685 history
: expect
.any(CircularArray
)
689 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
690 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
691 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
692 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
697 it("Verify that pool event emitter 'full' event can register a callback", async () => {
698 const pool
= new DynamicThreadPool(
699 Math
.floor(numberOfWorkers
/ 2),
701 './tests/worker-files/thread/testWorker.js'
703 const promises
= new Set()
706 pool
.emitter
.on(PoolEvents
.full
, info
=> {
710 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
711 promises
.add(pool
.execute())
713 await Promise
.all(promises
)
714 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
715 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
716 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
717 expect(poolInfo
).toStrictEqual({
719 type
: PoolTypes
.dynamic
,
720 worker
: WorkerTypes
.thread
,
721 ready
: expect
.any(Boolean
),
722 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
723 minSize
: expect
.any(Number
),
724 maxSize
: expect
.any(Number
),
725 workerNodes
: expect
.any(Number
),
726 idleWorkerNodes
: expect
.any(Number
),
727 busyWorkerNodes
: expect
.any(Number
),
728 executedTasks
: expect
.any(Number
),
729 executingTasks
: expect
.any(Number
),
730 queuedTasks
: expect
.any(Number
),
731 maxQueuedTasks
: expect
.any(Number
),
732 failedTasks
: expect
.any(Number
)
737 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
738 const pool
= new DynamicClusterPool(
739 Math
.floor(numberOfWorkers
/ 2),
741 './tests/worker-files/cluster/testWorker.js'
745 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
749 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
750 expect(poolReady
).toBe(1)
751 expect(poolInfo
).toStrictEqual({
753 type
: PoolTypes
.dynamic
,
754 worker
: WorkerTypes
.cluster
,
756 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
757 minSize
: expect
.any(Number
),
758 maxSize
: expect
.any(Number
),
759 workerNodes
: expect
.any(Number
),
760 idleWorkerNodes
: expect
.any(Number
),
761 busyWorkerNodes
: expect
.any(Number
),
762 executedTasks
: expect
.any(Number
),
763 executingTasks
: expect
.any(Number
),
764 queuedTasks
: expect
.any(Number
),
765 maxQueuedTasks
: expect
.any(Number
),
766 failedTasks
: expect
.any(Number
)
771 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
772 const pool
= new FixedThreadPool(
774 './tests/worker-files/thread/testWorker.js'
776 const promises
= new Set()
779 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
783 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
784 promises
.add(pool
.execute())
786 await Promise
.all(promises
)
787 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
788 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
789 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
790 expect(poolInfo
).toStrictEqual({
792 type
: PoolTypes
.fixed
,
793 worker
: WorkerTypes
.thread
,
794 ready
: expect
.any(Boolean
),
795 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
796 minSize
: expect
.any(Number
),
797 maxSize
: expect
.any(Number
),
798 workerNodes
: expect
.any(Number
),
799 idleWorkerNodes
: expect
.any(Number
),
800 busyWorkerNodes
: expect
.any(Number
),
801 executedTasks
: expect
.any(Number
),
802 executingTasks
: expect
.any(Number
),
803 queuedTasks
: expect
.any(Number
),
804 maxQueuedTasks
: expect
.any(Number
),
805 failedTasks
: expect
.any(Number
)
810 it('Verify that multiple tasks worker is working', async () => {
811 const pool
= new DynamicClusterPool(
812 Math
.floor(numberOfWorkers
/ 2),
814 './tests/worker-files/cluster/testMultiTasksWorker.js'
816 const data
= { n
: 10 }
817 const result0
= await pool
.execute(data
)
818 expect(result0
).toStrictEqual({ ok
: 1 })
819 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
820 expect(result1
).toStrictEqual({ ok
: 1 })
821 const result2
= await pool
.execute(data
, 'factorial')
822 expect(result2
).toBe(3628800)
823 const result3
= await pool
.execute(data
, 'fibonacci')
824 expect(result3
).toBe(55)