1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
16 const { version
} = require('../../../package.json')
17 const { waitPoolEvents
} = require('../../test-utils')
19 describe('Abstract pool test suite', () => {
20 const numberOfWorkers
= 2
21 class StubPoolWithIsMain
extends FixedThreadPool
{
31 it('Simulate pool creation from a non main thread/process', () => {
34 new StubPoolWithIsMain(
36 './tests/worker-files/thread/testWorker.js',
38 errorHandler
: (e
) => console
.error(e
)
43 'Cannot start a pool from a worker with the same type as the pool'
48 it('Verify that pool statuses properties are set', async () => {
49 const pool
= new FixedThreadPool(
51 './tests/worker-files/thread/testWorker.js'
53 expect(pool
.starting
).toBe(false)
54 expect(pool
.started
).toBe(true)
58 it('Verify that filePath is checked', () => {
59 const expectedError
= new Error(
60 'Please specify a file with a worker implementation'
62 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
65 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
68 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
71 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
75 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
76 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
79 it('Verify that numberOfWorkers is checked', () => {
80 expect(() => new FixedThreadPool()).toThrowError(
82 'Cannot instantiate a pool without specifying the number of workers'
87 it('Verify that a negative number of workers is checked', () => {
90 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
93 'Cannot instantiate a pool with a negative number of workers'
98 it('Verify that a non integer number of workers is checked', () => {
101 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
104 'Cannot instantiate a pool with a non safe integer number of workers'
109 it('Verify that dynamic pool sizing is checked', () => {
112 new DynamicClusterPool(
115 './tests/worker-files/cluster/testWorker.js'
119 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
124 new DynamicThreadPool(
127 './tests/worker-files/thread/testWorker.js'
131 'Cannot instantiate a pool with a non safe integer number of workers'
136 new DynamicClusterPool(
139 './tests/worker-files/cluster/testWorker.js'
143 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
148 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
151 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
156 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
159 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
164 new DynamicClusterPool(
167 './tests/worker-files/cluster/testWorker.js'
171 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
176 it('Verify that pool options are checked', async () => {
177 let pool
= new FixedThreadPool(
179 './tests/worker-files/thread/testWorker.js'
181 expect(pool
.emitter
).toBeDefined()
182 expect(pool
.opts
.enableEvents
).toBe(true)
183 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
184 expect(pool
.opts
.enableTasksQueue
).toBe(false)
185 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
186 expect(pool
.opts
.workerChoiceStrategy
).toBe(
187 WorkerChoiceStrategies
.ROUND_ROBIN
189 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
191 runTime
: { median
: false },
192 waitTime
: { median
: false },
193 elu
: { median
: false }
195 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
197 runTime
: { median
: false },
198 waitTime
: { median
: false },
199 elu
: { median
: false }
201 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
202 .workerChoiceStrategies
) {
203 expect(workerChoiceStrategy
.opts
).toStrictEqual({
205 runTime
: { median
: false },
206 waitTime
: { median
: false },
207 elu
: { median
: false }
210 expect(pool
.opts
.messageHandler
).toBeUndefined()
211 expect(pool
.opts
.errorHandler
).toBeUndefined()
212 expect(pool
.opts
.onlineHandler
).toBeUndefined()
213 expect(pool
.opts
.exitHandler
).toBeUndefined()
215 const testHandler
= () => console
.info('test handler executed')
216 pool
= new FixedThreadPool(
218 './tests/worker-files/thread/testWorker.js',
220 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
221 workerChoiceStrategyOptions
: {
222 runTime
: { median
: true },
223 weights
: { 0: 300, 1: 200 }
226 restartWorkerOnError
: false,
227 enableTasksQueue
: true,
228 tasksQueueOptions
: { concurrency
: 2 },
229 messageHandler
: testHandler
,
230 errorHandler
: testHandler
,
231 onlineHandler
: testHandler
,
232 exitHandler
: testHandler
235 expect(pool
.emitter
).toBeUndefined()
236 expect(pool
.opts
.enableEvents
).toBe(false)
237 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
238 expect(pool
.opts
.enableTasksQueue
).toBe(true)
239 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
243 expect(pool
.opts
.workerChoiceStrategy
).toBe(
244 WorkerChoiceStrategies
.LEAST_USED
246 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
248 runTime
: { median
: true },
249 waitTime
: { median
: false },
250 elu
: { median
: false },
251 weights
: { 0: 300, 1: 200 }
253 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
255 runTime
: { median
: true },
256 waitTime
: { median
: false },
257 elu
: { median
: false },
258 weights
: { 0: 300, 1: 200 }
260 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
261 .workerChoiceStrategies
) {
262 expect(workerChoiceStrategy
.opts
).toStrictEqual({
264 runTime
: { median
: true },
265 waitTime
: { median
: false },
266 elu
: { median
: false },
267 weights
: { 0: 300, 1: 200 }
270 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
271 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
272 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
273 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
277 it('Verify that pool options are validated', async () => {
282 './tests/worker-files/thread/testWorker.js',
284 workerChoiceStrategy
: 'invalidStrategy'
288 new Error("Invalid worker choice strategy 'invalidStrategy'")
294 './tests/worker-files/thread/testWorker.js',
296 workerChoiceStrategyOptions
: {
297 retries
: 'invalidChoiceRetries'
303 'Invalid worker choice strategy options: retries must be an integer'
310 './tests/worker-files/thread/testWorker.js',
312 workerChoiceStrategyOptions
: {
319 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
326 './tests/worker-files/thread/testWorker.js',
328 workerChoiceStrategyOptions
: { weights
: {} }
333 'Invalid worker choice strategy options: must have a weight for each worker node'
340 './tests/worker-files/thread/testWorker.js',
342 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
347 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
354 './tests/worker-files/thread/testWorker.js',
356 enableTasksQueue
: true,
357 tasksQueueOptions
: 'invalidTasksQueueOptions'
361 new TypeError('Invalid tasks queue options: must be a plain object')
367 './tests/worker-files/thread/testWorker.js',
369 enableTasksQueue
: true,
370 tasksQueueOptions
: { concurrency
: 0 }
375 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
382 './tests/worker-files/thread/testWorker.js',
384 enableTasksQueue
: true,
385 tasksQueueOptions
: { concurrency
: -1 }
390 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
397 './tests/worker-files/thread/testWorker.js',
399 enableTasksQueue
: true,
400 tasksQueueOptions
: { concurrency
: 0.2 }
404 new TypeError('Invalid worker node tasks concurrency: must be an integer')
410 './tests/worker-files/thread/testWorker.js',
412 enableTasksQueue
: true,
413 tasksQueueOptions
: { queueMaxSize
: 2 }
418 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
425 './tests/worker-files/thread/testWorker.js',
427 enableTasksQueue
: true,
428 tasksQueueOptions
: { size
: 0 }
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
440 './tests/worker-files/thread/testWorker.js',
442 enableTasksQueue
: true,
443 tasksQueueOptions
: { size
: -1 }
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
455 './tests/worker-files/thread/testWorker.js',
457 enableTasksQueue
: true,
458 tasksQueueOptions
: { size
: 0.2 }
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
466 it('Verify that pool worker choice strategy options can be set', async () => {
467 const pool
= new FixedThreadPool(
469 './tests/worker-files/thread/testWorker.js',
470 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
472 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
474 runTime
: { median
: false },
475 waitTime
: { median
: false },
476 elu
: { median
: false }
478 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
480 runTime
: { median
: false },
481 waitTime
: { median
: false },
482 elu
: { median
: false }
484 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
485 .workerChoiceStrategies
) {
486 expect(workerChoiceStrategy
.opts
).toStrictEqual({
488 runTime
: { median
: false },
489 waitTime
: { median
: false },
490 elu
: { median
: false }
494 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
512 pool
.setWorkerChoiceStrategyOptions({
513 runTime
: { median
: true },
514 elu
: { median
: true }
516 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
518 runTime
: { median
: true },
519 waitTime
: { median
: false },
520 elu
: { median
: true }
522 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
524 runTime
: { median
: true },
525 waitTime
: { median
: false },
526 elu
: { median
: true }
528 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
529 .workerChoiceStrategies
) {
530 expect(workerChoiceStrategy
.opts
).toStrictEqual({
532 runTime
: { median
: true },
533 waitTime
: { median
: false },
534 elu
: { median
: true }
538 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
556 pool
.setWorkerChoiceStrategyOptions({
557 runTime
: { median
: false },
558 elu
: { median
: false }
560 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
562 runTime
: { median
: false },
563 waitTime
: { median
: false },
564 elu
: { median
: false }
566 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
568 runTime
: { median
: false },
569 waitTime
: { median
: false },
570 elu
: { median
: false }
572 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
573 .workerChoiceStrategies
) {
574 expect(workerChoiceStrategy
.opts
).toStrictEqual({
576 runTime
: { median
: false },
577 waitTime
: { median
: false },
578 elu
: { median
: false }
582 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
601 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
604 'Invalid worker choice strategy options: must be a plain object'
608 pool
.setWorkerChoiceStrategyOptions({
609 retries
: 'invalidChoiceRetries'
613 'Invalid worker choice strategy options: retries must be an integer'
617 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
620 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
624 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
627 'Invalid worker choice strategy options: must have a weight for each worker node'
631 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
634 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
640 it('Verify that pool tasks queue can be enabled/disabled', async () => {
641 const pool
= new FixedThreadPool(
643 './tests/worker-files/thread/testWorker.js'
645 expect(pool
.opts
.enableTasksQueue
).toBe(false)
646 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
647 pool
.enableTasksQueue(true)
648 expect(pool
.opts
.enableTasksQueue
).toBe(true)
649 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
653 pool
.enableTasksQueue(true, { concurrency
: 2 })
654 expect(pool
.opts
.enableTasksQueue
).toBe(true)
655 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
659 pool
.enableTasksQueue(false)
660 expect(pool
.opts
.enableTasksQueue
).toBe(false)
661 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
665 it('Verify that pool tasks queue options can be set', async () => {
666 const pool
= new FixedThreadPool(
668 './tests/worker-files/thread/testWorker.js',
669 { enableTasksQueue
: true }
671 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
675 pool
.setTasksQueueOptions({ concurrency
: 2 })
676 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
681 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
683 new TypeError('Invalid tasks queue options: must be a plain object')
685 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
687 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
690 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
692 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
695 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
696 new TypeError('Invalid worker node tasks concurrency: must be an integer')
698 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
700 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
703 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
705 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
708 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
710 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
713 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
714 new TypeError('Invalid worker node tasks queue size: must be an integer')
719 it('Verify that pool info is set', async () => {
720 let pool
= new FixedThreadPool(
722 './tests/worker-files/thread/testWorker.js'
724 expect(pool
.info
).toStrictEqual({
726 type
: PoolTypes
.fixed
,
727 worker
: WorkerTypes
.thread
,
729 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
730 minSize
: numberOfWorkers
,
731 maxSize
: numberOfWorkers
,
732 workerNodes
: numberOfWorkers
,
733 idleWorkerNodes
: numberOfWorkers
,
740 pool
= new DynamicClusterPool(
741 Math
.floor(numberOfWorkers
/ 2),
743 './tests/worker-files/cluster/testWorker.js'
745 expect(pool
.info
).toStrictEqual({
747 type
: PoolTypes
.dynamic
,
748 worker
: WorkerTypes
.cluster
,
750 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
751 minSize
: Math
.floor(numberOfWorkers
/ 2),
752 maxSize
: numberOfWorkers
,
753 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
754 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
763 it('Verify that pool worker tasks usage are initialized', async () => {
764 const pool
= new FixedClusterPool(
766 './tests/worker-files/cluster/testWorker.js'
768 for (const workerNode
of pool
.workerNodes
) {
769 expect(workerNode
.usage
).toStrictEqual({
779 history
: new CircularArray()
782 history
: new CircularArray()
786 history
: new CircularArray()
789 history
: new CircularArray()
797 it('Verify that pool worker tasks queue are initialized', async () => {
798 let pool
= new FixedClusterPool(
800 './tests/worker-files/cluster/testWorker.js'
802 for (const workerNode
of pool
.workerNodes
) {
803 expect(workerNode
.tasksQueue
).toBeDefined()
804 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
805 expect(workerNode
.tasksQueue
.size
).toBe(0)
806 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
809 pool
= new DynamicThreadPool(
810 Math
.floor(numberOfWorkers
/ 2),
812 './tests/worker-files/thread/testWorker.js'
814 for (const workerNode
of pool
.workerNodes
) {
815 expect(workerNode
.tasksQueue
).toBeDefined()
816 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
817 expect(workerNode
.tasksQueue
.size
).toBe(0)
818 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
823 it('Verify that pool worker info are initialized', async () => {
824 let pool
= new FixedClusterPool(
826 './tests/worker-files/cluster/testWorker.js'
828 for (const workerNode
of pool
.workerNodes
) {
829 expect(workerNode
.info
).toStrictEqual({
830 id
: expect
.any(Number
),
831 type
: WorkerTypes
.cluster
,
837 pool
= new DynamicThreadPool(
838 Math
.floor(numberOfWorkers
/ 2),
840 './tests/worker-files/thread/testWorker.js'
842 for (const workerNode
of pool
.workerNodes
) {
843 expect(workerNode
.info
).toStrictEqual({
844 id
: expect
.any(Number
),
845 type
: WorkerTypes
.thread
,
853 it('Verify that pool execute() arguments are checked', async () => {
854 const pool
= new FixedClusterPool(
856 './tests/worker-files/cluster/testWorker.js'
858 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
859 new TypeError('name argument must be a string')
861 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
862 new TypeError('name argument must not be an empty string')
864 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
865 new TypeError('transferList argument must be an array')
867 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
868 "Task function 'unknown' not found"
871 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
872 new Error('Cannot execute a task on destroyed pool')
876 it('Verify that pool worker tasks usage are computed', async () => {
877 const pool
= new FixedClusterPool(
879 './tests/worker-files/cluster/testWorker.js'
881 const promises
= new Set()
882 const maxMultiplier
= 2
883 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
884 promises
.add(pool
.execute())
886 for (const workerNode
of pool
.workerNodes
) {
887 expect(workerNode
.usage
).toStrictEqual({
890 executing
: maxMultiplier
,
897 history
: expect
.any(CircularArray
)
900 history
: expect
.any(CircularArray
)
904 history
: expect
.any(CircularArray
)
907 history
: expect
.any(CircularArray
)
912 await Promise
.all(promises
)
913 for (const workerNode
of pool
.workerNodes
) {
914 expect(workerNode
.usage
).toStrictEqual({
916 executed
: maxMultiplier
,
924 history
: expect
.any(CircularArray
)
927 history
: expect
.any(CircularArray
)
931 history
: expect
.any(CircularArray
)
934 history
: expect
.any(CircularArray
)
942 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
943 const pool
= new DynamicThreadPool(
944 Math
.floor(numberOfWorkers
/ 2),
946 './tests/worker-files/thread/testWorker.js'
948 const promises
= new Set()
949 const maxMultiplier
= 2
950 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
951 promises
.add(pool
.execute())
953 await Promise
.all(promises
)
954 for (const workerNode
of pool
.workerNodes
) {
955 expect(workerNode
.usage
).toStrictEqual({
957 executed
: expect
.any(Number
),
965 history
: expect
.any(CircularArray
)
968 history
: expect
.any(CircularArray
)
972 history
: expect
.any(CircularArray
)
975 history
: expect
.any(CircularArray
)
979 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
980 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
981 numberOfWorkers
* maxMultiplier
983 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
984 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
985 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
986 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
988 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
989 for (const workerNode
of pool
.workerNodes
) {
990 expect(workerNode
.usage
).toStrictEqual({
1000 history
: expect
.any(CircularArray
)
1003 history
: expect
.any(CircularArray
)
1007 history
: expect
.any(CircularArray
)
1010 history
: expect
.any(CircularArray
)
1014 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1015 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1016 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1017 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1019 await pool
.destroy()
1022 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1023 const pool
= new DynamicClusterPool(
1024 Math
.floor(numberOfWorkers
/ 2),
1026 './tests/worker-files/cluster/testWorker.js'
1030 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1034 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1035 expect(poolReady
).toBe(1)
1036 expect(poolInfo
).toStrictEqual({
1038 type
: PoolTypes
.dynamic
,
1039 worker
: WorkerTypes
.cluster
,
1041 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1042 minSize
: expect
.any(Number
),
1043 maxSize
: expect
.any(Number
),
1044 workerNodes
: expect
.any(Number
),
1045 idleWorkerNodes
: expect
.any(Number
),
1046 busyWorkerNodes
: expect
.any(Number
),
1047 executedTasks
: expect
.any(Number
),
1048 executingTasks
: expect
.any(Number
),
1049 failedTasks
: expect
.any(Number
)
1051 await pool
.destroy()
1054 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1055 const pool
= new FixedThreadPool(
1057 './tests/worker-files/thread/testWorker.js'
1059 const promises
= new Set()
1062 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1066 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1067 promises
.add(pool
.execute())
1069 await Promise
.all(promises
)
1070 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1071 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1072 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1073 expect(poolInfo
).toStrictEqual({
1075 type
: PoolTypes
.fixed
,
1076 worker
: WorkerTypes
.thread
,
1077 ready
: expect
.any(Boolean
),
1078 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1079 minSize
: expect
.any(Number
),
1080 maxSize
: expect
.any(Number
),
1081 workerNodes
: expect
.any(Number
),
1082 idleWorkerNodes
: expect
.any(Number
),
1083 busyWorkerNodes
: expect
.any(Number
),
1084 executedTasks
: expect
.any(Number
),
1085 executingTasks
: expect
.any(Number
),
1086 failedTasks
: expect
.any(Number
)
1088 await pool
.destroy()
1091 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1092 const pool
= new DynamicThreadPool(
1093 Math
.floor(numberOfWorkers
/ 2),
1095 './tests/worker-files/thread/testWorker.js'
1097 const promises
= new Set()
1100 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1104 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1105 promises
.add(pool
.execute())
1107 await Promise
.all(promises
)
1108 expect(poolFull
).toBe(1)
1109 expect(poolInfo
).toStrictEqual({
1111 type
: PoolTypes
.dynamic
,
1112 worker
: WorkerTypes
.thread
,
1113 ready
: expect
.any(Boolean
),
1114 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1115 minSize
: expect
.any(Number
),
1116 maxSize
: expect
.any(Number
),
1117 workerNodes
: expect
.any(Number
),
1118 idleWorkerNodes
: expect
.any(Number
),
1119 busyWorkerNodes
: expect
.any(Number
),
1120 executedTasks
: expect
.any(Number
),
1121 executingTasks
: expect
.any(Number
),
1122 failedTasks
: expect
.any(Number
)
1124 await pool
.destroy()
1127 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1128 const pool
= new FixedThreadPool(
1130 './tests/worker-files/thread/testWorker.js',
1132 enableTasksQueue
: true
1135 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1136 const promises
= new Set()
1137 let poolBackPressure
= 0
1139 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1143 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1144 promises
.add(pool
.execute())
1146 await Promise
.all(promises
)
1147 expect(poolBackPressure
).toBe(1)
1148 expect(poolInfo
).toStrictEqual({
1150 type
: PoolTypes
.fixed
,
1151 worker
: WorkerTypes
.thread
,
1152 ready
: expect
.any(Boolean
),
1153 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1154 minSize
: expect
.any(Number
),
1155 maxSize
: expect
.any(Number
),
1156 workerNodes
: expect
.any(Number
),
1157 idleWorkerNodes
: expect
.any(Number
),
1158 busyWorkerNodes
: expect
.any(Number
),
1159 executedTasks
: expect
.any(Number
),
1160 executingTasks
: expect
.any(Number
),
1161 maxQueuedTasks
: expect
.any(Number
),
1162 queuedTasks
: expect
.any(Number
),
1164 stolenTasks
: expect
.any(Number
),
1165 failedTasks
: expect
.any(Number
)
1167 expect(pool
.hasBackPressure
.called
).toBe(true)
1168 await pool
.destroy()
1171 it('Verify that listTaskFunctions() is working', async () => {
1172 const dynamicThreadPool
= new DynamicThreadPool(
1173 Math
.floor(numberOfWorkers
/ 2),
1175 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1177 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1178 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1180 'jsonIntegerSerialization',
1184 const fixedClusterPool
= new FixedClusterPool(
1186 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1188 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1189 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1191 'jsonIntegerSerialization',
1195 await dynamicThreadPool
.destroy()
1196 await fixedClusterPool
.destroy()
1199 it('Verify that multiple task functions worker is working', async () => {
1200 const pool
= new DynamicClusterPool(
1201 Math
.floor(numberOfWorkers
/ 2),
1203 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1205 const data
= { n
: 10 }
1206 const result0
= await pool
.execute(data
)
1207 expect(result0
).toStrictEqual({ ok
: 1 })
1208 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1209 expect(result1
).toStrictEqual({ ok
: 1 })
1210 const result2
= await pool
.execute(data
, 'factorial')
1211 expect(result2
).toBe(3628800)
1212 const result3
= await pool
.execute(data
, 'fibonacci')
1213 expect(result3
).toBe(55)
1214 expect(pool
.info
.executingTasks
).toBe(0)
1215 expect(pool
.info
.executedTasks
).toBe(4)
1216 for (const workerNode
of pool
.workerNodes
) {
1217 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1219 'jsonIntegerSerialization',
1223 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1224 for (const name
of pool
.listTaskFunctions()) {
1225 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1227 executed
: expect
.any(Number
),
1234 history
: expect
.any(CircularArray
)
1237 history
: expect
.any(CircularArray
)
1241 history
: expect
.any(CircularArray
)
1244 history
: expect
.any(CircularArray
)
1249 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1250 ).toBeGreaterThan(0)
1253 await pool
.destroy()
1256 it('Verify sendKillMessageToWorker()', async () => {
1257 const pool
= new DynamicClusterPool(
1258 Math
.floor(numberOfWorkers
/ 2),
1260 './tests/worker-files/cluster/testWorker.js'
1262 const workerNodeKey
= 0
1264 pool
.sendKillMessageToWorker(
1266 pool
.workerNodes
[workerNodeKey
].info
.id
1268 ).resolves
.toBeUndefined()
1269 await pool
.destroy()