1 const { MessageChannel
} = require('worker_threads')
2 const { expect
} = require('expect')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: e
=> console
.error(e
)
36 ).toThrowError('Cannot start a pool from a worker!')
39 it('Verify that filePath is checked', () => {
40 const expectedError
= new Error(
41 'Please specify a file with a worker implementation'
43 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
46 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
52 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
56 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
57 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
60 it('Verify that numberOfWorkers is checked', () => {
61 expect(() => new FixedThreadPool()).toThrowError(
62 'Cannot instantiate a pool without specifying the number of workers'
66 it('Verify that a negative number of workers is checked', () => {
69 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
72 'Cannot instantiate a pool with a negative number of workers'
77 it('Verify that a non integer number of workers is checked', () => {
80 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
83 'Cannot instantiate a pool with a non safe integer number of workers'
88 it('Verify that dynamic pool sizing is checked', () => {
91 new DynamicClusterPool(
94 './tests/worker-files/cluster/testWorker.js'
98 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
103 new DynamicThreadPool(
106 './tests/worker-files/thread/testWorker.js'
110 'Cannot instantiate a pool with a non safe integer number of workers'
115 new DynamicClusterPool(
118 './tests/worker-files/cluster/testWorker.js'
122 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
127 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
130 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
135 new DynamicClusterPool(
138 './tests/worker-files/cluster/testWorker.js'
142 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
147 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
150 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
155 it('Verify that pool options are checked', async () => {
156 let pool
= new FixedThreadPool(
158 './tests/worker-files/thread/testWorker.js'
160 expect(pool
.emitter
).toBeDefined()
161 expect(pool
.opts
.enableEvents
).toBe(true)
162 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
163 expect(pool
.opts
.enableTasksQueue
).toBe(false)
164 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
165 expect(pool
.opts
.workerChoiceStrategy
).toBe(
166 WorkerChoiceStrategies
.ROUND_ROBIN
168 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
169 runTime
: { median
: false },
170 waitTime
: { median
: false },
171 elu
: { median
: false }
173 expect(pool
.opts
.messageHandler
).toBeUndefined()
174 expect(pool
.opts
.errorHandler
).toBeUndefined()
175 expect(pool
.opts
.onlineHandler
).toBeUndefined()
176 expect(pool
.opts
.exitHandler
).toBeUndefined()
178 const testHandler
= () => console
.info('test handler executed')
179 pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js',
183 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
184 workerChoiceStrategyOptions
: {
185 runTime
: { median
: true },
186 weights
: { 0: 300, 1: 200 }
189 restartWorkerOnError
: false,
190 enableTasksQueue
: true,
191 tasksQueueOptions
: { concurrency
: 2 },
192 messageHandler
: testHandler
,
193 errorHandler
: testHandler
,
194 onlineHandler
: testHandler
,
195 exitHandler
: testHandler
198 expect(pool
.emitter
).toBeUndefined()
199 expect(pool
.opts
.enableEvents
).toBe(false)
200 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
201 expect(pool
.opts
.enableTasksQueue
).toBe(true)
202 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
203 expect(pool
.opts
.workerChoiceStrategy
).toBe(
204 WorkerChoiceStrategies
.LEAST_USED
206 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
207 runTime
: { median
: true },
208 weights
: { 0: 300, 1: 200 }
210 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
211 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
212 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
213 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
217 it('Verify that pool options are validated', async () => {
222 './tests/worker-files/thread/testWorker.js',
224 workerChoiceStrategy
: 'invalidStrategy'
227 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
232 './tests/worker-files/thread/testWorker.js',
234 workerChoiceStrategyOptions
: 'invalidOptions'
238 'Invalid worker choice strategy options: must be a plain object'
244 './tests/worker-files/thread/testWorker.js',
246 workerChoiceStrategyOptions
: { weights
: {} }
250 'Invalid worker choice strategy options: must have a weight for each worker node'
256 './tests/worker-files/thread/testWorker.js',
258 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
262 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
268 './tests/worker-files/thread/testWorker.js',
270 enableTasksQueue
: true,
271 tasksQueueOptions
: { concurrency
: 0 }
274 ).toThrowError("Invalid worker tasks concurrency '0'")
279 './tests/worker-files/thread/testWorker.js',
281 enableTasksQueue
: true,
282 tasksQueueOptions
: 'invalidTasksQueueOptions'
285 ).toThrowError('Invalid tasks queue options: must be a plain object')
290 './tests/worker-files/thread/testWorker.js',
292 enableTasksQueue
: true,
293 tasksQueueOptions
: { concurrency
: 0.2 }
296 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
299 it('Verify that pool worker choice strategy options can be set', async () => {
300 const pool
= new FixedThreadPool(
302 './tests/worker-files/thread/testWorker.js',
303 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
305 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
306 runTime
: { median
: false },
307 waitTime
: { median
: false },
308 elu
: { median
: false }
310 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
311 .workerChoiceStrategies
) {
312 expect(workerChoiceStrategy
.opts
).toStrictEqual({
313 runTime
: { median
: false },
314 waitTime
: { median
: false },
315 elu
: { median
: false }
319 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
337 pool
.setWorkerChoiceStrategyOptions({
338 runTime
: { median
: true },
339 elu
: { median
: true }
341 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
342 runTime
: { median
: true },
343 elu
: { median
: true }
345 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
346 .workerChoiceStrategies
) {
347 expect(workerChoiceStrategy
.opts
).toStrictEqual({
348 runTime
: { median
: true },
349 elu
: { median
: true }
353 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 pool
.setWorkerChoiceStrategyOptions({
372 runTime
: { median
: false },
373 elu
: { median
: false }
375 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
376 runTime
: { median
: false },
377 elu
: { median
: false }
379 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
380 .workerChoiceStrategies
) {
381 expect(workerChoiceStrategy
.opts
).toStrictEqual({
382 runTime
: { median
: false },
383 elu
: { median
: false }
387 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
406 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
408 'Invalid worker choice strategy options: must be a plain object'
411 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
413 'Invalid worker choice strategy options: must have a weight for each worker node'
416 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
418 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
423 it('Verify that pool tasks queue can be enabled/disabled', async () => {
424 const pool
= new FixedThreadPool(
426 './tests/worker-files/thread/testWorker.js'
428 expect(pool
.opts
.enableTasksQueue
).toBe(false)
429 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
430 pool
.enableTasksQueue(true)
431 expect(pool
.opts
.enableTasksQueue
).toBe(true)
432 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
433 pool
.enableTasksQueue(true, { concurrency
: 2 })
434 expect(pool
.opts
.enableTasksQueue
).toBe(true)
435 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
436 pool
.enableTasksQueue(false)
437 expect(pool
.opts
.enableTasksQueue
).toBe(false)
438 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
442 it('Verify that pool tasks queue options can be set', async () => {
443 const pool
= new FixedThreadPool(
445 './tests/worker-files/thread/testWorker.js',
446 { enableTasksQueue
: true }
448 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
449 pool
.setTasksQueueOptions({ concurrency
: 2 })
450 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
452 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
453 ).toThrowError('Invalid tasks queue options: must be a plain object')
454 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
455 "Invalid worker tasks concurrency '0'"
457 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
458 'Invalid worker tasks concurrency: must be an integer'
463 it('Verify that pool info is set', async () => {
464 let pool
= new FixedThreadPool(
466 './tests/worker-files/thread/testWorker.js'
468 expect(pool
.info
).toStrictEqual({
470 type
: PoolTypes
.fixed
,
471 worker
: WorkerTypes
.thread
,
473 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
474 minSize
: numberOfWorkers
,
475 maxSize
: numberOfWorkers
,
476 workerNodes
: numberOfWorkers
,
477 idleWorkerNodes
: numberOfWorkers
,
484 pool
= new DynamicClusterPool(
485 Math
.floor(numberOfWorkers
/ 2),
487 './tests/worker-files/cluster/testWorker.js'
489 expect(pool
.info
).toStrictEqual({
491 type
: PoolTypes
.dynamic
,
492 worker
: WorkerTypes
.cluster
,
494 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
495 minSize
: Math
.floor(numberOfWorkers
/ 2),
496 maxSize
: numberOfWorkers
,
497 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
498 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
507 it('Verify that pool worker tasks usage are initialized', async () => {
508 const pool
= new FixedClusterPool(
510 './tests/worker-files/cluster/testWorker.js'
512 for (const workerNode
of pool
.workerNodes
) {
513 expect(workerNode
.usage
).toStrictEqual({
522 history
: expect
.any(CircularArray
)
525 history
: expect
.any(CircularArray
)
529 history
: expect
.any(CircularArray
)
532 history
: expect
.any(CircularArray
)
540 it('Verify that pool worker tasks queue are initialized', async () => {
541 let pool
= new FixedClusterPool(
543 './tests/worker-files/cluster/testWorker.js'
545 for (const workerNode
of pool
.workerNodes
) {
546 expect(workerNode
.tasksQueue
).toBeDefined()
547 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
548 expect(workerNode
.tasksQueue
.size
).toBe(0)
549 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
552 pool
= new DynamicThreadPool(
553 Math
.floor(numberOfWorkers
/ 2),
555 './tests/worker-files/thread/testWorker.js'
557 for (const workerNode
of pool
.workerNodes
) {
558 expect(workerNode
.tasksQueue
).toBeDefined()
559 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
560 expect(workerNode
.tasksQueue
.size
).toBe(0)
561 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
565 it('Verify that pool worker info are initialized', async () => {
566 let pool
= new FixedClusterPool(
568 './tests/worker-files/cluster/testWorker.js'
570 for (const workerNode
of pool
.workerNodes
) {
571 expect(workerNode
.info
).toStrictEqual({
572 id
: expect
.any(Number
),
573 type
: WorkerTypes
.cluster
,
579 pool
= new DynamicThreadPool(
580 Math
.floor(numberOfWorkers
/ 2),
582 './tests/worker-files/thread/testWorker.js'
584 for (const workerNode
of pool
.workerNodes
) {
585 expect(workerNode
.info
).toStrictEqual({
586 id
: expect
.any(Number
),
587 type
: WorkerTypes
.thread
,
590 messageChannel
: expect
.any(MessageChannel
)
595 it('Verify that pool worker tasks usage are computed', async () => {
596 const pool
= new FixedClusterPool(
598 './tests/worker-files/cluster/testWorker.js'
600 const promises
= new Set()
601 const maxMultiplier
= 2
602 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
603 promises
.add(pool
.execute())
605 for (const workerNode
of pool
.workerNodes
) {
606 expect(workerNode
.usage
).toStrictEqual({
609 executing
: maxMultiplier
,
615 history
: expect
.any(CircularArray
)
618 history
: expect
.any(CircularArray
)
622 history
: expect
.any(CircularArray
)
625 history
: expect
.any(CircularArray
)
630 await Promise
.all(promises
)
631 for (const workerNode
of pool
.workerNodes
) {
632 expect(workerNode
.usage
).toStrictEqual({
634 executed
: maxMultiplier
,
641 history
: expect
.any(CircularArray
)
644 history
: expect
.any(CircularArray
)
648 history
: expect
.any(CircularArray
)
651 history
: expect
.any(CircularArray
)
659 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
660 const pool
= new DynamicThreadPool(
661 Math
.floor(numberOfWorkers
/ 2),
663 './tests/worker-files/thread/testWorker.js'
665 const promises
= new Set()
666 const maxMultiplier
= 2
667 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
668 promises
.add(pool
.execute())
670 await Promise
.all(promises
)
671 for (const workerNode
of pool
.workerNodes
) {
672 expect(workerNode
.usage
).toStrictEqual({
674 executed
: expect
.any(Number
),
681 history
: expect
.any(CircularArray
)
684 history
: expect
.any(CircularArray
)
688 history
: expect
.any(CircularArray
)
691 history
: expect
.any(CircularArray
)
695 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
696 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
697 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
698 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
699 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
700 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
702 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
703 for (const workerNode
of pool
.workerNodes
) {
704 expect(workerNode
.usage
).toStrictEqual({
713 history
: expect
.any(CircularArray
)
716 history
: expect
.any(CircularArray
)
720 history
: expect
.any(CircularArray
)
723 history
: expect
.any(CircularArray
)
727 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
728 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
729 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
730 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
735 it("Verify that pool event emitter 'full' event can register a callback", async () => {
736 const pool
= new DynamicThreadPool(
737 Math
.floor(numberOfWorkers
/ 2),
739 './tests/worker-files/thread/testWorker.js'
741 const promises
= new Set()
744 pool
.emitter
.on(PoolEvents
.full
, info
=> {
748 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
749 promises
.add(pool
.execute())
751 await Promise
.all(promises
)
752 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
753 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
754 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
755 expect(poolInfo
).toStrictEqual({
757 type
: PoolTypes
.dynamic
,
758 worker
: WorkerTypes
.thread
,
759 ready
: expect
.any(Boolean
),
760 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
761 minSize
: expect
.any(Number
),
762 maxSize
: expect
.any(Number
),
763 workerNodes
: expect
.any(Number
),
764 idleWorkerNodes
: expect
.any(Number
),
765 busyWorkerNodes
: expect
.any(Number
),
766 executedTasks
: expect
.any(Number
),
767 executingTasks
: expect
.any(Number
),
768 failedTasks
: expect
.any(Number
)
773 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
774 const pool
= new DynamicClusterPool(
775 Math
.floor(numberOfWorkers
/ 2),
777 './tests/worker-files/cluster/testWorker.js'
781 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
785 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
786 expect(poolReady
).toBe(1)
787 expect(poolInfo
).toStrictEqual({
789 type
: PoolTypes
.dynamic
,
790 worker
: WorkerTypes
.cluster
,
792 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
793 minSize
: expect
.any(Number
),
794 maxSize
: expect
.any(Number
),
795 workerNodes
: expect
.any(Number
),
796 idleWorkerNodes
: expect
.any(Number
),
797 busyWorkerNodes
: expect
.any(Number
),
798 executedTasks
: expect
.any(Number
),
799 executingTasks
: expect
.any(Number
),
800 failedTasks
: expect
.any(Number
)
805 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
806 const pool
= new FixedThreadPool(
808 './tests/worker-files/thread/testWorker.js'
810 const promises
= new Set()
813 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
817 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
818 promises
.add(pool
.execute())
820 await Promise
.all(promises
)
821 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
822 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
823 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
824 expect(poolInfo
).toStrictEqual({
826 type
: PoolTypes
.fixed
,
827 worker
: WorkerTypes
.thread
,
828 ready
: expect
.any(Boolean
),
829 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
830 minSize
: expect
.any(Number
),
831 maxSize
: expect
.any(Number
),
832 workerNodes
: expect
.any(Number
),
833 idleWorkerNodes
: expect
.any(Number
),
834 busyWorkerNodes
: expect
.any(Number
),
835 executedTasks
: expect
.any(Number
),
836 executingTasks
: expect
.any(Number
),
837 failedTasks
: expect
.any(Number
)
842 it('Verify that multiple tasks worker is working', async () => {
843 const pool
= new DynamicClusterPool(
844 Math
.floor(numberOfWorkers
/ 2),
846 './tests/worker-files/cluster/testMultiTasksWorker.js'
848 const data
= { n
: 10 }
849 const result0
= await pool
.execute(data
)
850 expect(result0
).toStrictEqual({ ok
: 1 })
851 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
852 expect(result1
).toStrictEqual({ ok
: 1 })
853 const result2
= await pool
.execute(data
, 'factorial')
854 expect(result2
).toBe(3628800)
855 const result3
= await pool
.execute(data
, 'fibonacci')
856 expect(result3
).toBe(55)