1 const { EventEmitter
} = require('node:events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../../lib')
14 const { CircularArray
} = require('../../../lib/circular-array')
15 const { Deque
} = require('../../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
17 const { version
} = require('../../../package.json')
18 const { waitPoolEvents
} = require('../../test-utils')
19 const { WorkerNode
} = require('../../../lib/pools/worker-node')
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers
= 2
23 class StubPoolWithIsMain
extends FixedThreadPool
{
33 it('Simulate pool creation from a non main thread/process', () => {
36 new StubPoolWithIsMain(
38 './tests/worker-files/thread/testWorker.js',
40 errorHandler
: e
=> console
.error(e
)
45 'Cannot start a pool from a worker with the same type as the pool'
50 it('Verify that pool statuses properties are set', async () => {
51 const pool
= new FixedThreadPool(
53 './tests/worker-files/thread/testWorker.js'
55 expect(pool
.starting
).toBe(false)
56 expect(pool
.started
).toBe(true)
60 it('Verify that filePath is checked', () => {
61 const expectedError
= new Error(
62 'Please specify a file with a worker implementation'
64 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
67 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
70 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
73 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
77 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
81 it('Verify that numberOfWorkers is checked', () => {
82 expect(() => new FixedThreadPool()).toThrowError(
84 'Cannot instantiate a pool without specifying the number of workers'
89 it('Verify that a negative number of workers is checked', () => {
92 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
95 'Cannot instantiate a pool with a negative number of workers'
100 it('Verify that a non integer number of workers is checked', () => {
103 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
106 'Cannot instantiate a pool with a non safe integer number of workers'
111 it('Verify that dynamic pool sizing is checked', () => {
114 new DynamicClusterPool(
117 './tests/worker-files/cluster/testWorker.js'
121 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
126 new DynamicThreadPool(
129 './tests/worker-files/thread/testWorker.js'
133 'Cannot instantiate a pool with a non safe integer number of workers'
138 new DynamicClusterPool(
141 './tests/worker-files/cluster/testWorker.js'
145 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
150 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
153 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
158 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
161 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
166 new DynamicClusterPool(
169 './tests/worker-files/cluster/testWorker.js'
173 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
178 it('Verify that pool options are checked', async () => {
179 let pool
= new FixedThreadPool(
181 './tests/worker-files/thread/testWorker.js'
183 expect(pool
.emitter
).toBeInstanceOf(EventEmitter
)
184 expect(pool
.opts
).toStrictEqual({
187 restartWorkerOnError
: true,
188 enableTasksQueue
: false,
189 workerChoiceStrategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
190 workerChoiceStrategyOptions
: {
192 runTime
: { median
: false },
193 waitTime
: { median
: false },
194 elu
: { median
: false }
197 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
199 runTime
: { median
: false },
200 waitTime
: { median
: false },
201 elu
: { median
: false }
203 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
204 .workerChoiceStrategies
) {
205 expect(workerChoiceStrategy
.opts
).toStrictEqual({
207 runTime
: { median
: false },
208 waitTime
: { median
: false },
209 elu
: { median
: false }
213 const testHandler
= () => console
.info('test handler executed')
214 pool
= new FixedThreadPool(
216 './tests/worker-files/thread/testWorker.js',
218 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
219 workerChoiceStrategyOptions
: {
220 runTime
: { median
: true },
221 weights
: { 0: 300, 1: 200 }
224 restartWorkerOnError
: false,
225 enableTasksQueue
: true,
226 tasksQueueOptions
: { concurrency
: 2 },
227 messageHandler
: testHandler
,
228 errorHandler
: testHandler
,
229 onlineHandler
: testHandler
,
230 exitHandler
: testHandler
233 expect(pool
.emitter
).toBeUndefined()
234 expect(pool
.opts
).toStrictEqual({
237 restartWorkerOnError
: false,
238 enableTasksQueue
: true,
241 size
: Math
.pow(numberOfWorkers
, 2),
243 tasksStealingOnBackPressure
: true
245 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
246 workerChoiceStrategyOptions
: {
248 runTime
: { median
: true },
249 waitTime
: { median
: false },
250 elu
: { median
: false },
251 weights
: { 0: 300, 1: 200 }
253 onlineHandler
: testHandler
,
254 messageHandler
: testHandler
,
255 errorHandler
: testHandler
,
256 exitHandler
: testHandler
258 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
260 runTime
: { median
: true },
261 waitTime
: { median
: false },
262 elu
: { median
: false },
263 weights
: { 0: 300, 1: 200 }
265 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
266 .workerChoiceStrategies
) {
267 expect(workerChoiceStrategy
.opts
).toStrictEqual({
269 runTime
: { median
: true },
270 waitTime
: { median
: false },
271 elu
: { median
: false },
272 weights
: { 0: 300, 1: 200 }
278 it('Verify that pool options are validated', async () => {
283 './tests/worker-files/thread/testWorker.js',
285 workerChoiceStrategy
: 'invalidStrategy'
289 new Error("Invalid worker choice strategy 'invalidStrategy'")
295 './tests/worker-files/thread/testWorker.js',
297 workerChoiceStrategyOptions
: {
298 retries
: 'invalidChoiceRetries'
304 'Invalid worker choice strategy options: retries must be an integer'
311 './tests/worker-files/thread/testWorker.js',
313 workerChoiceStrategyOptions
: {
320 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
327 './tests/worker-files/thread/testWorker.js',
329 workerChoiceStrategyOptions
: { weights
: {} }
334 'Invalid worker choice strategy options: must have a weight for each worker node'
341 './tests/worker-files/thread/testWorker.js',
343 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
348 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
355 './tests/worker-files/thread/testWorker.js',
357 enableTasksQueue
: true,
358 tasksQueueOptions
: 'invalidTasksQueueOptions'
362 new TypeError('Invalid tasks queue options: must be a plain object')
368 './tests/worker-files/thread/testWorker.js',
370 enableTasksQueue
: true,
371 tasksQueueOptions
: { concurrency
: 0 }
376 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
383 './tests/worker-files/thread/testWorker.js',
385 enableTasksQueue
: true,
386 tasksQueueOptions
: { concurrency
: -1 }
391 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
398 './tests/worker-files/thread/testWorker.js',
400 enableTasksQueue
: true,
401 tasksQueueOptions
: { concurrency
: 0.2 }
405 new TypeError('Invalid worker node tasks concurrency: must be an integer')
411 './tests/worker-files/thread/testWorker.js',
413 enableTasksQueue
: true,
414 tasksQueueOptions
: { size
: 0 }
419 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
426 './tests/worker-files/thread/testWorker.js',
428 enableTasksQueue
: true,
429 tasksQueueOptions
: { size
: -1 }
434 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
441 './tests/worker-files/thread/testWorker.js',
443 enableTasksQueue
: true,
444 tasksQueueOptions
: { size
: 0.2 }
448 new TypeError('Invalid worker node tasks queue size: must be an integer')
452 it('Verify that pool worker choice strategy options can be set', async () => {
453 const pool
= new FixedThreadPool(
455 './tests/worker-files/thread/testWorker.js',
456 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
458 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
460 runTime
: { median
: false },
461 waitTime
: { median
: false },
462 elu
: { median
: false }
464 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
466 runTime
: { median
: false },
467 waitTime
: { median
: false },
468 elu
: { median
: false }
470 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
471 .workerChoiceStrategies
) {
472 expect(workerChoiceStrategy
.opts
).toStrictEqual({
474 runTime
: { median
: false },
475 waitTime
: { median
: false },
476 elu
: { median
: false }
480 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
498 pool
.setWorkerChoiceStrategyOptions({
499 runTime
: { median
: true },
500 elu
: { median
: true }
502 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
504 runTime
: { median
: true },
505 waitTime
: { median
: false },
506 elu
: { median
: true }
508 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
510 runTime
: { median
: true },
511 waitTime
: { median
: false },
512 elu
: { median
: true }
514 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
515 .workerChoiceStrategies
) {
516 expect(workerChoiceStrategy
.opts
).toStrictEqual({
518 runTime
: { median
: true },
519 waitTime
: { median
: false },
520 elu
: { median
: true }
524 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
542 pool
.setWorkerChoiceStrategyOptions({
543 runTime
: { median
: false },
544 elu
: { median
: false }
546 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
548 runTime
: { median
: false },
549 waitTime
: { median
: false },
550 elu
: { median
: false }
552 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
554 runTime
: { median
: false },
555 waitTime
: { median
: false },
556 elu
: { median
: false }
558 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
559 .workerChoiceStrategies
) {
560 expect(workerChoiceStrategy
.opts
).toStrictEqual({
562 runTime
: { median
: false },
563 waitTime
: { median
: false },
564 elu
: { median
: false }
568 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
587 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
590 'Invalid worker choice strategy options: must be a plain object'
594 pool
.setWorkerChoiceStrategyOptions({
595 retries
: 'invalidChoiceRetries'
599 'Invalid worker choice strategy options: retries must be an integer'
603 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
606 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
610 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
613 'Invalid worker choice strategy options: must have a weight for each worker node'
617 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
620 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
626 it('Verify that pool tasks queue can be enabled/disabled', async () => {
627 const pool
= new FixedThreadPool(
629 './tests/worker-files/thread/testWorker.js'
631 expect(pool
.opts
.enableTasksQueue
).toBe(false)
632 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
633 for (const workerNode
of pool
.workerNodes
) {
634 expect(workerNode
.onEmptyQueue
).toBeUndefined()
635 expect(workerNode
.onBackPressure
).toBeUndefined()
637 pool
.enableTasksQueue(true)
638 expect(pool
.opts
.enableTasksQueue
).toBe(true)
639 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
641 size
: Math
.pow(numberOfWorkers
, 2),
643 tasksStealingOnBackPressure
: true
645 for (const workerNode
of pool
.workerNodes
) {
646 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
647 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
649 pool
.enableTasksQueue(true, { concurrency
: 2 })
650 expect(pool
.opts
.enableTasksQueue
).toBe(true)
651 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
653 size
: Math
.pow(numberOfWorkers
, 2),
655 tasksStealingOnBackPressure
: true
657 for (const workerNode
of pool
.workerNodes
) {
658 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
659 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
661 pool
.enableTasksQueue(false)
662 expect(pool
.opts
.enableTasksQueue
).toBe(false)
663 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
664 for (const workerNode
of pool
.workerNodes
) {
665 expect(workerNode
.onEmptyQueue
).toBeUndefined()
666 expect(workerNode
.onBackPressure
).toBeUndefined()
671 it('Verify that pool tasks queue options can be set', async () => {
672 const pool
= new FixedThreadPool(
674 './tests/worker-files/thread/testWorker.js',
675 { enableTasksQueue
: true }
677 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
679 size
: Math
.pow(numberOfWorkers
, 2),
681 tasksStealingOnBackPressure
: true
683 for (const workerNode
of pool
.workerNodes
) {
684 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
685 pool
.opts
.tasksQueueOptions
.size
687 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
688 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
690 pool
.setTasksQueueOptions({
694 tasksStealingOnBackPressure
: false
696 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
700 tasksStealingOnBackPressure
: false
702 for (const workerNode
of pool
.workerNodes
) {
703 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
704 pool
.opts
.tasksQueueOptions
.size
706 expect(workerNode
.onEmptyQueue
).toBeUndefined()
707 expect(workerNode
.onBackPressure
).toBeUndefined()
709 pool
.setTasksQueueOptions({
712 tasksStealingOnBackPressure
: true
714 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
716 size
: Math
.pow(numberOfWorkers
, 2),
718 tasksStealingOnBackPressure
: true
720 for (const workerNode
of pool
.workerNodes
) {
721 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
722 pool
.opts
.tasksQueueOptions
.size
724 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
725 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
728 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
730 new TypeError('Invalid tasks queue options: must be a plain object')
732 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
734 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
737 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
739 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
742 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
743 new TypeError('Invalid worker node tasks concurrency: must be an integer')
745 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
747 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
750 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
752 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
755 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
756 new TypeError('Invalid worker node tasks queue size: must be an integer')
761 it('Verify that pool info is set', async () => {
762 let pool
= new FixedThreadPool(
764 './tests/worker-files/thread/testWorker.js'
766 expect(pool
.info
).toStrictEqual({
768 type
: PoolTypes
.fixed
,
769 worker
: WorkerTypes
.thread
,
772 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
773 minSize
: numberOfWorkers
,
774 maxSize
: numberOfWorkers
,
775 workerNodes
: numberOfWorkers
,
776 idleWorkerNodes
: numberOfWorkers
,
783 pool
= new DynamicClusterPool(
784 Math
.floor(numberOfWorkers
/ 2),
786 './tests/worker-files/cluster/testWorker.js'
788 expect(pool
.info
).toStrictEqual({
790 type
: PoolTypes
.dynamic
,
791 worker
: WorkerTypes
.cluster
,
794 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
795 minSize
: Math
.floor(numberOfWorkers
/ 2),
796 maxSize
: numberOfWorkers
,
797 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
798 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
807 it('Verify that pool worker tasks usage are initialized', async () => {
808 const pool
= new FixedClusterPool(
810 './tests/worker-files/cluster/testWorker.js'
812 for (const workerNode
of pool
.workerNodes
) {
813 expect(workerNode
).toBeInstanceOf(WorkerNode
)
814 expect(workerNode
.usage
).toStrictEqual({
824 history
: new CircularArray()
827 history
: new CircularArray()
831 history
: new CircularArray()
834 history
: new CircularArray()
842 it('Verify that pool worker tasks queue are initialized', async () => {
843 let pool
= new FixedClusterPool(
845 './tests/worker-files/cluster/testWorker.js'
847 for (const workerNode
of pool
.workerNodes
) {
848 expect(workerNode
).toBeInstanceOf(WorkerNode
)
849 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
850 expect(workerNode
.tasksQueue
.size
).toBe(0)
851 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
854 pool
= new DynamicThreadPool(
855 Math
.floor(numberOfWorkers
/ 2),
857 './tests/worker-files/thread/testWorker.js'
859 for (const workerNode
of pool
.workerNodes
) {
860 expect(workerNode
).toBeInstanceOf(WorkerNode
)
861 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
862 expect(workerNode
.tasksQueue
.size
).toBe(0)
863 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
868 it('Verify that pool worker info are initialized', async () => {
869 let pool
= new FixedClusterPool(
871 './tests/worker-files/cluster/testWorker.js'
873 for (const workerNode
of pool
.workerNodes
) {
874 expect(workerNode
).toBeInstanceOf(WorkerNode
)
875 expect(workerNode
.info
).toStrictEqual({
876 id
: expect
.any(Number
),
877 type
: WorkerTypes
.cluster
,
883 pool
= new DynamicThreadPool(
884 Math
.floor(numberOfWorkers
/ 2),
886 './tests/worker-files/thread/testWorker.js'
888 for (const workerNode
of pool
.workerNodes
) {
889 expect(workerNode
).toBeInstanceOf(WorkerNode
)
890 expect(workerNode
.info
).toStrictEqual({
891 id
: expect
.any(Number
),
892 type
: WorkerTypes
.thread
,
900 it('Verify that pool can be started after initialization', async () => {
901 const pool
= new FixedClusterPool(
903 './tests/worker-files/cluster/testWorker.js',
908 expect(pool
.info
.started
).toBe(false)
909 expect(pool
.info
.ready
).toBe(false)
910 expect(pool
.workerNodes
).toStrictEqual([])
911 await
expect(pool
.execute()).rejects
.toThrowError(
912 new Error('Cannot execute a task on not started pool')
915 expect(pool
.info
.started
).toBe(true)
916 expect(pool
.info
.ready
).toBe(true)
917 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
918 for (const workerNode
of pool
.workerNodes
) {
919 expect(workerNode
).toBeInstanceOf(WorkerNode
)
924 it('Verify that pool execute() arguments are checked', async () => {
925 const pool
= new FixedClusterPool(
927 './tests/worker-files/cluster/testWorker.js'
929 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
930 new TypeError('name argument must be a string')
932 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
933 new TypeError('name argument must not be an empty string')
935 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
936 new TypeError('transferList argument must be an array')
938 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
939 "Task function 'unknown' not found"
942 await
expect(pool
.execute()).rejects
.toThrowError(
943 new Error('Cannot execute a task on not started pool')
947 it('Verify that pool worker tasks usage are computed', async () => {
948 const pool
= new FixedClusterPool(
950 './tests/worker-files/cluster/testWorker.js'
952 const promises
= new Set()
953 const maxMultiplier
= 2
954 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
955 promises
.add(pool
.execute())
957 for (const workerNode
of pool
.workerNodes
) {
958 expect(workerNode
.usage
).toStrictEqual({
961 executing
: maxMultiplier
,
968 history
: expect
.any(CircularArray
)
971 history
: expect
.any(CircularArray
)
975 history
: expect
.any(CircularArray
)
978 history
: expect
.any(CircularArray
)
983 await Promise
.all(promises
)
984 for (const workerNode
of pool
.workerNodes
) {
985 expect(workerNode
.usage
).toStrictEqual({
987 executed
: maxMultiplier
,
995 history
: expect
.any(CircularArray
)
998 history
: expect
.any(CircularArray
)
1002 history
: expect
.any(CircularArray
)
1005 history
: expect
.any(CircularArray
)
1010 await pool
.destroy()
1013 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1014 const pool
= new DynamicThreadPool(
1015 Math
.floor(numberOfWorkers
/ 2),
1017 './tests/worker-files/thread/testWorker.js'
1019 const promises
= new Set()
1020 const maxMultiplier
= 2
1021 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
1022 promises
.add(pool
.execute())
1024 await Promise
.all(promises
)
1025 for (const workerNode
of pool
.workerNodes
) {
1026 expect(workerNode
.usage
).toStrictEqual({
1028 executed
: expect
.any(Number
),
1036 history
: expect
.any(CircularArray
)
1039 history
: expect
.any(CircularArray
)
1043 history
: expect
.any(CircularArray
)
1046 history
: expect
.any(CircularArray
)
1050 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
1051 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
1052 numberOfWorkers
* maxMultiplier
1054 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1055 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1056 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1057 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1059 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
1060 for (const workerNode
of pool
.workerNodes
) {
1061 expect(workerNode
.usage
).toStrictEqual({
1071 history
: expect
.any(CircularArray
)
1074 history
: expect
.any(CircularArray
)
1078 history
: expect
.any(CircularArray
)
1081 history
: expect
.any(CircularArray
)
1085 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1086 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1087 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1088 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1090 await pool
.destroy()
1093 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1094 const pool
= new DynamicClusterPool(
1095 Math
.floor(numberOfWorkers
/ 2),
1097 './tests/worker-files/cluster/testWorker.js'
1101 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
1105 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1106 expect(poolReady
).toBe(1)
1107 expect(poolInfo
).toStrictEqual({
1109 type
: PoolTypes
.dynamic
,
1110 worker
: WorkerTypes
.cluster
,
1113 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1114 minSize
: expect
.any(Number
),
1115 maxSize
: expect
.any(Number
),
1116 workerNodes
: expect
.any(Number
),
1117 idleWorkerNodes
: expect
.any(Number
),
1118 busyWorkerNodes
: expect
.any(Number
),
1119 executedTasks
: expect
.any(Number
),
1120 executingTasks
: expect
.any(Number
),
1121 failedTasks
: expect
.any(Number
)
1123 await pool
.destroy()
1126 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1127 const pool
= new FixedThreadPool(
1129 './tests/worker-files/thread/testWorker.js'
1131 const promises
= new Set()
1134 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
1138 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1139 promises
.add(pool
.execute())
1141 await Promise
.all(promises
)
1142 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1143 // So it is emitted numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1144 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1145 expect(poolInfo
).toStrictEqual({
1147 type
: PoolTypes
.fixed
,
1148 worker
: WorkerTypes
.thread
,
1151 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1152 minSize
: expect
.any(Number
),
1153 maxSize
: expect
.any(Number
),
1154 workerNodes
: expect
.any(Number
),
1155 idleWorkerNodes
: expect
.any(Number
),
1156 busyWorkerNodes
: expect
.any(Number
),
1157 executedTasks
: expect
.any(Number
),
1158 executingTasks
: expect
.any(Number
),
1159 failedTasks
: expect
.any(Number
)
1161 await pool
.destroy()
1164 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1165 const pool
= new DynamicThreadPool(
1166 Math
.floor(numberOfWorkers
/ 2),
1168 './tests/worker-files/thread/testWorker.js'
1170 const promises
= new Set()
1173 pool
.emitter
.on(PoolEvents
.full
, info
=> {
1177 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1178 promises
.add(pool
.execute())
1180 await Promise
.all(promises
)
1181 expect(poolFull
).toBe(1)
1182 expect(poolInfo
).toStrictEqual({
1184 type
: PoolTypes
.dynamic
,
1185 worker
: WorkerTypes
.thread
,
1188 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1189 minSize
: expect
.any(Number
),
1190 maxSize
: expect
.any(Number
),
1191 workerNodes
: expect
.any(Number
),
1192 idleWorkerNodes
: expect
.any(Number
),
1193 busyWorkerNodes
: expect
.any(Number
),
1194 executedTasks
: expect
.any(Number
),
1195 executingTasks
: expect
.any(Number
),
1196 failedTasks
: expect
.any(Number
)
1198 await pool
.destroy()
1201 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1202 const pool
= new FixedThreadPool(
1204 './tests/worker-files/thread/testWorker.js',
1206 enableTasksQueue
: true
1209 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1210 const promises
= new Set()
1211 let poolBackPressure
= 0
1213 pool
.emitter
.on(PoolEvents
.backPressure
, info
=> {
1217 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1218 promises
.add(pool
.execute())
1220 await Promise
.all(promises
)
1221 expect(poolBackPressure
).toBe(1)
1222 expect(poolInfo
).toStrictEqual({
1224 type
: PoolTypes
.fixed
,
1225 worker
: WorkerTypes
.thread
,
1228 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1229 minSize
: expect
.any(Number
),
1230 maxSize
: expect
.any(Number
),
1231 workerNodes
: expect
.any(Number
),
1232 idleWorkerNodes
: expect
.any(Number
),
1233 busyWorkerNodes
: expect
.any(Number
),
1234 executedTasks
: expect
.any(Number
),
1235 executingTasks
: expect
.any(Number
),
1236 maxQueuedTasks
: expect
.any(Number
),
1237 queuedTasks
: expect
.any(Number
),
1239 stolenTasks
: expect
.any(Number
),
1240 failedTasks
: expect
.any(Number
)
1242 expect(pool
.hasBackPressure
.called
).toBe(true)
1243 await pool
.destroy()
1246 it('Verify that hasTaskFunction() is working', async () => {
1247 const dynamicThreadPool
= new DynamicThreadPool(
1248 Math
.floor(numberOfWorkers
/ 2),
1250 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1252 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1253 expect(dynamicThreadPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1254 expect(dynamicThreadPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1257 expect(dynamicThreadPool
.hasTaskFunction('factorial')).toBe(true)
1258 expect(dynamicThreadPool
.hasTaskFunction('fibonacci')).toBe(true)
1259 expect(dynamicThreadPool
.hasTaskFunction('unknown')).toBe(false)
1260 await dynamicThreadPool
.destroy()
1261 const fixedClusterPool
= new FixedClusterPool(
1263 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1265 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1266 expect(fixedClusterPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1267 expect(fixedClusterPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1270 expect(fixedClusterPool
.hasTaskFunction('factorial')).toBe(true)
1271 expect(fixedClusterPool
.hasTaskFunction('fibonacci')).toBe(true)
1272 expect(fixedClusterPool
.hasTaskFunction('unknown')).toBe(false)
1273 await fixedClusterPool
.destroy()
1276 it('Verify that addTaskFunction() is working', async () => {
1277 const dynamicThreadPool
= new DynamicThreadPool(
1278 Math
.floor(numberOfWorkers
/ 2),
1280 './tests/worker-files/thread/testWorker.js'
1282 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1284 dynamicThreadPool
.addTaskFunction(0, () => {})
1285 ).rejects
.toThrowError(new TypeError('name argument must be a string'))
1287 dynamicThreadPool
.addTaskFunction('', () => {})
1288 ).rejects
.toThrowError(
1289 new TypeError('name argument must not be an empty string')
1292 dynamicThreadPool
.addTaskFunction('test', 0)
1293 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1295 dynamicThreadPool
.addTaskFunction('test', '')
1296 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1297 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1301 const echoTaskFunction
= data
=> {
1305 dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1306 ).resolves
.toBe(true)
1307 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1308 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1311 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1316 const taskFunctionData
= { test
: 'test' }
1317 const echoResult
= await dynamicThreadPool
.execute(taskFunctionData
, 'echo')
1318 expect(echoResult
).toStrictEqual(taskFunctionData
)
1319 await dynamicThreadPool
.destroy()
1322 it('Verify that removeTaskFunction() is working', async () => {
1323 const dynamicThreadPool
= new DynamicThreadPool(
1324 Math
.floor(numberOfWorkers
/ 2),
1326 './tests/worker-files/thread/testWorker.js'
1328 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1329 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1334 dynamicThreadPool
.removeTaskFunction('test')
1335 ).rejects
.toThrowError(
1336 new Error('Cannot remove a task function not handled on the pool side')
1338 const echoTaskFunction
= data
=> {
1341 await dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1342 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1343 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1346 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1351 await
expect(dynamicThreadPool
.removeTaskFunction('echo')).resolves
.toBe(
1354 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(0)
1355 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toBeUndefined()
1356 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1360 await dynamicThreadPool
.destroy()
1363 it('Verify that listTaskFunctionNames() is working', async () => {
1364 const dynamicThreadPool
= new DynamicThreadPool(
1365 Math
.floor(numberOfWorkers
/ 2),
1367 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1369 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1370 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1372 'jsonIntegerSerialization',
1376 await dynamicThreadPool
.destroy()
1377 const fixedClusterPool
= new FixedClusterPool(
1379 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1381 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1382 expect(fixedClusterPool
.listTaskFunctionNames()).toStrictEqual([
1384 'jsonIntegerSerialization',
1388 await fixedClusterPool
.destroy()
1391 it('Verify that setDefaultTaskFunction() is working', async () => {
1392 const dynamicThreadPool
= new DynamicThreadPool(
1393 Math
.floor(numberOfWorkers
/ 2),
1395 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1397 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1398 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1400 'jsonIntegerSerialization',
1405 dynamicThreadPool
.setDefaultTaskFunction('factorial')
1406 ).resolves
.toBe(true)
1407 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1410 'jsonIntegerSerialization',
1414 dynamicThreadPool
.setDefaultTaskFunction('fibonacci')
1415 ).resolves
.toBe(true)
1416 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1419 'jsonIntegerSerialization',
1424 it('Verify that multiple task functions worker is working', async () => {
1425 const pool
= new DynamicClusterPool(
1426 Math
.floor(numberOfWorkers
/ 2),
1428 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1430 const data
= { n
: 10 }
1431 const result0
= await pool
.execute(data
)
1432 expect(result0
).toStrictEqual({ ok
: 1 })
1433 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
1434 expect(result1
).toStrictEqual({ ok
: 1 })
1435 const result2
= await pool
.execute(data
, 'factorial')
1436 expect(result2
).toBe(3628800)
1437 const result3
= await pool
.execute(data
, 'fibonacci')
1438 expect(result3
).toBe(55)
1439 expect(pool
.info
.executingTasks
).toBe(0)
1440 expect(pool
.info
.executedTasks
).toBe(4)
1441 for (const workerNode
of pool
.workerNodes
) {
1442 expect(workerNode
.info
.taskFunctionNames
).toStrictEqual([
1444 'jsonIntegerSerialization',
1448 expect(workerNode
.taskFunctionsUsage
.size
).toBe(3)
1449 for (const name
of pool
.listTaskFunctionNames()) {
1450 expect(workerNode
.getTaskFunctionWorkerUsage(name
)).toStrictEqual({
1452 executed
: expect
.any(Number
),
1459 history
: expect
.any(CircularArray
)
1462 history
: expect
.any(CircularArray
)
1466 history
: expect
.any(CircularArray
)
1469 history
: expect
.any(CircularArray
)
1474 workerNode
.getTaskFunctionWorkerUsage(name
).tasks
.executed
1475 ).toBeGreaterThan(0)
1478 workerNode
.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME
)
1480 workerNode
.getTaskFunctionWorkerUsage(
1481 workerNode
.info
.taskFunctionNames
[1]
1485 await pool
.destroy()
1488 it('Verify sendKillMessageToWorker()', async () => {
1489 const pool
= new DynamicClusterPool(
1490 Math
.floor(numberOfWorkers
/ 2),
1492 './tests/worker-files/cluster/testWorker.js'
1494 const workerNodeKey
= 0
1496 pool
.sendKillMessageToWorker(
1498 pool
.workerNodes
[workerNodeKey
].info
.id
1500 ).resolves
.toBeUndefined()
1501 await pool
.destroy()