1 const { expect
} = require('expect')
9 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
14 const { version
} = require('../../../package.json')
15 const { waitPoolEvents
} = require('../../test-utils')
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers
= 2
19 class StubPoolWithIsMain
extends FixedThreadPool
{
// Test case: pool creation from a non main thread/process (per the title).
// Visible here: a StubPoolWithIsMain is constructed with the thread test worker file
// and an errorHandler option that forwards the error to console.error.
25 it('Simulate pool creation from a non main thread/process', () => {
28 new StubPoolWithIsMain(
30 './tests/worker-files/thread/testWorker.js',
32 errorHandler
: (e
) => console
.error(e
)
// NOTE(review): presumably the error message the construction is expected to throw;
// the asserting call is not visible in this listing — confirm.
36 'Cannot start a pool from a worker with the same type as the pool'
// Test case: the FixedThreadPool constructor must throw when the worker file path
// argument is missing, an empty string, 0, or true, and when the file './dummyWorker.ts'
// cannot be found.
40 it('Verify that filePath is checked', () => {
// NOTE(review): expectedError is presumably what the first four toThrowError calls
// receive; their argument lines are not visible here — confirm.
41 const expectedError
= new Error(
42 'Please specify a file with a worker implementation'
44 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
47 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
50 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
53 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
// A non-empty path gets a different error than the cases above.
57 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
58 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Test case: constructing a FixedThreadPool with no arguments at all must throw
// the "number of workers" error message below.
61 it('Verify that numberOfWorkers is checked', () => {
62 expect(() => new FixedThreadPool()).toThrowError(
63 'Cannot instantiate a pool without specifying the number of workers'
// Test case: a FixedClusterPool is constructed with -1 workers.
// NOTE(review): the message string is presumably the expected thrown error; the
// surrounding expect(...).toThrowError(...) lines are not visible here — confirm.
67 it('Verify that a negative number of workers is checked', () => {
70 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
73 'Cannot instantiate a pool with a negative number of workers'
// Test case: a FixedThreadPool is constructed with a fractional worker count (0.25).
// NOTE(review): the message string is presumably the expected thrown error; the
// surrounding expect(...).toThrowError(...) lines are not visible here — confirm.
78 it('Verify that a non integer number of workers is checked', () => {
81 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
84 'Cannot instantiate a pool with a non safe integer number of workers'
// Test case: min/max size validation of DynamicClusterPool and DynamicThreadPool.
// Each constructor call below is followed by a message string.
// NOTE(review): each string is presumably the error expected from the call above it;
// most size arguments and all expect(...) wrappers are not visible here — confirm.
89 it('Verify that dynamic pool sizing is checked', () => {
92 new DynamicClusterPool(
95 './tests/worker-files/cluster/testWorker.js'
99 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
104 new DynamicThreadPool(
107 './tests/worker-files/thread/testWorker.js'
111 'Cannot instantiate a pool with a non safe integer number of workers'
116 new DynamicClusterPool(
119 './tests/worker-files/cluster/testWorker.js'
123 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
// Here the first size argument (2) is larger than the second (1).
128 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
131 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
136 new DynamicClusterPool(
139 './tests/worker-files/cluster/testWorker.js'
143 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Both size arguments are zero.
148 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
151 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
// Test case: checks pool.emitter and pool.opts twice — first for a FixedThreadPool
// built without an options object visible here, then for one built with explicit options.
156 it('Verify that pool options are checked', async () => {
157 let pool
= new FixedThreadPool(
159 './tests/worker-files/thread/testWorker.js'
// First pool: emitter defined, events and restart-on-error enabled, tasks queue
// disabled, ROUND_ROBIN strategy, median disabled everywhere, no handlers set.
161 expect(pool
.emitter
).toBeDefined()
162 expect(pool
.opts
.enableEvents
).toBe(true)
163 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
164 expect(pool
.opts
.enableTasksQueue
).toBe(false)
165 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
166 expect(pool
.opts
.workerChoiceStrategy
).toBe(
167 WorkerChoiceStrategies
.ROUND_ROBIN
169 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
170 runTime
: { median
: false },
171 waitTime
: { median
: false },
172 elu
: { median
: false }
174 expect(pool
.opts
.messageHandler
).toBeUndefined()
175 expect(pool
.opts
.errorHandler
).toBeUndefined()
176 expect(pool
.opts
.onlineHandler
).toBeUndefined()
177 expect(pool
.opts
.exitHandler
).toBeUndefined()
// The same handler function is reused for all four handler options below.
179 const testHandler
= () => console
.info('test handler executed')
// Second pool: explicit options (some option lines are not visible in this listing).
180 pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
184 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
185 workerChoiceStrategyOptions
: {
186 runTime
: { median
: true },
187 weights
: { 0: 300, 1: 200 }
190 restartWorkerOnError
: false,
191 enableTasksQueue
: true,
192 tasksQueueOptions
: { concurrency
: 2 },
193 messageHandler
: testHandler
,
194 errorHandler
: testHandler
,
195 onlineHandler
: testHandler
,
196 exitHandler
: testHandler
// Second pool: the options passed above are expected to be reflected in pool.opts,
// and the emitter is expected to be undefined with enableEvents false.
199 expect(pool
.emitter
).toBeUndefined()
200 expect(pool
.opts
.enableEvents
).toBe(false)
201 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
202 expect(pool
.opts
.enableTasksQueue
).toBe(true)
203 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
204 expect(pool
.opts
.workerChoiceStrategy
).toBe(
205 WorkerChoiceStrategies
.LEAST_USED
207 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
208 runTime
: { median
: true },
209 weights
: { 0: 300, 1: 200 }
211 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
212 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
213 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
214 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
// Test case: invalid pool options must be rejected. Each group below shows the worker
// file, one invalid option value, and the error message tied to it.
// NOTE(review): the constructor calls wrapping each group are not visible in this
// listing; presumably each is an expect(() => new <Pool>(...)) — confirm.
218 it('Verify that pool options are validated', async () => {
// Unknown worker choice strategy name.
223 './tests/worker-files/thread/testWorker.js',
225 workerChoiceStrategy
: 'invalidStrategy'
228 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
// Worker choice strategy options given as a string instead of an object.
233 './tests/worker-files/thread/testWorker.js',
235 workerChoiceStrategyOptions
: 'invalidOptions'
239 'Invalid worker choice strategy options: must be a plain object'
// Empty weights object.
245 './tests/worker-files/thread/testWorker.js',
247 workerChoiceStrategyOptions
: { weights
: {} }
251 'Invalid worker choice strategy options: must have a weight for each worker node'
// Unknown measurement name.
257 './tests/worker-files/thread/testWorker.js',
259 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
263 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Tasks queue concurrency of 0.
269 './tests/worker-files/thread/testWorker.js',
271 enableTasksQueue
: true,
272 tasksQueueOptions
: { concurrency
: 0 }
275 ).toThrowError("Invalid worker tasks concurrency '0'")
// Tasks queue options given as a string instead of an object.
280 './tests/worker-files/thread/testWorker.js',
282 enableTasksQueue
: true,
283 tasksQueueOptions
: 'invalidTasksQueueOptions'
286 ).toThrowError('Invalid tasks queue options: must be a plain object')
// Fractional tasks queue concurrency.
291 './tests/worker-files/thread/testWorker.js',
293 enableTasksQueue
: true,
294 tasksQueueOptions
: { concurrency
: 0.2 }
297 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
// Test case: a FixedThreadPool using the FAIR_SHARE strategy; the strategy options are
// checked on pool.opts and on every strategy held by pool.workerChoiceStrategyContext,
// initially and after each call to pool.setWorkerChoiceStrategyOptions().
300 it('Verify that pool worker choice strategy options can be set', async () => {
301 const pool
= new FixedThreadPool(
303 './tests/worker-files/thread/testWorker.js',
304 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
// Initial state: median is false for runTime, waitTime and elu.
306 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
307 runTime
: { median
: false },
308 waitTime
: { median
: false },
309 elu
: { median
: false }
// Only the value of each entry is used; the key is skipped in the destructuring.
311 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
312 .workerChoiceStrategies
) {
313 expect(workerChoiceStrategy
.opts
).toStrictEqual({
314 runTime
: { median
: false },
315 waitTime
: { median
: false },
316 elu
: { median
: false }
// NOTE(review): the assertion on this call's result is not visible in this listing.
320 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Switch runTime and elu to median: true and re-check pool and strategies.
338 pool
.setWorkerChoiceStrategyOptions({
339 runTime
: { median
: true },
340 elu
: { median
: true }
342 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
343 runTime
: { median
: true },
344 elu
: { median
: true }
346 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
347 .workerChoiceStrategies
) {
348 expect(workerChoiceStrategy
.opts
).toStrictEqual({
349 runTime
: { median
: true },
350 elu
: { median
: true }
354 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Switch runTime and elu back to median: false and re-check pool and strategies.
372 pool
.setWorkerChoiceStrategyOptions({
373 runTime
: { median
: false },
374 elu
: { median
: false }
376 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
377 runTime
: { median
: false },
378 elu
: { median
: false }
380 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
381 .workerChoiceStrategies
) {
382 expect(workerChoiceStrategy
.opts
).toStrictEqual({
383 runTime
: { median
: false },
384 elu
: { median
: false }
388 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Invalid arguments to the setter, each followed by a message string.
// NOTE(review): each string is presumably the expected thrown error; the
// expect(...).toThrowError(...) wrappers are not visible here — confirm.
407 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
409 'Invalid worker choice strategy options: must be a plain object'
412 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
414 'Invalid worker choice strategy options: must have a weight for each worker node'
417 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
419 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Test case: toggles the tasks queue with pool.enableTasksQueue() and checks
// pool.opts.enableTasksQueue and pool.opts.tasksQueueOptions after each call.
424 it('Verify that pool tasks queue can be enabled/disabled', async () => {
425 const pool
= new FixedThreadPool(
427 './tests/worker-files/thread/testWorker.js'
// Initially the queue is disabled and has no options.
429 expect(pool
.opts
.enableTasksQueue
).toBe(false)
430 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
// Enabling without options is expected to yield { concurrency: 1 }.
431 pool
.enableTasksQueue(true)
432 expect(pool
.opts
.enableTasksQueue
).toBe(true)
433 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
// Enabling again with explicit options replaces the concurrency value.
434 pool
.enableTasksQueue(true, { concurrency
: 2 })
435 expect(pool
.opts
.enableTasksQueue
).toBe(true)
436 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
// Disabling is expected to clear the options again.
437 pool
.enableTasksQueue(false)
438 expect(pool
.opts
.enableTasksQueue
).toBe(false)
439 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
// Test case: on a FixedThreadPool created with enableTasksQueue: true, checks that
// pool.setTasksQueueOptions() updates pool.opts.tasksQueueOptions and that a string,
// a concurrency of 0, and a fractional concurrency are each rejected.
443 it('Verify that pool tasks queue options can be set', async () => {
444 const pool
= new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.js',
447 { enableTasksQueue
: true }
// With no explicit options, the concurrency is expected to be 1.
449 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
450 pool
.setTasksQueueOptions({ concurrency
: 2 })
451 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
// NOTE(review): the opening expect(() => line of this assertion is not visible here.
453 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
454 ).toThrowError('Invalid tasks queue options: must be a plain object')
455 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
456 "Invalid worker tasks concurrency '0'"
458 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
459 'Invalid worker tasks concurrency: must be an integer'
// Test case: checks pool.info with toStrictEqual for a FixedThreadPool and then for a
// DynamicClusterPool. Some expected fields are not visible in this listing.
464 it('Verify that pool info is set', async () => {
465 let pool
= new FixedThreadPool(
467 './tests/worker-files/thread/testWorker.js'
// Fixed pool: min size, max size, worker nodes and idle worker nodes are all
// expected to equal numberOfWorkers.
469 expect(pool
.info
).toStrictEqual({
471 type
: PoolTypes
.fixed
,
472 worker
: WorkerTypes
.thread
,
474 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
475 minSize
: numberOfWorkers
,
476 maxSize
: numberOfWorkers
,
477 workerNodes
: numberOfWorkers
,
478 idleWorkerNodes
: numberOfWorkers
,
// Dynamic pool: the first size argument is half of numberOfWorkers, rounded down.
485 pool
= new DynamicClusterPool(
486 Math
.floor(numberOfWorkers
/ 2),
488 './tests/worker-files/cluster/testWorker.js'
// Dynamic pool: minSize, workerNodes and idleWorkerNodes are expected to equal that
// halved value, while maxSize is expected to equal numberOfWorkers.
490 expect(pool
.info
).toStrictEqual({
492 type
: PoolTypes
.dynamic
,
493 worker
: WorkerTypes
.cluster
,
495 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
496 minSize
: Math
.floor(numberOfWorkers
/ 2),
497 maxSize
: numberOfWorkers
,
498 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
499 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
// Test case: for every worker node of a FixedClusterPool, checks workerNode.usage with
// toStrictEqual. Only the four `history` entries of the expected object are visible in
// this listing; each must be some CircularArray instance.
508 it('Verify that pool worker tasks usage are initialized', async () => {
509 const pool
= new FixedClusterPool(
511 './tests/worker-files/cluster/testWorker.js'
513 for (const workerNode
of pool
.workerNodes
) {
514 expect(workerNode
.usage
).toStrictEqual({
// NOTE(review): the keys that enclose each `history` entry are not visible here.
523 history
: expect
.any(CircularArray
)
526 history
: expect
.any(CircularArray
)
530 history
: expect
.any(CircularArray
)
533 history
: expect
.any(CircularArray
)
// Test case: for every worker node of a FixedClusterPool and then of a
// DynamicThreadPool, checks that workerNode.tasksQueue is defined, is a Queue
// instance, and has size 0 and maxSize 0.
541 it('Verify that pool worker tasks queue are initialized', async () => {
542 let pool
= new FixedClusterPool(
544 './tests/worker-files/cluster/testWorker.js'
546 for (const workerNode
of pool
.workerNodes
) {
547 expect(workerNode
.tasksQueue
).toBeDefined()
548 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
549 expect(workerNode
.tasksQueue
.size
).toBe(0)
550 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
// Same checks against a dynamic thread pool.
553 pool
= new DynamicThreadPool(
554 Math
.floor(numberOfWorkers
/ 2),
556 './tests/worker-files/thread/testWorker.js'
558 for (const workerNode
of pool
.workerNodes
) {
559 expect(workerNode
.tasksQueue
).toBeDefined()
560 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
561 expect(workerNode
.tasksQueue
.size
).toBe(0)
562 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
// Test case: for every worker node of a FixedClusterPool and then of a
// DynamicThreadPool, checks workerNode.info with toStrictEqual: `id` may be any Number
// and `type` must match the pool's worker type. Other expected fields, if any, are not
// visible in this listing.
566 it('Verify that pool worker info are initialized', async () => {
567 let pool
= new FixedClusterPool(
569 './tests/worker-files/cluster/testWorker.js'
571 for (const workerNode
of pool
.workerNodes
) {
572 expect(workerNode
.info
).toStrictEqual({
573 id
: expect
.any(Number
),
574 type
: WorkerTypes
.cluster
,
// Same checks against a dynamic thread pool, expecting the thread worker type.
580 pool
= new DynamicThreadPool(
581 Math
.floor(numberOfWorkers
/ 2),
583 './tests/worker-files/thread/testWorker.js'
585 for (const workerNode
of pool
.workerNodes
) {
586 expect(workerNode
.info
).toStrictEqual({
587 id
: expect
.any(Number
),
588 type
: WorkerTypes
.thread
,
// Test case: submits numberOfWorkers * maxMultiplier tasks to a FixedClusterPool and
// checks each worker node's usage before and after the tasks settle.
595 it('Verify that pool worker tasks usage are computed', async () => {
596 const pool
= new FixedClusterPool(
598 './tests/worker-files/cluster/testWorker.js'
600 const promises
= new Set()
601 const maxMultiplier
= 2
// The execute() promises are collected without awaiting, so all tasks are in flight.
602 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
603 promises
.add(pool
.execute())
// While in flight: each worker node is expected to be executing maxMultiplier tasks.
605 for (const workerNode
of pool
.workerNodes
) {
606 expect(workerNode
.usage
).toStrictEqual({
609 executing
: maxMultiplier
,
615 history
: expect
.any(CircularArray
)
618 history
: expect
.any(CircularArray
)
622 history
: expect
.any(CircularArray
)
625 history
: expect
.any(CircularArray
)
630 await Promise
.all(promises
)
// After completion: each worker node is expected to have executed maxMultiplier tasks.
631 for (const workerNode
of pool
.workerNodes
) {
632 expect(workerNode
.usage
).toStrictEqual({
634 executed
: maxMultiplier
,
641 history
: expect
.any(CircularArray
)
644 history
: expect
.any(CircularArray
)
648 history
: expect
.any(CircularArray
)
651 history
: expect
.any(CircularArray
)
// Test case: runs numberOfWorkers * maxMultiplier tasks on a DynamicThreadPool, checks
// each worker node's usage, then switches the strategy to FAIR_SHARE with
// pool.setWorkerChoiceStrategy() and checks the usage again.
659 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
660 const pool
= new DynamicThreadPool(
661 Math
.floor(numberOfWorkers
/ 2),
663 './tests/worker-files/thread/testWorker.js'
665 const promises
= new Set()
666 const maxMultiplier
= 2
667 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
668 promises
.add(pool
.execute())
670 await Promise
.all(promises
)
// After the tasks settle: `executed` may be any Number in the strict-equality check,
// and is then bounded to the range (0, maxMultiplier] by the assertions that follow.
671 for (const workerNode
of pool
.workerNodes
) {
672 expect(workerNode
.usage
).toStrictEqual({
674 executed
: expect
.any(Number
),
681 history
: expect
.any(CircularArray
)
684 history
: expect
.any(CircularArray
)
688 history
: expect
.any(CircularArray
)
691 history
: expect
.any(CircularArray
)
695 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
696 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
// All four history arrays are expected to be empty at this point.
697 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
698 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
699 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
700 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
// Change the worker choice strategy, then re-check every worker node's usage.
702 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
703 for (const workerNode
of pool
.workerNodes
) {
704 expect(workerNode
.usage
).toStrictEqual({
713 history
: expect
.any(CircularArray
)
716 history
: expect
.any(CircularArray
)
720 history
: expect
.any(CircularArray
)
723 history
: expect
.any(CircularArray
)
727 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
728 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
729 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
730 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
// Test case: registers a listener for PoolEvents.full on a DynamicThreadPool's emitter,
// submits numberOfWorkers * 2 tasks, then checks how often the event fired and the
// shape of the info object captured from it.
735 it("Verify that pool event emitter 'full' event can register a callback", async () => {
736 const pool
= new DynamicThreadPool(
737 Math
.floor(numberOfWorkers
/ 2),
739 './tests/worker-files/thread/testWorker.js'
741 const promises
= new Set()
// NOTE(review): the listener body and the declarations of poolFull and poolInfo are
// not visible in this listing; presumably the listener counts events in poolFull and
// stores `info` in poolInfo — confirm.
744 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
748 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
749 promises
.add(pool
.execute())
751 await Promise
.all(promises
)
752 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
753 // So it fires numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
754 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
// Only the pool type, worker type and strategy are pinned to exact values; the other
// visible fields just need to be of the right primitive type.
755 expect(poolInfo
).toStrictEqual({
757 type
: PoolTypes
.dynamic
,
758 worker
: WorkerTypes
.thread
,
759 ready
: expect
.any(Boolean
),
760 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
761 minSize
: expect
.any(Number
),
762 maxSize
: expect
.any(Number
),
763 workerNodes
: expect
.any(Number
),
764 idleWorkerNodes
: expect
.any(Number
),
765 busyWorkerNodes
: expect
.any(Number
),
766 executedTasks
: expect
.any(Number
),
767 executingTasks
: expect
.any(Number
),
768 failedTasks
: expect
.any(Number
)
// Test case: registers a listener for PoolEvents.ready on a DynamicClusterPool's
// emitter, waits for one ready event through waitPoolEvents(), then checks the event
// count and the shape of the info object captured from it.
773 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
774 const pool
= new DynamicClusterPool(
775 Math
.floor(numberOfWorkers
/ 2),
777 './tests/worker-files/cluster/testWorker.js'
// NOTE(review): the listener body and the declarations of poolReady and poolInfo are
// not visible in this listing; presumably the listener counts events in poolReady and
// stores `info` in poolInfo — confirm.
781 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
785 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
786 expect(poolReady
).toBe(1)
// Only the pool type, worker type and strategy are pinned to exact values; the other
// visible fields just need to be numbers.
787 expect(poolInfo
).toStrictEqual({
789 type
: PoolTypes
.dynamic
,
790 worker
: WorkerTypes
.cluster
,
792 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
793 minSize
: expect
.any(Number
),
794 maxSize
: expect
.any(Number
),
795 workerNodes
: expect
.any(Number
),
796 idleWorkerNodes
: expect
.any(Number
),
797 busyWorkerNodes
: expect
.any(Number
),
798 executedTasks
: expect
.any(Number
),
799 executingTasks
: expect
.any(Number
),
800 failedTasks
: expect
.any(Number
)
// Test case: registers a listener for PoolEvents.busy on a FixedThreadPool's emitter,
// submits numberOfWorkers * 2 tasks, then checks how often the event fired and the
// shape of the info object captured from it.
805 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
806 const pool
= new FixedThreadPool(
808 './tests/worker-files/thread/testWorker.js'
810 const promises
= new Set()
// NOTE(review): the listener body and the declarations of poolBusy and poolInfo are
// not visible in this listing; presumably the listener counts events in poolBusy and
// stores `info` in poolInfo — confirm.
813 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
817 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
818 promises
.add(pool
.execute())
820 await Promise
.all(promises
)
821 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
822 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
823 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
// Only the pool type, worker type and strategy are pinned to exact values; the other
// visible fields just need to be of the right primitive type.
824 expect(poolInfo
).toStrictEqual({
826 type
: PoolTypes
.fixed
,
827 worker
: WorkerTypes
.thread
,
828 ready
: expect
.any(Boolean
),
829 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
830 minSize
: expect
.any(Number
),
831 maxSize
: expect
.any(Number
),
832 workerNodes
: expect
.any(Number
),
833 idleWorkerNodes
: expect
.any(Number
),
834 busyWorkerNodes
: expect
.any(Number
),
835 executedTasks
: expect
.any(Number
),
836 executingTasks
: expect
.any(Number
),
837 failedTasks
: expect
.any(Number
)
// Test case: creates a DynamicThreadPool and a FixedClusterPool from the
// multiple-task-functions worker files, waits for each pool's ready event, and checks
// the array returned by listTaskFunctions() with toStrictEqual.
// Only the 'jsonIntegerSerialization' entry of each expected array is visible in this
// listing.
842 it('Verify that listTaskFunctions() is working', async () => {
843 const dynamicThreadPool
= new DynamicThreadPool(
844 Math
.floor(numberOfWorkers
/ 2),
846 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
// Wait for one ready event before listing the task functions.
848 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
849 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
851 'jsonIntegerSerialization',
// Same check against a fixed cluster pool.
855 const fixedClusterPool
= new FixedClusterPool(
857 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
859 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
860 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
862 'jsonIntegerSerialization',
// Test case: executes the same input ({ n: 10 }) on a DynamicClusterPool built from the
// multiple-task-functions cluster worker, once without a task function name and once
// for each of 'jsonIntegerSerialization', 'factorial' and 'fibonacci'.
868 it('Verify that multiple task functions worker is working', async () => {
869 const pool
= new DynamicClusterPool(
870 Math
.floor(numberOfWorkers
/ 2),
872 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
874 const data
= { n
: 10 }
// No task function name: the result is expected to equal the one obtained below for
// 'jsonIntegerSerialization'.
875 const result0
= await pool
.execute(data
)
876 expect(result0
).toStrictEqual({ ok
: 1 })
877 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
878 expect(result1
).toStrictEqual({ ok
: 1 })
// 3628800 is 10!, matching n: 10.
879 const result2
= await pool
.execute(data
, 'factorial')
880 expect(result2
).toBe(3628800)
// 55 is presumably the 10th Fibonacci number as computed by the worker — confirm
// against the worker file.
881 const result3
= await pool
.execute(data
, 'fibonacci')
882 expect(result3
).toBe(55)