1 const { MessageChannel
} = require('worker_threads')
2 const { expect
} = require('expect')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: (e
) => console
.error(e
)
37 'Cannot start a pool from a worker with the same type as the pool'
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
51 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
54 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
58 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
59 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
62 it('Verify that numberOfWorkers is checked', () => {
63 expect(() => new FixedThreadPool()).toThrowError(
64 'Cannot instantiate a pool without specifying the number of workers'
68 it('Verify that a negative number of workers is checked', () => {
71 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
74 'Cannot instantiate a pool with a negative number of workers'
79 it('Verify that a non integer number of workers is checked', () => {
82 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
85 'Cannot instantiate a pool with a non safe integer number of workers'
90 it('Verify that dynamic pool sizing is checked', () => {
93 new DynamicClusterPool(
96 './tests/worker-files/cluster/testWorker.js'
100 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
105 new DynamicThreadPool(
108 './tests/worker-files/thread/testWorker.js'
112 'Cannot instantiate a pool with a non safe integer number of workers'
117 new DynamicClusterPool(
120 './tests/worker-files/cluster/testWorker.js'
124 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
129 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
132 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
137 new DynamicClusterPool(
140 './tests/worker-files/cluster/testWorker.js'
144 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
149 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
152 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
157 it('Verify that pool options are checked', async () => {
158 let pool
= new FixedThreadPool(
160 './tests/worker-files/thread/testWorker.js'
162 expect(pool
.emitter
).toBeDefined()
163 expect(pool
.opts
.enableEvents
).toBe(true)
164 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
165 expect(pool
.opts
.enableTasksQueue
).toBe(false)
166 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
167 expect(pool
.opts
.workerChoiceStrategy
).toBe(
168 WorkerChoiceStrategies
.ROUND_ROBIN
170 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
171 runTime
: { median
: false },
172 waitTime
: { median
: false },
173 elu
: { median
: false }
175 expect(pool
.opts
.messageHandler
).toBeUndefined()
176 expect(pool
.opts
.errorHandler
).toBeUndefined()
177 expect(pool
.opts
.onlineHandler
).toBeUndefined()
178 expect(pool
.opts
.exitHandler
).toBeUndefined()
180 const testHandler
= () => console
.info('test handler executed')
181 pool
= new FixedThreadPool(
183 './tests/worker-files/thread/testWorker.js',
185 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
186 workerChoiceStrategyOptions
: {
187 runTime
: { median
: true },
188 weights
: { 0: 300, 1: 200 }
191 restartWorkerOnError
: false,
192 enableTasksQueue
: true,
193 tasksQueueOptions
: { concurrency
: 2 },
194 messageHandler
: testHandler
,
195 errorHandler
: testHandler
,
196 onlineHandler
: testHandler
,
197 exitHandler
: testHandler
200 expect(pool
.emitter
).toBeUndefined()
201 expect(pool
.opts
.enableEvents
).toBe(false)
202 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
203 expect(pool
.opts
.enableTasksQueue
).toBe(true)
204 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
205 expect(pool
.opts
.workerChoiceStrategy
).toBe(
206 WorkerChoiceStrategies
.LEAST_USED
208 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
209 runTime
: { median
: true },
210 weights
: { 0: 300, 1: 200 }
212 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
213 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
214 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
215 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
219 it('Verify that pool options are validated', async () => {
224 './tests/worker-files/thread/testWorker.js',
226 workerChoiceStrategy
: 'invalidStrategy'
229 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
234 './tests/worker-files/thread/testWorker.js',
236 workerChoiceStrategyOptions
: 'invalidOptions'
240 'Invalid worker choice strategy options: must be a plain object'
246 './tests/worker-files/thread/testWorker.js',
248 workerChoiceStrategyOptions
: { weights
: {} }
252 'Invalid worker choice strategy options: must have a weight for each worker node'
258 './tests/worker-files/thread/testWorker.js',
260 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
264 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
270 './tests/worker-files/thread/testWorker.js',
272 enableTasksQueue
: true,
273 tasksQueueOptions
: { concurrency
: 0 }
276 ).toThrowError("Invalid worker tasks concurrency '0'")
281 './tests/worker-files/thread/testWorker.js',
283 enableTasksQueue
: true,
284 tasksQueueOptions
: 'invalidTasksQueueOptions'
287 ).toThrowError('Invalid tasks queue options: must be a plain object')
292 './tests/worker-files/thread/testWorker.js',
294 enableTasksQueue
: true,
295 tasksQueueOptions
: { concurrency
: 0.2 }
298 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
301 it('Verify that pool worker choice strategy options can be set', async () => {
302 const pool
= new FixedThreadPool(
304 './tests/worker-files/thread/testWorker.js',
305 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
307 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
308 runTime
: { median
: false },
309 waitTime
: { median
: false },
310 elu
: { median
: false }
312 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
313 .workerChoiceStrategies
) {
314 expect(workerChoiceStrategy
.opts
).toStrictEqual({
315 runTime
: { median
: false },
316 waitTime
: { median
: false },
317 elu
: { median
: false }
321 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
339 pool
.setWorkerChoiceStrategyOptions({
340 runTime
: { median
: true },
341 elu
: { median
: true }
343 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
344 runTime
: { median
: true },
345 elu
: { median
: true }
347 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
348 .workerChoiceStrategies
) {
349 expect(workerChoiceStrategy
.opts
).toStrictEqual({
350 runTime
: { median
: true },
351 elu
: { median
: true }
355 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 pool
.setWorkerChoiceStrategyOptions({
374 runTime
: { median
: false },
375 elu
: { median
: false }
377 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
378 runTime
: { median
: false },
379 elu
: { median
: false }
381 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
382 .workerChoiceStrategies
) {
383 expect(workerChoiceStrategy
.opts
).toStrictEqual({
384 runTime
: { median
: false },
385 elu
: { median
: false }
389 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
408 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
410 'Invalid worker choice strategy options: must be a plain object'
413 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
415 'Invalid worker choice strategy options: must have a weight for each worker node'
418 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
420 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
425 it('Verify that pool tasks queue can be enabled/disabled', async () => {
426 const pool
= new FixedThreadPool(
428 './tests/worker-files/thread/testWorker.js'
430 expect(pool
.opts
.enableTasksQueue
).toBe(false)
431 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
432 pool
.enableTasksQueue(true)
433 expect(pool
.opts
.enableTasksQueue
).toBe(true)
434 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
435 pool
.enableTasksQueue(true, { concurrency
: 2 })
436 expect(pool
.opts
.enableTasksQueue
).toBe(true)
437 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
438 pool
.enableTasksQueue(false)
439 expect(pool
.opts
.enableTasksQueue
).toBe(false)
440 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
444 it('Verify that pool tasks queue options can be set', async () => {
445 const pool
= new FixedThreadPool(
447 './tests/worker-files/thread/testWorker.js',
448 { enableTasksQueue
: true }
450 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
451 pool
.setTasksQueueOptions({ concurrency
: 2 })
452 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
454 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
455 ).toThrowError('Invalid tasks queue options: must be a plain object')
456 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
457 "Invalid worker tasks concurrency '0'"
459 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
460 'Invalid worker tasks concurrency: must be an integer'
465 it('Verify that pool info is set', async () => {
466 let pool
= new FixedThreadPool(
468 './tests/worker-files/thread/testWorker.js'
470 expect(pool
.info
).toStrictEqual({
472 type
: PoolTypes
.fixed
,
473 worker
: WorkerTypes
.thread
,
475 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
476 minSize
: numberOfWorkers
,
477 maxSize
: numberOfWorkers
,
478 workerNodes
: numberOfWorkers
,
479 idleWorkerNodes
: numberOfWorkers
,
486 pool
= new DynamicClusterPool(
487 Math
.floor(numberOfWorkers
/ 2),
489 './tests/worker-files/cluster/testWorker.js'
491 expect(pool
.info
).toStrictEqual({
493 type
: PoolTypes
.dynamic
,
494 worker
: WorkerTypes
.cluster
,
496 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
497 minSize
: Math
.floor(numberOfWorkers
/ 2),
498 maxSize
: numberOfWorkers
,
499 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
500 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
509 it('Verify that pool worker tasks usage are initialized', async () => {
510 const pool
= new FixedClusterPool(
512 './tests/worker-files/cluster/testWorker.js'
514 for (const workerNode
of pool
.workerNodes
) {
515 expect(workerNode
.usage
).toStrictEqual({
524 history
: expect
.any(CircularArray
)
527 history
: expect
.any(CircularArray
)
531 history
: expect
.any(CircularArray
)
534 history
: expect
.any(CircularArray
)
542 it('Verify that pool worker tasks queue are initialized', async () => {
543 let pool
= new FixedClusterPool(
545 './tests/worker-files/cluster/testWorker.js'
547 for (const workerNode
of pool
.workerNodes
) {
548 expect(workerNode
.tasksQueue
).toBeDefined()
549 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
550 expect(workerNode
.tasksQueue
.size
).toBe(0)
551 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
554 pool
= new DynamicThreadPool(
555 Math
.floor(numberOfWorkers
/ 2),
557 './tests/worker-files/thread/testWorker.js'
559 for (const workerNode
of pool
.workerNodes
) {
560 expect(workerNode
.tasksQueue
).toBeDefined()
561 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
562 expect(workerNode
.tasksQueue
.size
).toBe(0)
563 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
567 it('Verify that pool worker info are initialized', async () => {
568 let pool
= new FixedClusterPool(
570 './tests/worker-files/cluster/testWorker.js'
572 for (const workerNode
of pool
.workerNodes
) {
573 expect(workerNode
.info
).toStrictEqual({
574 id
: expect
.any(Number
),
575 type
: WorkerTypes
.cluster
,
581 pool
= new DynamicThreadPool(
582 Math
.floor(numberOfWorkers
/ 2),
584 './tests/worker-files/thread/testWorker.js'
586 for (const workerNode
of pool
.workerNodes
) {
587 expect(workerNode
.info
).toStrictEqual({
588 id
: expect
.any(Number
),
589 type
: WorkerTypes
.thread
,
592 messageChannel
: expect
.any(MessageChannel
)
597 it('Verify that pool worker tasks usage are computed', async () => {
598 const pool
= new FixedClusterPool(
600 './tests/worker-files/cluster/testWorker.js'
602 const promises
= new Set()
603 const maxMultiplier
= 2
604 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
605 promises
.add(pool
.execute())
607 for (const workerNode
of pool
.workerNodes
) {
608 expect(workerNode
.usage
).toStrictEqual({
611 executing
: maxMultiplier
,
617 history
: expect
.any(CircularArray
)
620 history
: expect
.any(CircularArray
)
624 history
: expect
.any(CircularArray
)
627 history
: expect
.any(CircularArray
)
632 await Promise
.all(promises
)
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.usage
).toStrictEqual({
636 executed
: maxMultiplier
,
643 history
: expect
.any(CircularArray
)
646 history
: expect
.any(CircularArray
)
650 history
: expect
.any(CircularArray
)
653 history
: expect
.any(CircularArray
)
661 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
662 const pool
= new DynamicThreadPool(
663 Math
.floor(numberOfWorkers
/ 2),
665 './tests/worker-files/thread/testWorker.js'
667 const promises
= new Set()
668 const maxMultiplier
= 2
669 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
670 promises
.add(pool
.execute())
672 await Promise
.all(promises
)
673 for (const workerNode
of pool
.workerNodes
) {
674 expect(workerNode
.usage
).toStrictEqual({
676 executed
: expect
.any(Number
),
683 history
: expect
.any(CircularArray
)
686 history
: expect
.any(CircularArray
)
690 history
: expect
.any(CircularArray
)
693 history
: expect
.any(CircularArray
)
697 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
698 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
699 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
700 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
701 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
702 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
704 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
705 for (const workerNode
of pool
.workerNodes
) {
706 expect(workerNode
.usage
).toStrictEqual({
715 history
: expect
.any(CircularArray
)
718 history
: expect
.any(CircularArray
)
722 history
: expect
.any(CircularArray
)
725 history
: expect
.any(CircularArray
)
729 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
730 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
731 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
732 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
737 it("Verify that pool event emitter 'full' event can register a callback", async () => {
738 const pool
= new DynamicThreadPool(
739 Math
.floor(numberOfWorkers
/ 2),
741 './tests/worker-files/thread/testWorker.js'
743 const promises
= new Set()
746 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
750 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
751 promises
.add(pool
.execute())
753 await Promise
.all(promises
)
754 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
755 // So it is emitted numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
756 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
757 expect(poolInfo
).toStrictEqual({
759 type
: PoolTypes
.dynamic
,
760 worker
: WorkerTypes
.thread
,
761 ready
: expect
.any(Boolean
),
762 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
763 minSize
: expect
.any(Number
),
764 maxSize
: expect
.any(Number
),
765 workerNodes
: expect
.any(Number
),
766 idleWorkerNodes
: expect
.any(Number
),
767 busyWorkerNodes
: expect
.any(Number
),
768 executedTasks
: expect
.any(Number
),
769 executingTasks
: expect
.any(Number
),
770 failedTasks
: expect
.any(Number
)
775 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
776 const pool
= new DynamicClusterPool(
777 Math
.floor(numberOfWorkers
/ 2),
779 './tests/worker-files/cluster/testWorker.js'
783 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
787 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
788 expect(poolReady
).toBe(1)
789 expect(poolInfo
).toStrictEqual({
791 type
: PoolTypes
.dynamic
,
792 worker
: WorkerTypes
.cluster
,
794 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
795 minSize
: expect
.any(Number
),
796 maxSize
: expect
.any(Number
),
797 workerNodes
: expect
.any(Number
),
798 idleWorkerNodes
: expect
.any(Number
),
799 busyWorkerNodes
: expect
.any(Number
),
800 executedTasks
: expect
.any(Number
),
801 executingTasks
: expect
.any(Number
),
802 failedTasks
: expect
.any(Number
)
807 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
808 const pool
= new FixedThreadPool(
810 './tests/worker-files/thread/testWorker.js'
812 const promises
= new Set()
815 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
819 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
820 promises
.add(pool
.execute())
822 await Promise
.all(promises
)
823 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
824 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
825 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
826 expect(poolInfo
).toStrictEqual({
828 type
: PoolTypes
.fixed
,
829 worker
: WorkerTypes
.thread
,
830 ready
: expect
.any(Boolean
),
831 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
832 minSize
: expect
.any(Number
),
833 maxSize
: expect
.any(Number
),
834 workerNodes
: expect
.any(Number
),
835 idleWorkerNodes
: expect
.any(Number
),
836 busyWorkerNodes
: expect
.any(Number
),
837 executedTasks
: expect
.any(Number
),
838 executingTasks
: expect
.any(Number
),
839 failedTasks
: expect
.any(Number
)
844 it('Verify that listTaskFunctions() is working', async () => {
845 const dynamicThreadPool
= new DynamicThreadPool(
846 Math
.floor(numberOfWorkers
/ 2),
848 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
850 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
851 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
853 'jsonIntegerSerialization',
857 const fixedClusterPool
= new FixedClusterPool(
859 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
861 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
862 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
864 'jsonIntegerSerialization',
870 it('Verify that multiple task functions worker is working', async () => {
871 const pool
= new DynamicClusterPool(
872 Math
.floor(numberOfWorkers
/ 2),
874 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
876 const data
= { n
: 10 }
877 const result0
= await pool
.execute(data
)
878 expect(result0
).toStrictEqual({ ok
: 1 })
879 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
880 expect(result1
).toStrictEqual({ ok
: 1 })
881 const result2
= await pool
.execute(data
, 'factorial')
882 expect(result2
).toBe(3628800)
883 const result3
= await pool
.execute(data
, 'fibonacci')
884 expect(result3
).toBe(55)