1 const { EventEmitter
} = require('node:events')
2 const { expect
} = require('expect')
3 const sinon
= require('sinon')
11 WorkerChoiceStrategies
,
13 } = require('../../lib')
14 const { CircularArray
} = require('../../lib/circular-array')
15 const { Deque
} = require('../../lib/deque')
16 const { DEFAULT_TASK_NAME
} = require('../../lib/utils')
17 const { version
} = require('../../package.json')
18 const { waitPoolEvents
} = require('../test-utils')
19 const { WorkerNode
} = require('../../lib/pools/worker-node')
21 describe('Abstract pool test suite', () => {
22 const numberOfWorkers
= 2
23 class StubPoolWithIsMain
extends FixedThreadPool
{
33 it('Simulate pool creation from a non main thread/process', () => {
36 new StubPoolWithIsMain(
38 './tests/worker-files/thread/testWorker.js',
40 errorHandler
: e
=> console
.error(e
)
45 'Cannot start a pool from a worker with the same type as the pool'
50 it('Verify that pool statuses properties are set', async () => {
51 const pool
= new FixedThreadPool(
53 './tests/worker-files/thread/testWorker.js'
55 expect(pool
.starting
).toBe(false)
56 expect(pool
.started
).toBe(true)
60 it('Verify that filePath is checked', () => {
61 const expectedError
= new Error(
62 'Please specify a file with a worker implementation'
64 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
67 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
70 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
73 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
77 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
78 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
81 it('Verify that numberOfWorkers is checked', () => {
86 './tests/worker-files/thread/testWorker.js'
90 'Cannot instantiate a pool without specifying the number of workers'
95 it('Verify that a negative number of workers is checked', () => {
98 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
101 'Cannot instantiate a pool with a negative number of workers'
106 it('Verify that a non integer number of workers is checked', () => {
109 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
112 'Cannot instantiate a pool with a non safe integer number of workers'
117 it('Verify that dynamic pool sizing is checked', () => {
120 new DynamicClusterPool(
123 './tests/worker-files/cluster/testWorker.js'
127 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
132 new DynamicThreadPool(
135 './tests/worker-files/thread/testWorker.js'
139 'Cannot instantiate a pool with a non safe integer number of workers'
144 new DynamicClusterPool(
147 './tests/worker-files/cluster/testWorker.js'
151 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
156 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
167 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
172 new DynamicClusterPool(
175 './tests/worker-files/cluster/testWorker.js'
179 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
184 it('Verify that pool options are checked', async () => {
185 let pool
= new FixedThreadPool(
187 './tests/worker-files/thread/testWorker.js'
189 expect(pool
.emitter
).toBeInstanceOf(EventEmitter
)
190 expect(pool
.opts
).toStrictEqual({
193 restartWorkerOnError
: true,
194 enableTasksQueue
: false,
195 workerChoiceStrategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
196 workerChoiceStrategyOptions
: {
198 runTime
: { median
: false },
199 waitTime
: { median
: false },
200 elu
: { median
: false }
203 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
205 runTime
: { median
: false },
206 waitTime
: { median
: false },
207 elu
: { median
: false }
209 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
210 .workerChoiceStrategies
) {
211 expect(workerChoiceStrategy
.opts
).toStrictEqual({
213 runTime
: { median
: false },
214 waitTime
: { median
: false },
215 elu
: { median
: false }
219 const testHandler
= () => console
.info('test handler executed')
220 pool
= new FixedThreadPool(
222 './tests/worker-files/thread/testWorker.js',
224 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
225 workerChoiceStrategyOptions
: {
226 runTime
: { median
: true },
227 weights
: { 0: 300, 1: 200 }
230 restartWorkerOnError
: false,
231 enableTasksQueue
: true,
232 tasksQueueOptions
: { concurrency
: 2 },
233 messageHandler
: testHandler
,
234 errorHandler
: testHandler
,
235 onlineHandler
: testHandler
,
236 exitHandler
: testHandler
239 expect(pool
.emitter
).toBeUndefined()
240 expect(pool
.opts
).toStrictEqual({
243 restartWorkerOnError
: false,
244 enableTasksQueue
: true,
247 size
: Math
.pow(numberOfWorkers
, 2),
249 tasksStealingOnBackPressure
: true
251 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
252 workerChoiceStrategyOptions
: {
254 runTime
: { median
: true },
255 waitTime
: { median
: false },
256 elu
: { median
: false },
257 weights
: { 0: 300, 1: 200 }
259 onlineHandler
: testHandler
,
260 messageHandler
: testHandler
,
261 errorHandler
: testHandler
,
262 exitHandler
: testHandler
264 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
266 runTime
: { median
: true },
267 waitTime
: { median
: false },
268 elu
: { median
: false },
269 weights
: { 0: 300, 1: 200 }
271 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
272 .workerChoiceStrategies
) {
273 expect(workerChoiceStrategy
.opts
).toStrictEqual({
275 runTime
: { median
: true },
276 waitTime
: { median
: false },
277 elu
: { median
: false },
278 weights
: { 0: 300, 1: 200 }
284 it('Verify that pool options are validated', async () => {
289 './tests/worker-files/thread/testWorker.js',
291 workerChoiceStrategy
: 'invalidStrategy'
295 new Error("Invalid worker choice strategy 'invalidStrategy'")
301 './tests/worker-files/thread/testWorker.js',
303 workerChoiceStrategyOptions
: {
304 retries
: 'invalidChoiceRetries'
310 'Invalid worker choice strategy options: retries must be an integer'
317 './tests/worker-files/thread/testWorker.js',
319 workerChoiceStrategyOptions
: {
326 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
333 './tests/worker-files/thread/testWorker.js',
335 workerChoiceStrategyOptions
: { weights
: {} }
340 'Invalid worker choice strategy options: must have a weight for each worker node'
347 './tests/worker-files/thread/testWorker.js',
349 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
354 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
361 './tests/worker-files/thread/testWorker.js',
363 enableTasksQueue
: true,
364 tasksQueueOptions
: 'invalidTasksQueueOptions'
368 new TypeError('Invalid tasks queue options: must be a plain object')
374 './tests/worker-files/thread/testWorker.js',
376 enableTasksQueue
: true,
377 tasksQueueOptions
: { concurrency
: 0 }
382 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.js',
391 enableTasksQueue
: true,
392 tasksQueueOptions
: { concurrency
: -1 }
397 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
404 './tests/worker-files/thread/testWorker.js',
406 enableTasksQueue
: true,
407 tasksQueueOptions
: { concurrency
: 0.2 }
411 new TypeError('Invalid worker node tasks concurrency: must be an integer')
417 './tests/worker-files/thread/testWorker.js',
419 enableTasksQueue
: true,
420 tasksQueueOptions
: { size
: 0 }
425 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.js',
434 enableTasksQueue
: true,
435 tasksQueueOptions
: { size
: -1 }
440 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
447 './tests/worker-files/thread/testWorker.js',
449 enableTasksQueue
: true,
450 tasksQueueOptions
: { size
: 0.2 }
454 new TypeError('Invalid worker node tasks queue size: must be an integer')
458 it('Verify that pool worker choice strategy options can be set', async () => {
459 const pool
= new FixedThreadPool(
461 './tests/worker-files/thread/testWorker.js',
462 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
464 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
466 runTime
: { median
: false },
467 waitTime
: { median
: false },
468 elu
: { median
: false }
470 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
472 runTime
: { median
: false },
473 waitTime
: { median
: false },
474 elu
: { median
: false }
476 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
477 .workerChoiceStrategies
) {
478 expect(workerChoiceStrategy
.opts
).toStrictEqual({
480 runTime
: { median
: false },
481 waitTime
: { median
: false },
482 elu
: { median
: false }
486 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
504 pool
.setWorkerChoiceStrategyOptions({
505 runTime
: { median
: true },
506 elu
: { median
: true }
508 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
510 runTime
: { median
: true },
511 waitTime
: { median
: false },
512 elu
: { median
: true }
514 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
516 runTime
: { median
: true },
517 waitTime
: { median
: false },
518 elu
: { median
: true }
520 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
521 .workerChoiceStrategies
) {
522 expect(workerChoiceStrategy
.opts
).toStrictEqual({
524 runTime
: { median
: true },
525 waitTime
: { median
: false },
526 elu
: { median
: true }
530 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
548 pool
.setWorkerChoiceStrategyOptions({
549 runTime
: { median
: false },
550 elu
: { median
: false }
552 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
554 runTime
: { median
: false },
555 waitTime
: { median
: false },
556 elu
: { median
: false }
558 expect(pool
.workerChoiceStrategyContext
.opts
).toStrictEqual({
560 runTime
: { median
: false },
561 waitTime
: { median
: false },
562 elu
: { median
: false }
564 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
565 .workerChoiceStrategies
) {
566 expect(workerChoiceStrategy
.opts
).toStrictEqual({
568 runTime
: { median
: false },
569 waitTime
: { median
: false },
570 elu
: { median
: false }
574 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
593 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
596 'Invalid worker choice strategy options: must be a plain object'
600 pool
.setWorkerChoiceStrategyOptions({
601 retries
: 'invalidChoiceRetries'
605 'Invalid worker choice strategy options: retries must be an integer'
609 pool
.setWorkerChoiceStrategyOptions({ retries
: -1 })
612 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
616 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
619 'Invalid worker choice strategy options: must have a weight for each worker node'
623 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
626 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
632 it('Verify that pool tasks queue can be enabled/disabled', async () => {
633 const pool
= new FixedThreadPool(
635 './tests/worker-files/thread/testWorker.js'
637 expect(pool
.opts
.enableTasksQueue
).toBe(false)
638 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
639 for (const workerNode
of pool
.workerNodes
) {
640 expect(workerNode
.onEmptyQueue
).toBeUndefined()
641 expect(workerNode
.onBackPressure
).toBeUndefined()
643 pool
.enableTasksQueue(true)
644 expect(pool
.opts
.enableTasksQueue
).toBe(true)
645 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
647 size
: Math
.pow(numberOfWorkers
, 2),
649 tasksStealingOnBackPressure
: true
651 for (const workerNode
of pool
.workerNodes
) {
652 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
653 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
655 pool
.enableTasksQueue(true, { concurrency
: 2 })
656 expect(pool
.opts
.enableTasksQueue
).toBe(true)
657 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
659 size
: Math
.pow(numberOfWorkers
, 2),
661 tasksStealingOnBackPressure
: true
663 for (const workerNode
of pool
.workerNodes
) {
664 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
665 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
667 pool
.enableTasksQueue(false)
668 expect(pool
.opts
.enableTasksQueue
).toBe(false)
669 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
670 for (const workerNode
of pool
.workerNodes
) {
671 expect(workerNode
.onEmptyQueue
).toBeUndefined()
672 expect(workerNode
.onBackPressure
).toBeUndefined()
677 it('Verify that pool tasks queue options can be set', async () => {
678 const pool
= new FixedThreadPool(
680 './tests/worker-files/thread/testWorker.js',
681 { enableTasksQueue
: true }
683 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
685 size
: Math
.pow(numberOfWorkers
, 2),
687 tasksStealingOnBackPressure
: true
689 for (const workerNode
of pool
.workerNodes
) {
690 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
691 pool
.opts
.tasksQueueOptions
.size
693 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
694 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
696 pool
.setTasksQueueOptions({
700 tasksStealingOnBackPressure
: false
702 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
706 tasksStealingOnBackPressure
: false
708 for (const workerNode
of pool
.workerNodes
) {
709 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
710 pool
.opts
.tasksQueueOptions
.size
712 expect(workerNode
.onEmptyQueue
).toBeUndefined()
713 expect(workerNode
.onBackPressure
).toBeUndefined()
715 pool
.setTasksQueueOptions({
718 tasksStealingOnBackPressure
: true
720 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({
722 size
: Math
.pow(numberOfWorkers
, 2),
724 tasksStealingOnBackPressure
: true
726 for (const workerNode
of pool
.workerNodes
) {
727 expect(workerNode
.tasksQueueBackPressureSize
).toBe(
728 pool
.opts
.tasksQueueOptions
.size
730 expect(workerNode
.onEmptyQueue
).toBeInstanceOf(Function
)
731 expect(workerNode
.onBackPressure
).toBeInstanceOf(Function
)
734 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
736 new TypeError('Invalid tasks queue options: must be a plain object')
738 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
740 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
743 expect(() => pool
.setTasksQueueOptions({ concurrency
: -1 })).toThrowError(
745 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
748 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
749 new TypeError('Invalid worker node tasks concurrency: must be an integer')
751 expect(() => pool
.setTasksQueueOptions({ size
: 0 })).toThrowError(
753 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
756 expect(() => pool
.setTasksQueueOptions({ size
: -1 })).toThrowError(
758 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
761 expect(() => pool
.setTasksQueueOptions({ size
: 0.2 })).toThrowError(
762 new TypeError('Invalid worker node tasks queue size: must be an integer')
767 it('Verify that pool info is set', async () => {
768 let pool
= new FixedThreadPool(
770 './tests/worker-files/thread/testWorker.js'
772 expect(pool
.info
).toStrictEqual({
774 type
: PoolTypes
.fixed
,
775 worker
: WorkerTypes
.thread
,
778 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
779 minSize
: numberOfWorkers
,
780 maxSize
: numberOfWorkers
,
781 workerNodes
: numberOfWorkers
,
782 idleWorkerNodes
: numberOfWorkers
,
789 pool
= new DynamicClusterPool(
790 Math
.floor(numberOfWorkers
/ 2),
792 './tests/worker-files/cluster/testWorker.js'
794 expect(pool
.info
).toStrictEqual({
796 type
: PoolTypes
.dynamic
,
797 worker
: WorkerTypes
.cluster
,
800 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
801 minSize
: Math
.floor(numberOfWorkers
/ 2),
802 maxSize
: numberOfWorkers
,
803 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
804 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
813 it('Verify that pool worker tasks usage are initialized', async () => {
814 const pool
= new FixedClusterPool(
816 './tests/worker-files/cluster/testWorker.js'
818 for (const workerNode
of pool
.workerNodes
) {
819 expect(workerNode
).toBeInstanceOf(WorkerNode
)
820 expect(workerNode
.usage
).toStrictEqual({
830 history
: new CircularArray()
833 history
: new CircularArray()
837 history
: new CircularArray()
840 history
: new CircularArray()
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool
= new FixedClusterPool(
851 './tests/worker-files/cluster/testWorker.js'
853 for (const workerNode
of pool
.workerNodes
) {
854 expect(workerNode
).toBeInstanceOf(WorkerNode
)
855 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
856 expect(workerNode
.tasksQueue
.size
).toBe(0)
857 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
860 pool
= new DynamicThreadPool(
861 Math
.floor(numberOfWorkers
/ 2),
863 './tests/worker-files/thread/testWorker.js'
865 for (const workerNode
of pool
.workerNodes
) {
866 expect(workerNode
).toBeInstanceOf(WorkerNode
)
867 expect(workerNode
.tasksQueue
).toBeInstanceOf(Deque
)
868 expect(workerNode
.tasksQueue
.size
).toBe(0)
869 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
874 it('Verify that pool worker info are initialized', async () => {
875 let pool
= new FixedClusterPool(
877 './tests/worker-files/cluster/testWorker.js'
879 for (const workerNode
of pool
.workerNodes
) {
880 expect(workerNode
).toBeInstanceOf(WorkerNode
)
881 expect(workerNode
.info
).toStrictEqual({
882 id
: expect
.any(Number
),
883 type
: WorkerTypes
.cluster
,
889 pool
= new DynamicThreadPool(
890 Math
.floor(numberOfWorkers
/ 2),
892 './tests/worker-files/thread/testWorker.js'
894 for (const workerNode
of pool
.workerNodes
) {
895 expect(workerNode
).toBeInstanceOf(WorkerNode
)
896 expect(workerNode
.info
).toStrictEqual({
897 id
: expect
.any(Number
),
898 type
: WorkerTypes
.thread
,
906 it('Verify that pool can be started after initialization', async () => {
907 const pool
= new FixedClusterPool(
909 './tests/worker-files/cluster/testWorker.js',
914 expect(pool
.info
.started
).toBe(false)
915 expect(pool
.info
.ready
).toBe(false)
916 expect(pool
.workerNodes
).toStrictEqual([])
917 await
expect(pool
.execute()).rejects
.toThrowError(
918 new Error('Cannot execute a task on not started pool')
921 expect(pool
.info
.started
).toBe(true)
922 expect(pool
.info
.ready
).toBe(true)
923 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
924 for (const workerNode
of pool
.workerNodes
) {
925 expect(workerNode
).toBeInstanceOf(WorkerNode
)
930 it('Verify that pool execute() arguments are checked', async () => {
931 const pool
= new FixedClusterPool(
933 './tests/worker-files/cluster/testWorker.js'
935 await
expect(pool
.execute(undefined, 0)).rejects
.toThrowError(
936 new TypeError('name argument must be a string')
938 await
expect(pool
.execute(undefined, '')).rejects
.toThrowError(
939 new TypeError('name argument must not be an empty string')
941 await
expect(pool
.execute(undefined, undefined, {})).rejects
.toThrowError(
942 new TypeError('transferList argument must be an array')
944 await
expect(pool
.execute(undefined, 'unknown')).rejects
.toBe(
945 "Task function 'unknown' not found"
948 await
expect(pool
.execute()).rejects
.toThrowError(
949 new Error('Cannot execute a task on not started pool')
953 it('Verify that pool worker tasks usage are computed', async () => {
954 const pool
= new FixedClusterPool(
956 './tests/worker-files/cluster/testWorker.js'
958 const promises
= new Set()
959 const maxMultiplier
= 2
960 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
961 promises
.add(pool
.execute())
963 for (const workerNode
of pool
.workerNodes
) {
964 expect(workerNode
.usage
).toStrictEqual({
967 executing
: maxMultiplier
,
974 history
: expect
.any(CircularArray
)
977 history
: expect
.any(CircularArray
)
981 history
: expect
.any(CircularArray
)
984 history
: expect
.any(CircularArray
)
989 await Promise
.all(promises
)
990 for (const workerNode
of pool
.workerNodes
) {
991 expect(workerNode
.usage
).toStrictEqual({
993 executed
: maxMultiplier
,
1001 history
: expect
.any(CircularArray
)
1004 history
: expect
.any(CircularArray
)
1008 history
: expect
.any(CircularArray
)
1011 history
: expect
.any(CircularArray
)
1016 await pool
.destroy()
1019 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1020 const pool
= new DynamicThreadPool(
1021 Math
.floor(numberOfWorkers
/ 2),
1023 './tests/worker-files/thread/testWorker.js'
1025 const promises
= new Set()
1026 const maxMultiplier
= 2
1027 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
1028 promises
.add(pool
.execute())
1030 await Promise
.all(promises
)
1031 for (const workerNode
of pool
.workerNodes
) {
1032 expect(workerNode
.usage
).toStrictEqual({
1034 executed
: expect
.any(Number
),
1042 history
: expect
.any(CircularArray
)
1045 history
: expect
.any(CircularArray
)
1049 history
: expect
.any(CircularArray
)
1052 history
: expect
.any(CircularArray
)
1056 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
1057 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(
1058 numberOfWorkers
* maxMultiplier
1060 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1061 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1062 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1063 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1065 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
1066 for (const workerNode
of pool
.workerNodes
) {
1067 expect(workerNode
.usage
).toStrictEqual({
1077 history
: expect
.any(CircularArray
)
1080 history
: expect
.any(CircularArray
)
1084 history
: expect
.any(CircularArray
)
1087 history
: expect
.any(CircularArray
)
1091 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
1092 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
1093 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
1094 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
1096 await pool
.destroy()
1099 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1100 const pool
= new DynamicClusterPool(
1101 Math
.floor(numberOfWorkers
/ 2),
1103 './tests/worker-files/cluster/testWorker.js'
1107 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
1111 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
1112 expect(poolReady
).toBe(1)
1113 expect(poolInfo
).toStrictEqual({
1115 type
: PoolTypes
.dynamic
,
1116 worker
: WorkerTypes
.cluster
,
1119 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1120 minSize
: expect
.any(Number
),
1121 maxSize
: expect
.any(Number
),
1122 workerNodes
: expect
.any(Number
),
1123 idleWorkerNodes
: expect
.any(Number
),
1124 busyWorkerNodes
: expect
.any(Number
),
1125 executedTasks
: expect
.any(Number
),
1126 executingTasks
: expect
.any(Number
),
1127 failedTasks
: expect
.any(Number
)
1129 await pool
.destroy()
1132 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1133 const pool
= new FixedThreadPool(
1135 './tests/worker-files/thread/testWorker.js'
1137 const promises
= new Set()
1140 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
1144 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1145 promises
.add(pool
.execute())
1147 await Promise
.all(promises
)
1148 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1149 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1150 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
1151 expect(poolInfo
).toStrictEqual({
1153 type
: PoolTypes
.fixed
,
1154 worker
: WorkerTypes
.thread
,
1157 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1158 minSize
: expect
.any(Number
),
1159 maxSize
: expect
.any(Number
),
1160 workerNodes
: expect
.any(Number
),
1161 idleWorkerNodes
: expect
.any(Number
),
1162 busyWorkerNodes
: expect
.any(Number
),
1163 executedTasks
: expect
.any(Number
),
1164 executingTasks
: expect
.any(Number
),
1165 failedTasks
: expect
.any(Number
)
1167 await pool
.destroy()
1170 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1171 const pool
= new DynamicThreadPool(
1172 Math
.floor(numberOfWorkers
/ 2),
1174 './tests/worker-files/thread/testWorker.js'
1176 const promises
= new Set()
1179 pool
.emitter
.on(PoolEvents
.full
, info
=> {
1183 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
1184 promises
.add(pool
.execute())
1186 await Promise
.all(promises
)
1187 expect(poolFull
).toBe(1)
1188 expect(poolInfo
).toStrictEqual({
1190 type
: PoolTypes
.dynamic
,
1191 worker
: WorkerTypes
.thread
,
1194 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1195 minSize
: expect
.any(Number
),
1196 maxSize
: expect
.any(Number
),
1197 workerNodes
: expect
.any(Number
),
1198 idleWorkerNodes
: expect
.any(Number
),
1199 busyWorkerNodes
: expect
.any(Number
),
1200 executedTasks
: expect
.any(Number
),
1201 executingTasks
: expect
.any(Number
),
1202 failedTasks
: expect
.any(Number
)
1204 await pool
.destroy()
1207 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1208 const pool
= new FixedThreadPool(
1210 './tests/worker-files/thread/testWorker.js',
1212 enableTasksQueue
: true
1215 sinon
.stub(pool
, 'hasBackPressure').returns(true)
1216 const promises
= new Set()
1217 let poolBackPressure
= 0
1219 pool
.emitter
.on(PoolEvents
.backPressure
, info
=> {
1223 for (let i
= 0; i
< numberOfWorkers
+ 1; i
++) {
1224 promises
.add(pool
.execute())
1226 await Promise
.all(promises
)
1227 expect(poolBackPressure
).toBe(1)
1228 expect(poolInfo
).toStrictEqual({
1230 type
: PoolTypes
.fixed
,
1231 worker
: WorkerTypes
.thread
,
1234 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
1235 minSize
: expect
.any(Number
),
1236 maxSize
: expect
.any(Number
),
1237 workerNodes
: expect
.any(Number
),
1238 idleWorkerNodes
: expect
.any(Number
),
1239 busyWorkerNodes
: expect
.any(Number
),
1240 executedTasks
: expect
.any(Number
),
1241 executingTasks
: expect
.any(Number
),
1242 maxQueuedTasks
: expect
.any(Number
),
1243 queuedTasks
: expect
.any(Number
),
1245 stolenTasks
: expect
.any(Number
),
1246 failedTasks
: expect
.any(Number
)
1248 expect(pool
.hasBackPressure
.called
).toBe(true)
1249 await pool
.destroy()
1252 it('Verify that hasTaskFunction() is working', async () => {
1253 const dynamicThreadPool
= new DynamicThreadPool(
1254 Math
.floor(numberOfWorkers
/ 2),
1256 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1258 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1259 expect(dynamicThreadPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1260 expect(dynamicThreadPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1263 expect(dynamicThreadPool
.hasTaskFunction('factorial')).toBe(true)
1264 expect(dynamicThreadPool
.hasTaskFunction('fibonacci')).toBe(true)
1265 expect(dynamicThreadPool
.hasTaskFunction('unknown')).toBe(false)
1266 await dynamicThreadPool
.destroy()
1267 const fixedClusterPool
= new FixedClusterPool(
1269 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1271 await
waitPoolEvents(fixedClusterPool
, PoolEvents
.ready
, 1)
1272 expect(fixedClusterPool
.hasTaskFunction(DEFAULT_TASK_NAME
)).toBe(true)
1273 expect(fixedClusterPool
.hasTaskFunction('jsonIntegerSerialization')).toBe(
1276 expect(fixedClusterPool
.hasTaskFunction('factorial')).toBe(true)
1277 expect(fixedClusterPool
.hasTaskFunction('fibonacci')).toBe(true)
1278 expect(fixedClusterPool
.hasTaskFunction('unknown')).toBe(false)
1279 await fixedClusterPool
.destroy()
1282 it('Verify that addTaskFunction() is working', async () => {
1283 const dynamicThreadPool
= new DynamicThreadPool(
1284 Math
.floor(numberOfWorkers
/ 2),
1286 './tests/worker-files/thread/testWorker.js'
1288 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1290 dynamicThreadPool
.addTaskFunction(0, () => {})
1291 ).rejects
.toThrowError(new TypeError('name argument must be a string'))
1293 dynamicThreadPool
.addTaskFunction('', () => {})
1294 ).rejects
.toThrowError(
1295 new TypeError('name argument must not be an empty string')
1298 dynamicThreadPool
.addTaskFunction('test', 0)
1299 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1301 dynamicThreadPool
.addTaskFunction('test', '')
1302 ).rejects
.toThrowError(new TypeError('fn argument must be a function'))
1303 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1307 const echoTaskFunction
= data
=> {
1311 dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1312 ).resolves
.toBe(true)
1313 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1314 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1317 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1322 const taskFunctionData
= { test
: 'test' }
1323 const echoResult
= await dynamicThreadPool
.execute(taskFunctionData
, 'echo')
1324 expect(echoResult
).toStrictEqual(taskFunctionData
)
1325 for (const workerNode
of dynamicThreadPool
.workerNodes
) {
1326 expect(workerNode
.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1328 executed
: expect
.any(Number
),
1335 history
: new CircularArray()
1338 history
: new CircularArray()
1342 history
: new CircularArray()
1345 history
: new CircularArray()
1350 await dynamicThreadPool
.destroy()
1353 it('Verify that removeTaskFunction() is working', async () => {
1354 const dynamicThreadPool
= new DynamicThreadPool(
1355 Math
.floor(numberOfWorkers
/ 2),
1357 './tests/worker-files/thread/testWorker.js'
1359 await
waitPoolEvents(dynamicThreadPool
, PoolEvents
.ready
, 1)
1360 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1365 dynamicThreadPool
.removeTaskFunction('test')
1366 ).rejects
.toThrowError(
1367 new Error('Cannot remove a task function not handled on the pool side')
1369 const echoTaskFunction
= data
=> {
1372 await dynamicThreadPool
.addTaskFunction('echo', echoTaskFunction
)
1373 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(1)
1374 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toStrictEqual(
1377 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1382 await
expect(dynamicThreadPool
.removeTaskFunction('echo')).resolves
.toBe(
1385 expect(dynamicThreadPool
.taskFunctions
.size
).toBe(0)
1386 expect(dynamicThreadPool
.taskFunctions
.get('echo')).toBeUndefined()
1387 expect(dynamicThreadPool
.listTaskFunctionNames()).toStrictEqual([
1391 await dynamicThreadPool
.destroy()
// Checks that listTaskFunctionNames() reports the default task name followed
// by the worker's task functions, first on a dynamic thread pool and then on a
// fixed cluster pool. Each pool is destroyed before the next one is created.
it('Verify that listTaskFunctionNames() is working', async () => {
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const poolFactories = [
    () =>
      new DynamicThreadPool(
        Math.floor(numberOfWorkers / 2),
        numberOfWorkers,
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
      ),
    () =>
      new FixedClusterPool(
        numberOfWorkers,
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
  ]
  // Sequential on purpose: one pool is alive at a time
  for (const createPool of poolFactories) {
    const pool = createPool()
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
})
// Checks setDefaultTaskFunction() on a dynamic thread pool:
// - a non-string name, the reserved default name and an unknown name are
//   rejected with the error reported by the worker;
// - a valid name resolves to true and moves that task function right after
//   the default task name in the listed task function names.
// The pool is destroyed at the end so that its worker threads do not outlive
// the test (the other pool tests in this suite already do so).
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(0)
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
    )
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrowError(
    new Error(
      "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
    )
  )
  // Failed operations must leave the initial ordering untouched
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  // Release the worker threads; the pool was previously left running
  await dynamicThreadPool.destroy()
})
// Runs the default task function and each named task function once on a
// dynamic cluster pool, then checks the results, the pool task counters and
// the per task function usage kept on every worker node.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }

  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(
    await pool.execute(data, 'jsonIntegerSerialization')
  ).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)

  const { info: poolInfo } = pool
  expect(poolInfo.executingTasks).toBe(0)
  expect(poolInfo.executedTasks).toBe(4)

  const expectedTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // NOTE(review): the counters other than `executed` are assumed to be zero
  // once every task has completed; confirm against the usage shape.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }

  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(
      expectedTaskFunctionNames
    )
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedUsage
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name and the first listed task function must report
    // the same usage
    const firstListedName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(workerNode.getTaskFunctionWorkerUsage(firstListedName))
  }

  await pool.destroy()
})
// Sends a kill message to the worker node at key 0 of a dynamic cluster pool
// and expects the returned promise to resolve without a value.
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
// Sends an 'add' task function operation to the worker node at key 0 of a
// dynamic cluster pool and checks that the new name is appended to that
// worker node's task function names.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // The task function is sent in its serialized (string) form
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addEmptyOperation)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
// Sends an 'add' task function operation to all the worker nodes of a dynamic
// cluster pool and checks the task function names of every worker node
// afterwards.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The task function is sent in its serialized (string) form
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyOperation)
  ).resolves.toBe(true)
  // NOTE(review): expected names assumed to match the single worker variant
  // of this test (default name, 'test', then the added 'empty'); confirm.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})