1 const { MessageChannel
} = require('worker_threads')
2 const { expect
} = require('expect')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: e
=> console
.error(e
)
36 ).toThrowError('Cannot start a pool from a worker!')
39 it('Verify that filePath is checked', () => {
40 const expectedError
= new Error(
41 'Please specify a file with a worker implementation'
43 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
46 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
52 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
56 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
57 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
60 it('Verify that numberOfWorkers is checked', () => {
61 expect(() => new FixedThreadPool()).toThrowError(
62 'Cannot instantiate a pool without specifying the number of workers'
66 it('Verify that a negative number of workers is checked', () => {
69 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
72 'Cannot instantiate a pool with a negative number of workers'
77 it('Verify that a non integer number of workers is checked', () => {
80 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
83 'Cannot instantiate a pool with a non safe integer number of workers'
88 it('Verify that dynamic pool sizing is checked', () => {
91 new DynamicClusterPool(
94 './tests/worker-files/cluster/testWorker.js'
98 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
103 new DynamicThreadPool(
106 './tests/worker-files/thread/testWorker.js'
110 'Cannot instantiate a pool with a non safe integer number of workers'
115 new DynamicClusterPool(
118 './tests/worker-files/cluster/testWorker.js'
122 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
127 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
130 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
135 new DynamicClusterPool(
138 './tests/worker-files/cluster/testWorker.js'
142 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
147 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
150 'Cannot instantiate a dynamic pool with a pool size equal to zero'
155 it('Verify that pool options are checked', async () => {
156 let pool
= new FixedThreadPool(
158 './tests/worker-files/thread/testWorker.js'
160 expect(pool
.emitter
).toBeDefined()
161 expect(pool
.opts
.enableEvents
).toBe(true)
162 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
163 expect(pool
.opts
.enableTasksQueue
).toBe(false)
164 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
165 expect(pool
.opts
.workerChoiceStrategy
).toBe(
166 WorkerChoiceStrategies
.ROUND_ROBIN
168 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
169 runTime
: { median
: false },
170 waitTime
: { median
: false },
171 elu
: { median
: false }
173 expect(pool
.opts
.messageHandler
).toBeUndefined()
174 expect(pool
.opts
.errorHandler
).toBeUndefined()
175 expect(pool
.opts
.onlineHandler
).toBeUndefined()
176 expect(pool
.opts
.exitHandler
).toBeUndefined()
178 const testHandler
= () => console
.log('test handler executed')
179 pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
184 workerChoiceStrategyOptions
: {
185 runTime
: { median
: true },
186 weights
: { 0: 300, 1: 200 }
189 restartWorkerOnError
: false,
190 enableTasksQueue
: true,
191 tasksQueueOptions
: { concurrency
: 2 },
192 messageHandler
: testHandler
,
193 errorHandler
: testHandler
,
194 onlineHandler
: testHandler
,
195 exitHandler
: testHandler
198 expect(pool
.emitter
).toBeUndefined()
199 expect(pool
.opts
.enableEvents
).toBe(false)
200 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
201 expect(pool
.opts
.enableTasksQueue
).toBe(true)
202 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
203 expect(pool
.opts
.workerChoiceStrategy
).toBe(
204 WorkerChoiceStrategies
.LEAST_USED
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
207 runTime
: { median
: true },
208 weights
: { 0: 300, 1: 200 }
210 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
211 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
212 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
213 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
217 it('Verify that pool options are validated', async () => {
222 './tests/worker-files/thread/testWorker.js',
224 workerChoiceStrategy
: 'invalidStrategy'
227 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
232 './tests/worker-files/thread/testWorker.js',
234 workerChoiceStrategyOptions
: 'invalidOptions'
238 'Invalid worker choice strategy options: must be a plain object'
244 './tests/worker-files/thread/testWorker.js',
246 workerChoiceStrategyOptions
: { weights
: {} }
250 'Invalid worker choice strategy options: must have a weight for each worker node'
256 './tests/worker-files/thread/testWorker.js',
258 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
262 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
268 './tests/worker-files/thread/testWorker.js',
270 enableTasksQueue
: true,
271 tasksQueueOptions
: { concurrency
: 0 }
274 ).toThrowError("Invalid worker tasks concurrency '0'")
279 './tests/worker-files/thread/testWorker.js',
281 enableTasksQueue
: true,
282 tasksQueueOptions
: 'invalidTasksQueueOptions'
285 ).toThrowError('Invalid tasks queue options: must be a plain object')
290 './tests/worker-files/thread/testWorker.js',
292 enableTasksQueue
: true,
293 tasksQueueOptions
: { concurrency
: 0.2 }
296 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
299 it('Verify that pool worker choice strategy options can be set', async () => {
300 const pool
= new FixedThreadPool(
302 './tests/worker-files/thread/testWorker.js',
303 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
305 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
306 runTime
: { median
: false },
307 waitTime
: { median
: false },
308 elu
: { median
: false }
310 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
311 .workerChoiceStrategies
) {
312 expect(workerChoiceStrategy
.opts
).toStrictEqual({
313 runTime
: { median
: false },
314 waitTime
: { median
: false },
315 elu
: { median
: false }
319 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
337 pool
.setWorkerChoiceStrategyOptions({
338 runTime
: { median
: true },
339 elu
: { median
: true }
341 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
342 runTime
: { median
: true },
343 elu
: { median
: true }
345 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
346 .workerChoiceStrategies
) {
347 expect(workerChoiceStrategy
.opts
).toStrictEqual({
348 runTime
: { median
: true },
349 elu
: { median
: true }
353 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 pool
.setWorkerChoiceStrategyOptions({
372 runTime
: { median
: false },
373 elu
: { median
: false }
375 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
376 runTime
: { median
: false },
377 elu
: { median
: false }
379 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
380 .workerChoiceStrategies
) {
381 expect(workerChoiceStrategy
.opts
).toStrictEqual({
382 runTime
: { median
: false },
383 elu
: { median
: false }
387 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
406 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
408 'Invalid worker choice strategy options: must be a plain object'
411 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
413 'Invalid worker choice strategy options: must have a weight for each worker node'
416 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
418 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
423 it('Verify that pool tasks queue can be enabled/disabled', async () => {
424 const pool
= new FixedThreadPool(
426 './tests/worker-files/thread/testWorker.js'
428 expect(pool
.opts
.enableTasksQueue
).toBe(false)
429 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
430 pool
.enableTasksQueue(true)
431 expect(pool
.opts
.enableTasksQueue
).toBe(true)
432 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
433 pool
.enableTasksQueue(true, { concurrency
: 2 })
434 expect(pool
.opts
.enableTasksQueue
).toBe(true)
435 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
436 pool
.enableTasksQueue(false)
437 expect(pool
.opts
.enableTasksQueue
).toBe(false)
438 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
442 it('Verify that pool tasks queue options can be set', async () => {
443 const pool
= new FixedThreadPool(
445 './tests/worker-files/thread/testWorker.js',
446 { enableTasksQueue
: true }
448 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
449 pool
.setTasksQueueOptions({ concurrency
: 2 })
450 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
452 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
453 ).toThrowError('Invalid tasks queue options: must be a plain object')
454 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
455 "Invalid worker tasks concurrency '0'"
457 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
458 'Invalid worker tasks concurrency: must be an integer'
463 it('Verify that pool info is set', async () => {
464 let pool
= new FixedThreadPool(
466 './tests/worker-files/thread/testWorker.js'
468 expect(pool
.info
).toStrictEqual({
470 type
: PoolTypes
.fixed
,
471 worker
: WorkerTypes
.thread
,
473 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
474 minSize
: numberOfWorkers
,
475 maxSize
: numberOfWorkers
,
476 workerNodes
: numberOfWorkers
,
477 idleWorkerNodes
: numberOfWorkers
,
486 pool
= new DynamicClusterPool(
487 Math
.floor(numberOfWorkers
/ 2),
489 './tests/worker-files/cluster/testWorker.js'
491 expect(pool
.info
).toStrictEqual({
493 type
: PoolTypes
.dynamic
,
494 worker
: WorkerTypes
.cluster
,
496 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
497 minSize
: Math
.floor(numberOfWorkers
/ 2),
498 maxSize
: numberOfWorkers
,
499 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
500 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
511 it('Verify that pool worker tasks usage are initialized', async () => {
512 const pool
= new FixedClusterPool(
514 './tests/worker-files/cluster/testWorker.js'
516 for (const workerNode
of pool
.workerNodes
) {
517 expect(workerNode
.usage
).toStrictEqual({
526 history
: expect
.any(CircularArray
)
529 history
: expect
.any(CircularArray
)
533 history
: expect
.any(CircularArray
)
536 history
: expect
.any(CircularArray
)
544 it('Verify that pool worker tasks queue are initialized', async () => {
545 let pool
= new FixedClusterPool(
547 './tests/worker-files/cluster/testWorker.js'
549 for (const workerNode
of pool
.workerNodes
) {
550 expect(workerNode
.tasksQueue
).toBeDefined()
551 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
552 expect(workerNode
.tasksQueue
.size
).toBe(0)
553 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
556 pool
= new DynamicThreadPool(
557 Math
.floor(numberOfWorkers
/ 2),
559 './tests/worker-files/thread/testWorker.js'
561 for (const workerNode
of pool
.workerNodes
) {
562 expect(workerNode
.tasksQueue
).toBeDefined()
563 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
564 expect(workerNode
.tasksQueue
.size
).toBe(0)
565 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
569 it('Verify that pool worker info are initialized', async () => {
570 let pool
= new FixedClusterPool(
572 './tests/worker-files/cluster/testWorker.js'
574 for (const workerNode
of pool
.workerNodes
) {
575 expect(workerNode
.info
).toStrictEqual({
576 id
: expect
.any(Number
),
577 type
: WorkerTypes
.cluster
,
583 pool
= new DynamicThreadPool(
584 Math
.floor(numberOfWorkers
/ 2),
586 './tests/worker-files/thread/testWorker.js'
588 for (const workerNode
of pool
.workerNodes
) {
589 expect(workerNode
.info
).toStrictEqual({
590 id
: expect
.any(Number
),
591 type
: WorkerTypes
.thread
,
594 messageChannel
: expect
.any(MessageChannel
)
599 it('Verify that pool worker tasks usage are computed', async () => {
600 const pool
= new FixedClusterPool(
602 './tests/worker-files/cluster/testWorker.js'
604 const promises
= new Set()
605 const maxMultiplier
= 2
606 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
607 promises
.add(pool
.execute())
609 for (const workerNode
of pool
.workerNodes
) {
610 expect(workerNode
.usage
).toStrictEqual({
613 executing
: maxMultiplier
,
619 history
: expect
.any(CircularArray
)
622 history
: expect
.any(CircularArray
)
626 history
: expect
.any(CircularArray
)
629 history
: expect
.any(CircularArray
)
634 await Promise
.all(promises
)
635 for (const workerNode
of pool
.workerNodes
) {
636 expect(workerNode
.usage
).toStrictEqual({
638 executed
: maxMultiplier
,
645 history
: expect
.any(CircularArray
)
648 history
: expect
.any(CircularArray
)
652 history
: expect
.any(CircularArray
)
655 history
: expect
.any(CircularArray
)
663 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
664 const pool
= new DynamicThreadPool(
665 Math
.floor(numberOfWorkers
/ 2),
667 './tests/worker-files/thread/testWorker.js'
669 const promises
= new Set()
670 const maxMultiplier
= 2
671 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
672 promises
.add(pool
.execute())
674 await Promise
.all(promises
)
675 for (const workerNode
of pool
.workerNodes
) {
676 expect(workerNode
.usage
).toStrictEqual({
678 executed
: expect
.any(Number
),
685 history
: expect
.any(CircularArray
)
688 history
: expect
.any(CircularArray
)
692 history
: expect
.any(CircularArray
)
695 history
: expect
.any(CircularArray
)
699 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
700 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
701 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
702 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
703 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
704 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
706 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
707 for (const workerNode
of pool
.workerNodes
) {
708 expect(workerNode
.usage
).toStrictEqual({
717 history
: expect
.any(CircularArray
)
720 history
: expect
.any(CircularArray
)
724 history
: expect
.any(CircularArray
)
727 history
: expect
.any(CircularArray
)
731 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
732 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
733 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
734 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
739 it("Verify that pool event emitter 'full' event can register a callback", async () => {
740 const pool
= new DynamicThreadPool(
741 Math
.floor(numberOfWorkers
/ 2),
743 './tests/worker-files/thread/testWorker.js'
745 const promises
= new Set()
748 pool
.emitter
.on(PoolEvents
.full
, info
=> {
752 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
753 promises
.add(pool
.execute())
755 await Promise
.all(promises
)
756 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
757 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
758 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
759 expect(poolInfo
).toStrictEqual({
761 type
: PoolTypes
.dynamic
,
762 worker
: WorkerTypes
.thread
,
763 ready
: expect
.any(Boolean
),
764 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
765 minSize
: expect
.any(Number
),
766 maxSize
: expect
.any(Number
),
767 workerNodes
: expect
.any(Number
),
768 idleWorkerNodes
: expect
.any(Number
),
769 busyWorkerNodes
: expect
.any(Number
),
770 executedTasks
: expect
.any(Number
),
771 executingTasks
: expect
.any(Number
),
772 queuedTasks
: expect
.any(Number
),
773 maxQueuedTasks
: expect
.any(Number
),
774 failedTasks
: expect
.any(Number
)
779 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
780 const pool
= new DynamicClusterPool(
781 Math
.floor(numberOfWorkers
/ 2),
783 './tests/worker-files/cluster/testWorker.js'
787 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
791 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
792 expect(poolReady
).toBe(1)
793 expect(poolInfo
).toStrictEqual({
795 type
: PoolTypes
.dynamic
,
796 worker
: WorkerTypes
.cluster
,
798 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
799 minSize
: expect
.any(Number
),
800 maxSize
: expect
.any(Number
),
801 workerNodes
: expect
.any(Number
),
802 idleWorkerNodes
: expect
.any(Number
),
803 busyWorkerNodes
: expect
.any(Number
),
804 executedTasks
: expect
.any(Number
),
805 executingTasks
: expect
.any(Number
),
806 queuedTasks
: expect
.any(Number
),
807 maxQueuedTasks
: expect
.any(Number
),
808 failedTasks
: expect
.any(Number
)
813 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
814 const pool
= new FixedThreadPool(
816 './tests/worker-files/thread/testWorker.js'
818 const promises
= new Set()
821 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
825 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
826 promises
.add(pool
.execute())
828 await Promise
.all(promises
)
829 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
830 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
831 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
832 expect(poolInfo
).toStrictEqual({
834 type
: PoolTypes
.fixed
,
835 worker
: WorkerTypes
.thread
,
836 ready
: expect
.any(Boolean
),
837 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
838 minSize
: expect
.any(Number
),
839 maxSize
: expect
.any(Number
),
840 workerNodes
: expect
.any(Number
),
841 idleWorkerNodes
: expect
.any(Number
),
842 busyWorkerNodes
: expect
.any(Number
),
843 executedTasks
: expect
.any(Number
),
844 executingTasks
: expect
.any(Number
),
845 queuedTasks
: expect
.any(Number
),
846 maxQueuedTasks
: expect
.any(Number
),
847 failedTasks
: expect
.any(Number
)
852 it('Verify that multiple tasks worker is working', async () => {
853 const pool
= new DynamicClusterPool(
854 Math
.floor(numberOfWorkers
/ 2),
856 './tests/worker-files/cluster/testMultiTasksWorker.js'
858 const data
= { n
: 10 }
859 const result0
= await pool
.execute(data
)
860 expect(result0
).toStrictEqual({ ok
: 1 })
861 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
862 expect(result1
).toStrictEqual({ ok
: 1 })
863 const result2
= await pool
.execute(data
, 'factorial')
864 expect(result2
).toBe(3628800)
865 const result3
= await pool
.execute(data
, 'fibonacci')
866 expect(result3
).toBe(55)