1 const { expect
} = require('expect')
2 const sinon
= require('sinon')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Deque
} = require('../../../lib/deque')
15 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
16 const { version
} = require('../../../package.json')
17 const { waitPoolEvents
} = require('../../test-utils')
19 describe('Abstract pool test suite', () => {
20 const numberOfWorkers
= 2
21 class StubPoolWithIsMain
extends FixedThreadPool
{
31 it('Simulate pool creation from a non main thread/process', () => {
34 new StubPoolWithIsMain(
36 './tests/worker-files/thread/testWorker.js',
38 errorHandler
: (e
) => console
.error(e
)
43 'Cannot start a pool from a worker with the same type as the pool'
48 it('Verify that pool statuses properties are set', async () => {
49 const pool
= new FixedThreadPool(
51 './tests/worker-files/thread/testWorker.js'
53 expect(pool
.starting
).toBe(false)
54 expect(pool
.started
).toBe(true)
56 expect(pool
.started
).toBe(false)
59 it('Verify that filePath is checked', () => {
60 const expectedError
= new Error(
61 'Please specify a file with a worker implementation'
63 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
66 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
69 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
72 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
76 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
77 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
80 it('Verify that numberOfWorkers is checked', () => {
81 expect(() => new FixedThreadPool()).toThrowError(
83 'Cannot instantiate a pool without specifying the number of workers'
88 it('Verify that a negative number of workers is checked', () => {
91 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
94 'Cannot instantiate a pool with a negative number of workers'
99 it('Verify that a non integer number of workers is checked', () => {
102 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
105 'Cannot instantiate a pool with a non safe integer number of workers'
110 it('Verify that dynamic pool sizing is checked', () => {
113 new DynamicClusterPool(
116 './tests/worker-files/cluster/testWorker.js'
120 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
125 new DynamicThreadPool(
128 './tests/worker-files/thread/testWorker.js'
132 'Cannot instantiate a pool with a non safe integer number of workers'
137 new DynamicClusterPool(
140 './tests/worker-files/cluster/testWorker.js'
144 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
149 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
152 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
157 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 new DynamicClusterPool(
168 './tests/worker-files/cluster/testWorker.js'
172 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
177 it('Verify that pool options are checked', async () => {
178 let pool
= new FixedThreadPool(
180 './tests/worker-files/thread/testWorker.js'
182 expect(pool
.emitter
).toBeDefined()
183 expect(pool
.opts
.enableEvents
).toBe(true)
184 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
185 expect(pool
.opts
.enableTasksQueue
).toBe(false)
186 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
187 expect(pool
.opts
.workerChoiceStrategy
).toBe(
188 WorkerChoiceStrategies
.ROUND_ROBIN
190 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
196 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
198 runTime
: { median
: false },
199 waitTime
: { median
: false },
200 elu
: { median
: false }
202 expect(pool
.opts
.messageHandler
).toBeUndefined()
203 expect(pool
.opts
.errorHandler
).toBeUndefined()
204 expect(pool
.opts
.onlineHandler
).toBeUndefined()
205 expect(pool
.opts
.exitHandler
).toBeUndefined()
207 const testHandler
= () => console
.info('test handler executed')
208 pool
= new FixedThreadPool(
210 './tests/worker-files/thread/testWorker.js',
212 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
213 workerChoiceStrategyOptions
: {
214 runTime
: { median
: true },
215 weights
: { 0: 300, 1: 200 }
218 restartWorkerOnError
: false,
219 enableTasksQueue
: true,
220 tasksQueueOptions
: { concurrency
: 2 },
221 messageHandler
: testHandler
,
222 errorHandler
: testHandler
,
223 onlineHandler
: testHandler
,
224 exitHandler
: testHandler
227 expect(pool
.emitter
).toBeUndefined()
228 expect(pool
.opts
.enableEvents
).toBe(false)
229 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
230 expect(pool
.opts
.enableTasksQueue
).toBe(true)
231 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
235 expect(pool
.opts
.workerChoiceStrategy
).toBe(
236 WorkerChoiceStrategies
.LEAST_USED
238 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
240 runTime
: { median
: true },
241 waitTime
: { median
: false },
242 elu
: { median
: false },
243 weights
: { 0: 300, 1: 200 }
245 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
247 runTime
: { median
: true },
248 waitTime
: { median
: false },
249 elu
: { median
: false },
250 weights
: { 0: 300, 1: 200 }
252 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
253 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
254 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
255 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
259 it('Verify that pool options are validated', async () => {
264 './tests/worker-files/thread/testWorker.js',
266 workerChoiceStrategy
: 'invalidStrategy'
270 new Error("Invalid worker choice strategy 'invalidStrategy'")
276 './tests/worker-files/thread/testWorker.js',
278 workerChoiceStrategyOptions
: {
279 choiceRetries
: 'invalidChoiceRetries'
285 'Invalid worker choice strategy options: choice retries must be an integer'
292 './tests/worker-files/thread/testWorker.js',
294 workerChoiceStrategyOptions
: {
301 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
308 './tests/worker-files/thread/testWorker.js',
310 workerChoiceStrategyOptions
: { weights
: {} }
315 'Invalid worker choice strategy options: must have a weight for each worker node'
322 './tests/worker-files/thread/testWorker.js',
324 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
329 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
336 './tests/worker-files/thread/testWorker.js',
338 enableTasksQueue
: true,
339 tasksQueueOptions
: 'invalidTasksQueueOptions'
343 new TypeError('Invalid tasks queue options: must be a plain object')
349 './tests/worker-files/thread/testWorker.js',
351 enableTasksQueue
: true,
352 tasksQueueOptions
: { concurrency
: 0 }
357 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
364 './tests/worker-files/thread/testWorker.js',
366 enableTasksQueue
: true,
367 tasksQueueOptions
: { concurrency
: -1 }
372 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
379 './tests/worker-files/thread/testWorker.js',
381 enableTasksQueue
: true,
382 tasksQueueOptions
: { concurrency
: 0.2 }
386 new TypeError('Invalid worker node tasks concurrency: must be an integer')
392 './tests/worker-files/thread/testWorker.js',
394 enableTasksQueue
: true,
395 tasksQueueOptions
: { queueMaxSize
: 2 }
400 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
407 './tests/worker-files/thread/testWorker.js',
409 enableTasksQueue
: true,
410 tasksQueueOptions
: { size
: 0 }
415 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
422 './tests/worker-files/thread/testWorker.js',
424 enableTasksQueue
: true,
425 tasksQueueOptions
: { size
: -1 }
430 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
437 './tests/worker-files/thread/testWorker.js',
439 enableTasksQueue
: true,
440 tasksQueueOptions
: { size
: 0.2 }
444 new TypeError('Invalid worker node tasks queue size: must be an integer')
448 it('Verify that pool worker choice strategy options can be set', async () => {
449 const pool
= new FixedThreadPool(
451 './tests/worker-files/thread/testWorker.js',
452 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
454 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
456 runTime
: { median
: false },
457 waitTime
: { median
: false },
458 elu
: { median
: false }
460 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
462 runTime
: { median
: false },
463 waitTime
: { median
: false },
464 elu
: { median
: false }
466 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
467 .workerChoiceStrategies
) {
468 expect(workerChoiceStrategy
.opts
).toStrictEqual({
470 runTime
: { median
: false },
471 waitTime
: { median
: false },
472 elu
: { median
: false }
476 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
494 pool
.setWorkerChoiceStrategyOptions({
495 runTime
: { median
: true },
496 elu
: { median
: true }
498 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
500 runTime
: { median
: true },
501 waitTime
: { median
: false },
502 elu
: { median
: true }
504 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
506 runTime
: { median
: true },
507 waitTime
: { median
: false },
508 elu
: { median
: true }
510 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
511 .workerChoiceStrategies
) {
512 expect(workerChoiceStrategy
.opts
).toStrictEqual({
514 runTime
: { median
: true },
515 waitTime
: { median
: false },
516 elu
: { median
: true }
520 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
538 pool
.setWorkerChoiceStrategyOptions({
539 runTime
: { median
: false },
540 elu
: { median
: false }
542 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
544 runTime
: { median
: false },
545 waitTime
: { median
: false },
546 elu
: { median
: false }
548 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
550 runTime
: { median
: false },
551 waitTime
: { median
: false },
552 elu
: { median
: false }
554 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
555 .workerChoiceStrategies
) {
556 expect(workerChoiceStrategy
.opts
).toStrictEqual({
558 runTime
: { median
: false },
559 waitTime
: { median
: false },
560 elu
: { median
: false }
564 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
583 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
586 'Invalid worker choice strategy options: must be a plain object'
590 pool
.setWorkerChoiceStrategyOptions({
591 choiceRetries
: 'invalidChoiceRetries'
595 'Invalid worker choice strategy options: choice retries must be an integer'
599 pool
.setWorkerChoiceStrategyOptions({ choiceRetries
: -1 })
602 "Invalid worker choice strategy options: choice retries '-1' must be greater or equal than zero"
606 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
609 'Invalid worker choice strategy options: must have a weight for each worker node'
613 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
616 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
622 it('Verify that pool tasks queue can be enabled/disabled', async () => {
623 const pool
= new FixedThreadPool(
625 './tests/worker-files/thread/testWorker.js'
627 expect(pool
.opts
.enableTasksQueue
).toBe(false)
628 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
629 pool
.enableTasksQueue(true)
630 expect(pool
.opts
.enableTasksQueue
).toBe(true)
631 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
635 pool
.enableTasksQueue(true, { concurrency
: 2 })
636 expect(pool
.opts
.enableTasksQueue
).toBe(true)
637 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
641 pool
.enableTasksQueue(false)
642 expect(pool
.opts
.enableTasksQueue
).toBe(false)
643 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
647 it('Verify that pool tasks queue options can be set', async () => {
648 const pool
= new FixedThreadPool(
650 './tests/worker-files/thread/testWorker.js',
651 { enableTasksQueue
: true }
653 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
657 pool
.setTasksQueueOptions({ concurrency
: 2 })
658 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
663 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
665 new TypeError('Invalid tasks queue options: must be a plain object')
667 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
669 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
672 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
674 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
677 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
678 new TypeError('Invalid worker node tasks concurrency: must be an integer')
680 expect(() => pool
.setTasksQueueOptions({ queueMaxSize
: 2 })).toThrowError(
682 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
685 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
687 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
690 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
692 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
695 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
696 new TypeError('Invalid worker node tasks queue size: must be an integer')
701 it('Verify that pool info is set', async () => {
702 let pool
= new FixedThreadPool(
704 './tests/worker-files/thread/testWorker.js'
706 expect(pool
.info
).toStrictEqual({
708 type
: PoolTypes
.fixed
,
709 worker
: WorkerTypes
.thread
,
711 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
712 minSize
: numberOfWorkers
,
713 maxSize
: numberOfWorkers
,
714 workerNodes
: numberOfWorkers
,
715 idleWorkerNodes
: numberOfWorkers
,
722 pool
= new DynamicClusterPool(
723 Math
.floor(numberOfWorkers
/ 2),
725 './tests/worker-files/cluster/testWorker.js'
727 expect(pool
.info
).toStrictEqual({
729 type
: PoolTypes
.dynamic
,
730 worker
: WorkerTypes
.cluster
,
732 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
733 minSize
: Math
.floor(numberOfWorkers
/ 2),
734 maxSize
: numberOfWorkers
,
735 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
736 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
745 it('Verify that pool worker tasks usage are initialized', async () => {
746 const pool
= new FixedClusterPool(
748 './tests/worker-files/cluster/testWorker.js'
750 for (const workerNode
of pool
.workerNodes
) {
751 expect(workerNode
.usage
).toStrictEqual({
761 history
: expect
.any(CircularArray
)
764 history
: expect
.any(CircularArray
)
768 history
: expect
.any(CircularArray
)
771 history
: expect
.any(CircularArray
)
779 it('Verify that pool worker tasks queue are initialized', async () => {
780 let pool
= new FixedClusterPool(
782 './tests/worker-files/cluster/testWorker.js'
784 for (const workerNode
of pool
.workerNodes
) {
785 expect(workerNode
.tasksQueue
).toBeDefined()
786 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
787 expect(workerNode
.tasksQueue
.size
).toBe(0)
788 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
791 pool
= new DynamicThreadPool(
792 Math
.floor(numberOfWorkers
/ 2),
794 './tests/worker-files/thread/testWorker.js'
796 for (const workerNode
of pool
.workerNodes
) {
797 expect(workerNode
.tasksQueue
).toBeDefined()
798 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
799 expect(workerNode
.tasksQueue
.size
).toBe(0)
800 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
805 it('Verify that pool worker info are initialized', async () => {
806 let pool
= new FixedClusterPool(
808 './tests/worker-files/cluster/testWorker.js'
810 for (const workerNode
of pool
.workerNodes
) {
811 expect(workerNode
.info
).toStrictEqual({
812 id
: expect
.any(Number
),
813 type
: WorkerTypes
.cluster
,
819 pool
= new DynamicThreadPool(
820 Math
.floor(numberOfWorkers
/ 2),
822 './tests/worker-files/thread/testWorker.js'
824 for (const workerNode
of pool
.workerNodes
) {
825 expect(workerNode
.info
).toStrictEqual({
826 id
: expect
.any(Number
),
827 type
: WorkerTypes
.thread
,
835 it('Verify that pool execute() arguments are checked', async () => {
836 const pool
= new FixedClusterPool(
838 './tests/worker-files/cluster/testWorker.js'
840 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
841 new TypeError('name argument must be a string')
843 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
844 new TypeError('name argument must not be an empty string')
846 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
847 new TypeError('transferList argument must be an array')
849 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
850 "Task function 'unknown' not found"
853 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
854 new Error('Cannot execute a task on destroyed pool')
858 it('Verify that pool worker tasks usage are computed', async () => {
859 const pool
= new FixedClusterPool(
861 './tests/worker-files/cluster/testWorker.js'
863 const promises
= new Set()
864 const maxMultiplier
= 2
865 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
866 promises
.add(pool
.execute())
868 for (const workerNode
of pool
.workerNodes
) {
869 expect(workerNode
.usage
).toStrictEqual({
872 executing
: maxMultiplier
,
879 history
: expect
.any(CircularArray
)
882 history
: expect
.any(CircularArray
)
886 history
: expect
.any(CircularArray
)
889 history
: expect
.any(CircularArray
)
894 await Promise
.all(promises
)
895 for (const workerNode
of pool
.workerNodes
) {
896 expect(workerNode
.usage
).toStrictEqual({
898 executed
: maxMultiplier
,
906 history
: expect
.any(CircularArray
)
909 history
: expect
.any(CircularArray
)
913 history
: expect
.any(CircularArray
)
916 history
: expect
.any(CircularArray
)
924 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
925 const pool
= new DynamicThreadPool(
926 Math
.floor(numberOfWorkers
/ 2),
928 './tests/worker-files/thread/testWorker.js'
930 const promises
= new Set()
931 const maxMultiplier
= 2
932 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
933 promises
.add(pool
.execute())
935 await Promise
.all(promises
)
936 for (const workerNode
of pool
.workerNodes
) {
937 expect(workerNode
.usage
).toStrictEqual({
939 executed
: expect
.any(Number
),
947 history
: expect
.any(CircularArray
)
950 history
: expect
.any(CircularArray
)
954 history
: expect
.any(CircularArray
)
957 history
: expect
.any(CircularArray
)
961 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
962 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
963 numberOfWorkers
* maxMultiplier
965 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
966 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
967 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
968 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
970 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
971 for (const workerNode
of pool
.workerNodes
) {
972 expect(workerNode
.usage
).toStrictEqual({
982 history
: expect
.any(CircularArray
)
985 history
: expect
.any(CircularArray
)
989 history
: expect
.any(CircularArray
)
992 history
: expect
.any(CircularArray
)
996 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
997 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
998 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
999 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1001 await pool
.destroy()
1004 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1005 const pool
= new DynamicClusterPool(
1006 Math
.floor(numberOfWorkers
/ 2),
1008 './tests/worker-files/cluster/testWorker.js'
1012 pool
.emitter
.on(PoolEvents
.ready
, (info
) => {
1016 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1017 expect(poolReady
).toBe(1)
1018 expect(poolInfo
).toStrictEqual({
1020 type
: PoolTypes
.dynamic
,
1021 worker
: WorkerTypes
.cluster
,
1023 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1024 minSize
: expect
.any(Number
),
1025 maxSize
: expect
.any(Number
),
1026 workerNodes
: expect
.any(Number
),
1027 idleWorkerNodes
: expect
.any(Number
),
1028 busyWorkerNodes
: expect
.any(Number
),
1029 executedTasks
: expect
.any(Number
),
1030 executingTasks
: expect
.any(Number
),
1031 failedTasks
: expect
.any(Number
)
1033 await pool
.destroy()
1036 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1037 const pool
= new FixedThreadPool(
1039 './tests/worker-files/thread/testWorker.js'
1041 const promises
= new Set()
1044 pool
.emitter
.on(PoolEvents
.busy
, (info
) => {
1048 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1049 promises
.add(pool
.execute())
1051 await Promise
.all(promises
)
1052 // The `busy` event is triggered each time the number of tasks submitted at once reaches the number of fixed pool workers.
1053 // So in total it is emitted numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1054 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1055 expect(poolInfo
).toStrictEqual({
1057 type
: PoolTypes
.fixed
,
1058 worker
: WorkerTypes
.thread
,
1059 ready
: expect
.any(Boolean
),
1060 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1061 minSize
: expect
.any(Number
),
1062 maxSize
: expect
.any(Number
),
1063 workerNodes
: expect
.any(Number
),
1064 idleWorkerNodes
: expect
.any(Number
),
1065 busyWorkerNodes
: expect
.any(Number
),
1066 executedTasks
: expect
.any(Number
),
1067 executingTasks
: expect
.any(Number
),
1068 failedTasks
: expect
.any(Number
)
1070 await pool
.destroy()
1073 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1074 const pool
= new DynamicThreadPool(
1075 Math
.floor(numberOfWorkers
/ 2),
1077 './tests/worker-files/thread/testWorker.js'
1079 const promises
= new Set()
1082 pool
.emitter
.on(PoolEvents
.full
, (info
) => {
1086 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1087 promises
.add(pool
.execute())
1089 await Promise
.all(promises
)
1090 expect(poolFull
).toBe(1)
1091 expect(poolInfo
).toStrictEqual({
1093 type
: PoolTypes
.dynamic
,
1094 worker
: WorkerTypes
.thread
,
1095 ready
: expect
.any(Boolean
),
1096 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1097 minSize
: expect
.any(Number
),
1098 maxSize
: expect
.any(Number
),
1099 workerNodes
: expect
.any(Number
),
1100 idleWorkerNodes
: expect
.any(Number
),
1101 busyWorkerNodes
: expect
.any(Number
),
1102 executedTasks
: expect
.any(Number
),
1103 executingTasks
: expect
.any(Number
),
1104 failedTasks
: expect
.any(Number
)
1106 await pool
.destroy()
1109 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1110 const pool
= new FixedThreadPool(
1112 './tests/worker-files/thread/testWorker.js',
1114 enableTasksQueue
: true
1117 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1118 const promises
= new Set()
1119 let poolBackPressure
= 0
1121 pool
.emitter
.on(PoolEvents
.backPressure
, (info
) => {
1125 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1126 promises
.add(pool
.execute())
1128 await Promise
.all(promises
)
1129 expect(poolBackPressure
).toBe(1)
1130 expect(poolInfo
).toStrictEqual({
1132 type
: PoolTypes
.fixed
,
1133 worker
: WorkerTypes
.thread
,
1134 ready
: expect
.any(Boolean
),
1135 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1136 minSize
: expect
.any(Number
),
1137 maxSize
: expect
.any(Number
),
1138 workerNodes
: expect
.any(Number
),
1139 idleWorkerNodes
: expect
.any(Number
),
1140 busyWorkerNodes
: expect
.any(Number
),
1141 executedTasks
: expect
.any(Number
),
1142 executingTasks
: expect
.any(Number
),
1143 maxQueuedTasks
: expect
.any(Number
),
1144 queuedTasks
: expect
.any(Number
),
1146 stolenTasks
: expect
.any(Number
),
1147 failedTasks
: expect
.any(Number
)
1149 expect(pool
.hasBackPressure
.called
).toBe(true)
1150 await pool
.destroy()
1153 it('Verify that listTaskFunctions() is working', async () => {
1154 const dynamicThreadPool
= new DynamicThreadPool(
1155 Math
.floor(numberOfWorkers
/ 2),
1157 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1159 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1160 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1162 'jsonIntegerSerialization',
1166 const fixedClusterPool
= new FixedClusterPool(
1168 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1170 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1171 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1173 'jsonIntegerSerialization',
1177 await dynamicThreadPool
.destroy()
1178 await fixedClusterPool
.destroy()
1181 it('Verify that multiple task functions worker is working', async () => {
1182 const pool
= new DynamicClusterPool(
1183 Math
.floor(numberOfWorkers
/ 2),
1185 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1187 const data
= { n
: 10 }
1188 const result0
= await pool
.execute(data
)
1189 expect(result0
).toStrictEqual({ ok
: 1 })
1190 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1191 expect(result1
).toStrictEqual({ ok
: 1 })
1192 const result2
= await pool
.execute(data
, 'factorial')
1193 expect(result2
).toBe(3628800)
1194 const result3
= await pool
.execute(data
, 'fibonacci')
1195 expect(result3
).toBe(55)
1196 expect(pool
.info
.executingTasks
).toBe(0)
1197 expect(pool
.info
.executedTasks
).toBe(4)
1198 for (const workerNode
of pool
.workerNodes
) {
1199 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1201 'jsonIntegerSerialization',
1205 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1206 for (const name
of pool
.listTaskFunctions()) {
1207 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1209 executed
: expect
.any(Number
),
1210 executing
: expect
.any(Number
),
1216 history
: expect
.any(CircularArray
)
1219 history
: expect
.any(CircularArray
)
1223 history
: expect
.any(CircularArray
)
1226 history
: expect
.any(CircularArray
)
1231 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executing
1232 ).toBeGreaterThanOrEqual(0)
1235 await pool
.destroy()