1 const { EventEmitter
} = require('node:events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../../lib')
14 const { CircularArray
} = require('../../../lib/circular-array')
15 const { Deque
} = require('../../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
17 const { version
} = require('../../../package.json')
18 const { waitPoolEvents
} = require('../../test-utils')
19 const { WorkerNode
} = require('../../../lib/pools/worker-node')
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers
= 2
23 class StubPoolWithIsMain
extends FixedThreadPool
{
33 it('Simulate pool creation from a non main thread/process', () => {
36 new StubPoolWithIsMain(
38 './tests/worker-files/thread/testWorker.js',
40 errorHandler
: e
=> console
.error(e
)
45 'Cannot start a pool from a worker with the same type as the pool'
50 it('Verify that pool statuses properties are set', async () => {
51 const pool
= new FixedThreadPool(
53 './tests/worker-files/thread/testWorker.js'
55 expect(pool
.starting
).toBe(false)
56 expect(pool
.started
).toBe(true)
60 it('Verify that filePath is checked', () => {
61 const expectedError
= new Error(
62 'Please specify a file with a worker implementation'
64 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
67 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
70 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
73 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
77 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
81 it('Verify that numberOfWorkers is checked', () => {
82 expect(() => new FixedThreadPool()).toThrowError(
84 'Cannot instantiate a pool without specifying the number of workers'
89 it('Verify that a negative number of workers is checked', () => {
92 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 'Cannot instantiate a pool with a negative number of workers'
100 it('Verify that a non integer number of workers is checked', () => {
103 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
106 'Cannot instantiate a pool with a non safe integer number of workers'
111 it('Verify that dynamic pool sizing is checked', () => {
114 new DynamicClusterPool(
117 './tests/worker-files/cluster/testWorker.js'
121 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
126 new DynamicThreadPool(
129 './tests/worker-files/thread/testWorker.js'
133 'Cannot instantiate a pool with a non safe integer number of workers'
138 new DynamicClusterPool(
141 './tests/worker-files/cluster/testWorker.js'
145 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
150 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
153 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
158 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
161 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
166 new DynamicClusterPool(
169 './tests/worker-files/cluster/testWorker.js'
173 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
178 it('Verify that pool options are checked', async () => {
179 let pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js'
183 expect(pool
.emitter
).toBeInstanceOf(EventEmitter
)
184 expect(pool
.opts
).toStrictEqual({
187 restartWorkerOnError
: true,
188 enableTasksQueue
: false,
189 workerChoiceStrategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
190 workerChoiceStrategyOptions
: {
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
197 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
199 runTime
: { median
: false },
200 waitTime
: { median
: false },
201 elu
: { median
: false }
203 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
204 .workerChoiceStrategies
) {
205 expect(workerChoiceStrategy
.opts
).toStrictEqual({
207 runTime
: { median
: false },
208 waitTime
: { median
: false },
209 elu
: { median
: false }
213 const testHandler
= () => console
.info('test handler executed')
214 pool
= new FixedThreadPool(
216 './tests/worker-files/thread/testWorker.js',
218 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
219 workerChoiceStrategyOptions
: {
220 runTime
: { median
: true },
221 weights
: { 0: 300, 1: 200 }
224 restartWorkerOnError
: false,
225 enableTasksQueue
: true,
226 tasksQueueOptions
: { concurrency
: 2 },
227 messageHandler
: testHandler
,
228 errorHandler
: testHandler
,
229 onlineHandler
: testHandler
,
230 exitHandler
: testHandler
233 expect(pool
.emitter
).toBeUndefined()
234 expect(pool
.opts
).toStrictEqual({
237 restartWorkerOnError
: false,
238 enableTasksQueue
: true,
243 tasksStealingOnBackPressure
: true
245 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
246 workerChoiceStrategyOptions
: {
248 runTime
: { median
: true },
249 waitTime
: { median
: false },
250 elu
: { median
: false },
251 weights
: { 0: 300, 1: 200 }
253 onlineHandler
: testHandler
,
254 messageHandler
: testHandler
,
255 errorHandler
: testHandler
,
256 exitHandler
: testHandler
258 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
260 runTime
: { median
: true },
261 waitTime
: { median
: false },
262 elu
: { median
: false },
263 weights
: { 0: 300, 1: 200 }
265 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
266 .workerChoiceStrategies
) {
267 expect(workerChoiceStrategy
.opts
).toStrictEqual({
269 runTime
: { median
: true },
270 waitTime
: { median
: false },
271 elu
: { median
: false },
272 weights
: { 0: 300, 1: 200 }
278 it('Verify that pool options are validated', async () => {
283 './tests/worker-files/thread/testWorker.js',
285 workerChoiceStrategy
: 'invalidStrategy'
289 new Error("Invalid worker choice strategy 'invalidStrategy'")
295 './tests/worker-files/thread/testWorker.js',
297 workerChoiceStrategyOptions
: {
298 retries
: 'invalidChoiceRetries'
304 'Invalid worker choice strategy options: retries must be an integer'
311 './tests/worker-files/thread/testWorker.js',
313 workerChoiceStrategyOptions
: {
320 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
327 './tests/worker-files/thread/testWorker.js',
329 workerChoiceStrategyOptions
: { weights
: {} }
334 'Invalid worker choice strategy options: must have a weight for each worker node'
341 './tests/worker-files/thread/testWorker.js',
343 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
348 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 './tests/worker-files/thread/testWorker.js',
357 enableTasksQueue
: true,
358 tasksQueueOptions
: 'invalidTasksQueueOptions'
362 new TypeError('Invalid tasks queue options: must be a plain object')
368 './tests/worker-files/thread/testWorker.js',
370 enableTasksQueue
: true,
371 tasksQueueOptions
: { concurrency
: 0 }
376 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
383 './tests/worker-files/thread/testWorker.js',
385 enableTasksQueue
: true,
386 tasksQueueOptions
: { concurrency
: -1 }
391 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 './tests/worker-files/thread/testWorker.js',
400 enableTasksQueue
: true,
401 tasksQueueOptions
: { concurrency
: 0.2 }
405 new TypeError('Invalid worker node tasks concurrency: must be an integer')
411 './tests/worker-files/thread/testWorker.js',
413 enableTasksQueue
: true,
414 tasksQueueOptions
: { size
: 0 }
419 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 './tests/worker-files/thread/testWorker.js',
428 enableTasksQueue
: true,
429 tasksQueueOptions
: { size
: -1 }
434 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 './tests/worker-files/thread/testWorker.js',
443 enableTasksQueue
: true,
444 tasksQueueOptions
: { size
: 0.2 }
448 new TypeError('Invalid worker node tasks queue size: must be an integer')
452 it('Verify that pool worker choice strategy options can be set', async () => {
453 const pool
= new FixedThreadPool(
455 './tests/worker-files/thread/testWorker.js',
456 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
458 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
460 runTime
: { median
: false },
461 waitTime
: { median
: false },
462 elu
: { median
: false }
464 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
466 runTime
: { median
: false },
467 waitTime
: { median
: false },
468 elu
: { median
: false }
470 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
471 .workerChoiceStrategies
) {
472 expect(workerChoiceStrategy
.opts
).toStrictEqual({
474 runTime
: { median
: false },
475 waitTime
: { median
: false },
476 elu
: { median
: false }
480 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
498 pool
.setWorkerChoiceStrategyOptions({
499 runTime
: { median
: true },
500 elu
: { median
: true }
502 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
504 runTime
: { median
: true },
505 waitTime
: { median
: false },
506 elu
: { median
: true }
508 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
510 runTime
: { median
: true },
511 waitTime
: { median
: false },
512 elu
: { median
: true }
514 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
515 .workerChoiceStrategies
) {
516 expect(workerChoiceStrategy
.opts
).toStrictEqual({
518 runTime
: { median
: true },
519 waitTime
: { median
: false },
520 elu
: { median
: true }
524 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
542 pool
.setWorkerChoiceStrategyOptions({
543 runTime
: { median
: false },
544 elu
: { median
: false }
546 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
548 runTime
: { median
: false },
549 waitTime
: { median
: false },
550 elu
: { median
: false }
552 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
554 runTime
: { median
: false },
555 waitTime
: { median
: false },
556 elu
: { median
: false }
558 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
559 .workerChoiceStrategies
) {
560 expect(workerChoiceStrategy
.opts
).toStrictEqual({
562 runTime
: { median
: false },
563 waitTime
: { median
: false },
564 elu
: { median
: false }
568 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
587 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
590 'Invalid worker choice strategy options: must be a plain object'
594 pool
.setWorkerChoiceStrategyOptions({
595 retries
: 'invalidChoiceRetries'
599 'Invalid worker choice strategy options: retries must be an integer'
603 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
606 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
610 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
613 'Invalid worker choice strategy options: must have a weight for each worker node'
617 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
620 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
626 it('Verify that pool tasks queue can be enabled/disabled', async () => {
627 const pool
= new FixedThreadPool(
629 './tests/worker-files/thread/testWorker.js'
631 expect(pool
.opts
.enableTasksQueue
).toBe(false)
632 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.onEmptyQueue
).toBeUndefined()
635 expect(workerNode
.onBackPressure
).toBeUndefined()
637 pool
.enableTasksQueue(true)
638 expect(pool
.opts
.enableTasksQueue
).toBe(true)
639 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
643 tasksStealingOnBackPressure
: true
645 for (const workerNode
of pool
.workerNodes
) {
646 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
647 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
649 pool
.enableTasksQueue(true, { concurrency
: 2 })
650 expect(pool
.opts
.enableTasksQueue
).toBe(true)
651 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
655 tasksStealingOnBackPressure
: true
657 for (const workerNode
of pool
.workerNodes
) {
658 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
659 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
661 pool
.enableTasksQueue(false)
662 expect(pool
.opts
.enableTasksQueue
).toBe(false)
663 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
664 for (const workerNode
of pool
.workerNodes
) {
665 expect(workerNode
.onEmptyQueue
).toBeUndefined()
666 expect(workerNode
.onBackPressure
).toBeUndefined()
671 it('Verify that pool tasks queue options can be set', async () => {
672 const pool
= new FixedThreadPool(
674 './tests/worker-files/thread/testWorker.js',
675 { enableTasksQueue
: true }
677 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
681 tasksStealingOnBackPressure
: true
683 for (const workerNode
of pool
.workerNodes
) {
684 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
685 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
687 pool
.setTasksQueueOptions({
690 tasksStealingOnBackPressure
: false
692 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
696 tasksStealingOnBackPressure
: false
698 for (const workerNode
of pool
.workerNodes
) {
699 expect(workerNode
.onEmptyQueue
).toBeUndefined()
700 expect(workerNode
.onBackPressure
).toBeUndefined()
702 pool
.setTasksQueueOptions({
705 tasksStealingOnBackPressure
: true
707 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
711 tasksStealingOnBackPressure
: true
713 for (const workerNode
of pool
.workerNodes
) {
714 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
715 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
718 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
720 new TypeError('Invalid tasks queue options: must be a plain object')
722 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
724 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
727 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
729 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
732 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
733 new TypeError('Invalid worker node tasks concurrency: must be an integer')
735 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
737 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
740 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
742 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
745 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
746 new TypeError('Invalid worker node tasks queue size: must be an integer')
751 it('Verify that pool info is set', async () => {
752 let pool
= new FixedThreadPool(
754 './tests/worker-files/thread/testWorker.js'
756 expect(pool
.info
).toStrictEqual({
758 type
: PoolTypes
.fixed
,
759 worker
: WorkerTypes
.thread
,
762 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
763 minSize
: numberOfWorkers
,
764 maxSize
: numberOfWorkers
,
765 workerNodes
: numberOfWorkers
,
766 idleWorkerNodes
: numberOfWorkers
,
773 pool
= new DynamicClusterPool(
774 Math
.floor(numberOfWorkers
/ 2),
776 './tests/worker-files/cluster/testWorker.js'
778 expect(pool
.info
).toStrictEqual({
780 type
: PoolTypes
.dynamic
,
781 worker
: WorkerTypes
.cluster
,
784 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
785 minSize
: Math
.floor(numberOfWorkers
/ 2),
786 maxSize
: numberOfWorkers
,
787 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
788 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
797 it('Verify that pool worker tasks usage are initialized', async () => {
798 const pool
= new FixedClusterPool(
800 './tests/worker-files/cluster/testWorker.js'
802 for (const workerNode
of pool
.workerNodes
) {
803 expect(workerNode
).toBeInstanceOf(WorkerNode
)
804 expect(workerNode
.usage
).toStrictEqual({
814 history
: new CircularArray()
817 history
: new CircularArray()
821 history
: new CircularArray()
824 history
: new CircularArray()
832 it('Verify that pool worker tasks queue are initialized', async () => {
833 let pool
= new FixedClusterPool(
835 './tests/worker-files/cluster/testWorker.js'
837 for (const workerNode
of pool
.workerNodes
) {
838 expect(workerNode
).toBeInstanceOf(WorkerNode
)
839 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
840 expect(workerNode
.tasksQueue
.size
).toBe(0)
841 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
844 pool
= new DynamicThreadPool(
845 Math
.floor(numberOfWorkers
/ 2),
847 './tests/worker-files/thread/testWorker.js'
849 for (const workerNode
of pool
.workerNodes
) {
850 expect(workerNode
).toBeInstanceOf(WorkerNode
)
851 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
852 expect(workerNode
.tasksQueue
.size
).toBe(0)
853 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
858 it('Verify that pool worker info are initialized', async () => {
859 let pool
= new FixedClusterPool(
861 './tests/worker-files/cluster/testWorker.js'
863 for (const workerNode
of pool
.workerNodes
) {
864 expect(workerNode
).toBeInstanceOf(WorkerNode
)
865 expect(workerNode
.info
).toStrictEqual({
866 id
: expect
.any(Number
),
867 type
: WorkerTypes
.cluster
,
873 pool
= new DynamicThreadPool(
874 Math
.floor(numberOfWorkers
/ 2),
876 './tests/worker-files/thread/testWorker.js'
878 for (const workerNode
of pool
.workerNodes
) {
879 expect(workerNode
).toBeInstanceOf(WorkerNode
)
880 expect(workerNode
.info
).toStrictEqual({
881 id
: expect
.any(Number
),
882 type
: WorkerTypes
.thread
,
890 it('Verify that pool can be started after initialization', async () => {
891 const pool
= new FixedClusterPool(
893 './tests/worker-files/cluster/testWorker.js',
898 expect(pool
.info
.started
).toBe(false)
899 expect(pool
.info
.ready
).toBe(false)
900 expect(pool
.workerNodes
).toStrictEqual([])
901 await
expect(pool
.execute()).rejects
.toThrowError(
902 new Error('Cannot execute a task on not started pool')
905 expect(pool
.info
.started
).toBe(true)
906 expect(pool
.info
.ready
).toBe(true)
907 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
908 for (const workerNode
of pool
.workerNodes
) {
909 expect(workerNode
).toBeInstanceOf(WorkerNode
)
914 it('Verify that pool execute() arguments are checked', async () => {
915 const pool
= new FixedClusterPool(
917 './tests/worker-files/cluster/testWorker.js'
919 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
920 new TypeError('name argument must be a string')
922 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
923 new TypeError('name argument must not be an empty string')
925 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
926 new TypeError('transferList argument must be an array')
928 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
929 "Task function 'unknown' not found"
932 await
expect(pool
.execute()).rejects
.toThrowError(
933 new Error('Cannot execute a task on not started pool')
937 it('Verify that pool worker tasks usage are computed', async () => {
938 const pool
= new FixedClusterPool(
940 './tests/worker-files/cluster/testWorker.js'
942 const promises
= new Set()
943 const maxMultiplier
= 2
944 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
945 promises
.add(pool
.execute())
947 for (const workerNode
of pool
.workerNodes
) {
948 expect(workerNode
.usage
).toStrictEqual({
951 executing
: maxMultiplier
,
958 history
: expect
.any(CircularArray
)
961 history
: expect
.any(CircularArray
)
965 history
: expect
.any(CircularArray
)
968 history
: expect
.any(CircularArray
)
973 await Promise
.all(promises
)
974 for (const workerNode
of pool
.workerNodes
) {
975 expect(workerNode
.usage
).toStrictEqual({
977 executed
: maxMultiplier
,
985 history
: expect
.any(CircularArray
)
988 history
: expect
.any(CircularArray
)
992 history
: expect
.any(CircularArray
)
995 history
: expect
.any(CircularArray
)
1000 await pool
.destroy()
1003 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1004 const pool
= new DynamicThreadPool(
1005 Math
.floor(numberOfWorkers
/ 2),
1007 './tests/worker-files/thread/testWorker.js'
1009 const promises
= new Set()
1010 const maxMultiplier
= 2
1011 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
1012 promises
.add(pool
.execute())
1014 await Promise
.all(promises
)
1015 for (const workerNode
of pool
.workerNodes
) {
1016 expect(workerNode
.usage
).toStrictEqual({
1018 executed
: expect
.any(Number
),
1026 history
: expect
.any(CircularArray
)
1029 history
: expect
.any(CircularArray
)
1033 history
: expect
.any(CircularArray
)
1036 history
: expect
.any(CircularArray
)
1040 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
1041 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
1042 numberOfWorkers
* maxMultiplier
1044 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1045 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1046 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1047 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1049 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
1050 for (const workerNode
of pool
.workerNodes
) {
1051 expect(workerNode
.usage
).toStrictEqual({
1061 history
: expect
.any(CircularArray
)
1064 history
: expect
.any(CircularArray
)
1068 history
: expect
.any(CircularArray
)
1071 history
: expect
.any(CircularArray
)
1075 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1076 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1077 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1078 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1080 await pool
.destroy()
1083 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1084 const pool
= new DynamicClusterPool(
1085 Math
.floor(numberOfWorkers
/ 2),
1087 './tests/worker-files/cluster/testWorker.js'
1091 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
1095 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1096 expect(poolReady
).toBe(1)
1097 expect(poolInfo
).toStrictEqual({
1099 type
: PoolTypes
.dynamic
,
1100 worker
: WorkerTypes
.cluster
,
1103 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1104 minSize
: expect
.any(Number
),
1105 maxSize
: expect
.any(Number
),
1106 workerNodes
: expect
.any(Number
),
1107 idleWorkerNodes
: expect
.any(Number
),
1108 busyWorkerNodes
: expect
.any(Number
),
1109 executedTasks
: expect
.any(Number
),
1110 executingTasks
: expect
.any(Number
),
1111 failedTasks
: expect
.any(Number
)
1113 await pool
.destroy()
1116 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1117 const pool
= new FixedThreadPool(
1119 './tests/worker-files/thread/testWorker.js'
1121 const promises
= new Set()
1124 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
1128 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1129 promises
.add(pool
.execute())
1131 await Promise
.all(promises
)
1132 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1133 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1134 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1135 expect(poolInfo
).toStrictEqual({
1137 type
: PoolTypes
.fixed
,
1138 worker
: WorkerTypes
.thread
,
1141 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1142 minSize
: expect
.any(Number
),
1143 maxSize
: expect
.any(Number
),
1144 workerNodes
: expect
.any(Number
),
1145 idleWorkerNodes
: expect
.any(Number
),
1146 busyWorkerNodes
: expect
.any(Number
),
1147 executedTasks
: expect
.any(Number
),
1148 executingTasks
: expect
.any(Number
),
1149 failedTasks
: expect
.any(Number
)
1151 await pool
.destroy()
1154 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1155 const pool
= new DynamicThreadPool(
1156 Math
.floor(numberOfWorkers
/ 2),
1158 './tests/worker-files/thread/testWorker.js'
1160 const promises
= new Set()
1163 pool
.emitter
.on(PoolEvents
.full
, info
=> {
1167 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1168 promises
.add(pool
.execute())
1170 await Promise
.all(promises
)
1171 expect(poolFull
).toBe(1)
1172 expect(poolInfo
).toStrictEqual({
1174 type
: PoolTypes
.dynamic
,
1175 worker
: WorkerTypes
.thread
,
1178 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1179 minSize
: expect
.any(Number
),
1180 maxSize
: expect
.any(Number
),
1181 workerNodes
: expect
.any(Number
),
1182 idleWorkerNodes
: expect
.any(Number
),
1183 busyWorkerNodes
: expect
.any(Number
),
1184 executedTasks
: expect
.any(Number
),
1185 executingTasks
: expect
.any(Number
),
1186 failedTasks
: expect
.any(Number
)
1188 await pool
.destroy()
1191 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1192 const pool
= new FixedThreadPool(
1194 './tests/worker-files/thread/testWorker.js',
1196 enableTasksQueue
: true
1199 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1200 const promises
= new Set()
1201 let poolBackPressure
= 0
1203 pool
.emitter
.on(PoolEvents
.backPressure
, info
=> {
1207 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1208 promises
.add(pool
.execute())
1210 await Promise
.all(promises
)
1211 expect(poolBackPressure
).toBe(1)
1212 expect(poolInfo
).toStrictEqual({
1214 type
: PoolTypes
.fixed
,
1215 worker
: WorkerTypes
.thread
,
1218 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1219 minSize
: expect
.any(Number
),
1220 maxSize
: expect
.any(Number
),
1221 workerNodes
: expect
.any(Number
),
1222 idleWorkerNodes
: expect
.any(Number
),
1223 busyWorkerNodes
: expect
.any(Number
),
1224 executedTasks
: expect
.any(Number
),
1225 executingTasks
: expect
.any(Number
),
1226 maxQueuedTasks
: expect
.any(Number
),
1227 queuedTasks
: expect
.any(Number
),
1229 stolenTasks
: expect
.any(Number
),
1230 failedTasks
: expect
.any(Number
)
1232 expect(pool
.hasBackPressure
.called
).toBe(true)
1233 await pool
.destroy()
1236 it('Verify that listTaskFunctions() is working', async () => {
1237 const dynamicThreadPool
= new DynamicThreadPool(
1238 Math
.floor(numberOfWorkers
/ 2),
1240 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1242 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1243 expect(dynamicThreadPool
.listTaskFunctions()).toStrictEqual([
1245 'jsonIntegerSerialization',
1249 const fixedClusterPool
= new FixedClusterPool(
1251 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1253 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1254 expect(fixedClusterPool
.listTaskFunctions()).toStrictEqual([
1256 'jsonIntegerSerialization',
1260 await dynamicThreadPool
.destroy()
1261 await fixedClusterPool
.destroy()
1264 it('Verify that multiple task functions worker is working', async () => {
1265 const pool
= new DynamicClusterPool(
1266 Math
.floor(numberOfWorkers
/ 2),
1268 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1270 const data
= { n
: 10 }
1271 const result0
= await pool
.execute(data
)
1272 expect(result0
).toStrictEqual({ ok
: 1 })
1273 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1274 expect(result1
).toStrictEqual({ ok
: 1 })
1275 const result2
= await pool
.execute(data
, 'factorial')
1276 expect(result2
).toBe(3628800)
1277 const result3
= await pool
.execute(data
, 'fibonacci')
1278 expect(result3
).toBe(55)
1279 expect(pool
.info
.executingTasks
).toBe(0)
1280 expect(pool
.info
.executedTasks
).toBe(4)
1281 for (const workerNode
of pool
.workerNodes
) {
1282 expect(workerNode
.info
.taskFunctions
).toStrictEqual([
1284 'jsonIntegerSerialization',
1288 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1289 for (const name
of pool
.listTaskFunctions()) {
1290 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1292 executed
: expect
.any(Number
),
1299 history
: expect
.any(CircularArray
)
1302 history
: expect
.any(CircularArray
)
1306 history
: expect
.any(CircularArray
)
1309 history
: expect
.any(CircularArray
)
1314 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1315 ).toBeGreaterThan(0)
1318 workerNode
.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME
)
1320 workerNode
.getTaskFunctionWorkerUsage(workerNode
.info
.taskFunctions
[1])
1323 await pool
.destroy()
1326 it('Verify sendKillMessageToWorker()', async () => {
1327 const pool
= new DynamicClusterPool(
1328 Math
.floor(numberOfWorkers
/ 2),
1330 './tests/worker-files/cluster/testWorker.js'
1332 const workerNodeKey
= 0
1334 pool
.sendKillMessageToWorker(
1336 pool
.workerNodes
[workerNodeKey
].info
.id
1338 ).resolves
.toBeUndefined()
1339 await pool
.destroy()