1 import { EventEmitterAsyncResource } from 'node:events'
2 import { readFileSync } from 'node:fs'
3 import { expect } from 'expect'
4 import { restore, stub } from 'sinon'
12 WorkerChoiceStrategies,
14 } from '../../lib/index.js'
15 import { CircularArray } from '../../lib/circular-array.js'
16 import { Deque } from '../../lib/deque.js'
17 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
18 import { waitPoolEvents } from '../test-utils.js'
19 import { WorkerNode } from '../../lib/pools/worker-node.js'
21 describe('Abstract pool test suite', () => {
22 const version = JSON.parse(readFileSync('./package.json', 'utf8')).version
23 const numberOfWorkers = 2
24 class StubPoolWithIsMain extends FixedThreadPool {
34 it('Simulate pool creation from a non main thread/process', () => {
37 new StubPoolWithIsMain(
39 './tests/worker-files/thread/testWorker.js',
41 errorHandler: e => console.error(e)
46 'Cannot start a pool from a worker with the same type as the pool'
51 it('Verify that pool statuses properties are set', async () => {
52 const pool = new FixedThreadPool(
54 './tests/worker-files/thread/testWorker.js'
56 expect(pool.starting).toBe(false)
57 expect(pool.started).toBe(true)
61 it('Verify that filePath is checked', () => {
62 const expectedError = new Error(
63 'Please specify a file with a worker implementation'
65 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
68 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
71 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
74 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
78 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
79 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
82 it('Verify that numberOfWorkers is checked', () => {
87 './tests/worker-files/thread/testWorker.js'
91 'Cannot instantiate a pool without specifying the number of workers'
96 it('Verify that a negative number of workers is checked', () => {
99 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
102 'Cannot instantiate a pool with a negative number of workers'
107 it('Verify that a non integer number of workers is checked', () => {
110 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
113 'Cannot instantiate a pool with a non safe integer number of workers'
118 it('Verify that dynamic pool sizing is checked', () => {
121 new DynamicClusterPool(
124 './tests/worker-files/cluster/testWorker.js'
128 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
133 new DynamicThreadPool(
136 './tests/worker-files/thread/testWorker.js'
140 'Cannot instantiate a pool with a non safe integer number of workers'
145 new DynamicClusterPool(
148 './tests/worker-files/cluster/testWorker.js'
152 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
157 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
160 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
165 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
168 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.js'
180 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
185 it('Verify that pool options are checked', async () => {
186 let pool = new FixedThreadPool(
188 './tests/worker-files/thread/testWorker.js'
190 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
191 expect(pool.opts).toStrictEqual({
194 restartWorkerOnError: true,
195 enableTasksQueue: false,
196 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
197 workerChoiceStrategyOptions: {
199 runTime: { median: false },
200 waitTime: { median: false },
201 elu: { median: false }
204 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
206 runTime: { median: false },
207 waitTime: { median: false },
208 elu: { median: false }
210 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
211 .workerChoiceStrategies) {
212 expect(workerChoiceStrategy.opts).toStrictEqual({
214 runTime: { median: false },
215 waitTime: { median: false },
216 elu: { median: false }
220 const testHandler = () => console.info('test handler executed')
221 pool = new FixedThreadPool(
223 './tests/worker-files/thread/testWorker.js',
225 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
226 workerChoiceStrategyOptions: {
227 runTime: { median: true },
228 weights: { 0: 300, 1: 200 }
231 restartWorkerOnError: false,
232 enableTasksQueue: true,
233 tasksQueueOptions: { concurrency: 2 },
234 messageHandler: testHandler,
235 errorHandler: testHandler,
236 onlineHandler: testHandler,
237 exitHandler: testHandler
240 expect(pool.emitter).toBeUndefined()
241 expect(pool.opts).toStrictEqual({
244 restartWorkerOnError: false,
245 enableTasksQueue: true,
248 size: Math.pow(numberOfWorkers, 2),
250 tasksStealingOnBackPressure: true
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 waitTime: { median: false },
257 elu: { median: false },
258 weights: { 0: 300, 1: 200 }
260 onlineHandler: testHandler,
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 exitHandler: testHandler
265 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
267 runTime: { median: true },
268 waitTime: { median: false },
269 elu: { median: false },
270 weights: { 0: 300, 1: 200 }
272 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
273 .workerChoiceStrategies) {
274 expect(workerChoiceStrategy.opts).toStrictEqual({
276 runTime: { median: true },
277 waitTime: { median: false },
278 elu: { median: false },
279 weights: { 0: 300, 1: 200 }
285 it('Verify that pool options are validated', async () => {
290 './tests/worker-files/thread/testWorker.js',
292 workerChoiceStrategy: 'invalidStrategy'
296 new Error("Invalid worker choice strategy 'invalidStrategy'")
302 './tests/worker-files/thread/testWorker.js',
304 workerChoiceStrategyOptions: {
305 retries: 'invalidChoiceRetries'
311 'Invalid worker choice strategy options: retries must be an integer'
318 './tests/worker-files/thread/testWorker.js',
320 workerChoiceStrategyOptions: {
327 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
334 './tests/worker-files/thread/testWorker.js',
336 workerChoiceStrategyOptions: { weights: {} }
341 'Invalid worker choice strategy options: must have a weight for each worker node'
348 './tests/worker-files/thread/testWorker.js',
350 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
355 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
362 './tests/worker-files/thread/testWorker.js',
364 enableTasksQueue: true,
365 tasksQueueOptions: 'invalidTasksQueueOptions'
369 new TypeError('Invalid tasks queue options: must be a plain object')
375 './tests/worker-files/thread/testWorker.js',
377 enableTasksQueue: true,
378 tasksQueueOptions: { concurrency: 0 }
383 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
390 './tests/worker-files/thread/testWorker.js',
392 enableTasksQueue: true,
393 tasksQueueOptions: { concurrency: -1 }
398 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
405 './tests/worker-files/thread/testWorker.js',
407 enableTasksQueue: true,
408 tasksQueueOptions: { concurrency: 0.2 }
412 new TypeError('Invalid worker node tasks concurrency: must be an integer')
418 './tests/worker-files/thread/testWorker.js',
420 enableTasksQueue: true,
421 tasksQueueOptions: { size: 0 }
426 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
433 './tests/worker-files/thread/testWorker.js',
435 enableTasksQueue: true,
436 tasksQueueOptions: { size: -1 }
441 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
448 './tests/worker-files/thread/testWorker.js',
450 enableTasksQueue: true,
451 tasksQueueOptions: { size: 0.2 }
455 new TypeError('Invalid worker node tasks queue size: must be an integer')
459 it('Verify that pool worker choice strategy options can be set', async () => {
460 const pool = new FixedThreadPool(
462 './tests/worker-files/thread/testWorker.js',
463 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
465 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
467 runTime: { median: false },
468 waitTime: { median: false },
469 elu: { median: false }
471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
473 runTime: { median: false },
474 waitTime: { median: false },
475 elu: { median: false }
477 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
478 .workerChoiceStrategies) {
479 expect(workerChoiceStrategy.opts).toStrictEqual({
481 runTime: { median: false },
482 waitTime: { median: false },
483 elu: { median: false }
487 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
505 pool.setWorkerChoiceStrategyOptions({
506 runTime: { median: true },
507 elu: { median: true }
509 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
511 runTime: { median: true },
512 waitTime: { median: false },
513 elu: { median: true }
515 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
517 runTime: { median: true },
518 waitTime: { median: false },
519 elu: { median: true }
521 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
522 .workerChoiceStrategies) {
523 expect(workerChoiceStrategy.opts).toStrictEqual({
525 runTime: { median: true },
526 waitTime: { median: false },
527 elu: { median: true }
531 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
549 pool.setWorkerChoiceStrategyOptions({
550 runTime: { median: false },
551 elu: { median: false }
553 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
555 runTime: { median: false },
556 waitTime: { median: false },
557 elu: { median: false }
559 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
561 runTime: { median: false },
562 waitTime: { median: false },
563 elu: { median: false }
565 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
566 .workerChoiceStrategies) {
567 expect(workerChoiceStrategy.opts).toStrictEqual({
569 runTime: { median: false },
570 waitTime: { median: false },
571 elu: { median: false }
575 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
594 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
597 'Invalid worker choice strategy options: must be a plain object'
601 pool.setWorkerChoiceStrategyOptions({
602 retries: 'invalidChoiceRetries'
606 'Invalid worker choice strategy options: retries must be an integer'
610 pool.setWorkerChoiceStrategyOptions({ retries: -1 })
613 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
617 pool.setWorkerChoiceStrategyOptions({ weights: {} })
620 'Invalid worker choice strategy options: must have a weight for each worker node'
624 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
627 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
633 it('Verify that pool tasks queue can be enabled/disabled', async () => {
634 const pool = new FixedThreadPool(
636 './tests/worker-files/thread/testWorker.js'
638 expect(pool.opts.enableTasksQueue).toBe(false)
639 expect(pool.opts.tasksQueueOptions).toBeUndefined()
640 for (const workerNode of pool.workerNodes) {
641 expect(workerNode.onEmptyQueue).toBeUndefined()
642 expect(workerNode.onBackPressure).toBeUndefined()
644 pool.enableTasksQueue(true)
645 expect(pool.opts.enableTasksQueue).toBe(true)
646 expect(pool.opts.tasksQueueOptions).toStrictEqual({
648 size: Math.pow(numberOfWorkers, 2),
650 tasksStealingOnBackPressure: true
652 for (const workerNode of pool.workerNodes) {
653 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
654 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
656 pool.enableTasksQueue(true, { concurrency: 2 })
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 size: Math.pow(numberOfWorkers, 2),
662 tasksStealingOnBackPressure: true
664 for (const workerNode of pool.workerNodes) {
665 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
666 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
668 pool.enableTasksQueue(false)
669 expect(pool.opts.enableTasksQueue).toBe(false)
670 expect(pool.opts.tasksQueueOptions).toBeUndefined()
671 for (const workerNode of pool.workerNodes) {
672 expect(workerNode.onEmptyQueue).toBeUndefined()
673 expect(workerNode.onBackPressure).toBeUndefined()
678 it('Verify that pool tasks queue options can be set', async () => {
679 const pool = new FixedThreadPool(
681 './tests/worker-files/thread/testWorker.js',
682 { enableTasksQueue: true }
684 expect(pool.opts.tasksQueueOptions).toStrictEqual({
686 size: Math.pow(numberOfWorkers, 2),
688 tasksStealingOnBackPressure: true
690 for (const workerNode of pool.workerNodes) {
691 expect(workerNode.tasksQueueBackPressureSize).toBe(
692 pool.opts.tasksQueueOptions.size
694 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
695 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
697 pool.setTasksQueueOptions({
701 tasksStealingOnBackPressure: false
703 expect(pool.opts.tasksQueueOptions).toStrictEqual({
707 tasksStealingOnBackPressure: false
709 for (const workerNode of pool.workerNodes) {
710 expect(workerNode.tasksQueueBackPressureSize).toBe(
711 pool.opts.tasksQueueOptions.size
713 expect(workerNode.onEmptyQueue).toBeUndefined()
714 expect(workerNode.onBackPressure).toBeUndefined()
716 pool.setTasksQueueOptions({
719 tasksStealingOnBackPressure: true
721 expect(pool.opts.tasksQueueOptions).toStrictEqual({
723 size: Math.pow(numberOfWorkers, 2),
725 tasksStealingOnBackPressure: true
727 for (const workerNode of pool.workerNodes) {
728 expect(workerNode.tasksQueueBackPressureSize).toBe(
729 pool.opts.tasksQueueOptions.size
731 expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
732 expect(workerNode.onBackPressure).toBeInstanceOf(Function)
735 pool.setTasksQueueOptions('invalidTasksQueueOptions')
737 new TypeError('Invalid tasks queue options: must be a plain object')
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
741 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
744 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
746 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
749 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
750 new TypeError('Invalid worker node tasks concurrency: must be an integer')
752 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
754 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
757 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
759 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
762 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
763 new TypeError('Invalid worker node tasks queue size: must be an integer')
768 it('Verify that pool info is set', async () => {
769 let pool = new FixedThreadPool(
771 './tests/worker-files/thread/testWorker.js'
773 expect(pool.info).toStrictEqual({
775 type: PoolTypes.fixed,
776 worker: WorkerTypes.thread,
779 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
780 minSize: numberOfWorkers,
781 maxSize: numberOfWorkers,
782 workerNodes: numberOfWorkers,
783 idleWorkerNodes: numberOfWorkers,
790 pool = new DynamicClusterPool(
791 Math.floor(numberOfWorkers / 2),
793 './tests/worker-files/cluster/testWorker.js'
795 expect(pool.info).toStrictEqual({
797 type: PoolTypes.dynamic,
798 worker: WorkerTypes.cluster,
801 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
802 minSize: Math.floor(numberOfWorkers / 2),
803 maxSize: numberOfWorkers,
804 workerNodes: Math.floor(numberOfWorkers / 2),
805 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
814 it('Verify that pool worker tasks usage are initialized', async () => {
815 const pool = new FixedClusterPool(
817 './tests/worker-files/cluster/testWorker.js'
819 for (const workerNode of pool.workerNodes) {
820 expect(workerNode).toBeInstanceOf(WorkerNode)
821 expect(workerNode.usage).toStrictEqual({
831 history: new CircularArray()
834 history: new CircularArray()
838 history: new CircularArray()
841 history: new CircularArray()
849 it('Verify that pool worker tasks queue are initialized', async () => {
850 let pool = new FixedClusterPool(
852 './tests/worker-files/cluster/testWorker.js'
854 for (const workerNode of pool.workerNodes) {
855 expect(workerNode).toBeInstanceOf(WorkerNode)
856 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
857 expect(workerNode.tasksQueue.size).toBe(0)
858 expect(workerNode.tasksQueue.maxSize).toBe(0)
861 pool = new DynamicThreadPool(
862 Math.floor(numberOfWorkers / 2),
864 './tests/worker-files/thread/testWorker.js'
866 for (const workerNode of pool.workerNodes) {
867 expect(workerNode).toBeInstanceOf(WorkerNode)
868 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
869 expect(workerNode.tasksQueue.size).toBe(0)
870 expect(workerNode.tasksQueue.maxSize).toBe(0)
875 it('Verify that pool worker info are initialized', async () => {
876 let pool = new FixedClusterPool(
878 './tests/worker-files/cluster/testWorker.js'
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
882 expect(workerNode.info).toStrictEqual({
883 id: expect.any(Number),
884 type: WorkerTypes.cluster,
890 pool = new DynamicThreadPool(
891 Math.floor(numberOfWorkers / 2),
893 './tests/worker-files/thread/testWorker.js'
895 for (const workerNode of pool.workerNodes) {
896 expect(workerNode).toBeInstanceOf(WorkerNode)
897 expect(workerNode.info).toStrictEqual({
898 id: expect.any(Number),
899 type: WorkerTypes.thread,
907 it('Verify that pool can be started after initialization', async () => {
908 const pool = new FixedClusterPool(
910 './tests/worker-files/cluster/testWorker.js',
915 expect(pool.info.started).toBe(false)
916 expect(pool.info.ready).toBe(false)
917 expect(pool.workerNodes).toStrictEqual([])
918 await expect(pool.execute()).rejects.toThrowError(
919 new Error('Cannot execute a task on not started pool')
922 expect(pool.info.started).toBe(true)
923 expect(pool.info.ready).toBe(true)
924 expect(pool.workerNodes.length).toBe(numberOfWorkers)
925 for (const workerNode of pool.workerNodes) {
926 expect(workerNode).toBeInstanceOf(WorkerNode)
931 it('Verify that pool execute() arguments are checked', async () => {
932 const pool = new FixedClusterPool(
934 './tests/worker-files/cluster/testWorker.js'
936 await expect(pool.execute(undefined, 0)).rejects.toThrowError(
937 new TypeError('name argument must be a string')
939 await expect(pool.execute(undefined, '')).rejects.toThrowError(
940 new TypeError('name argument must not be an empty string')
942 await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
943 new TypeError('transferList argument must be an array')
945 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
946 "Task function 'unknown' not found"
949 await expect(pool.execute()).rejects.toThrowError(
950 new Error('Cannot execute a task on not started pool')
954 it('Verify that pool worker tasks usage are computed', async () => {
955 const pool = new FixedClusterPool(
957 './tests/worker-files/cluster/testWorker.js'
959 const promises = new Set()
960 const maxMultiplier = 2
961 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
962 promises.add(pool.execute())
964 for (const workerNode of pool.workerNodes) {
965 expect(workerNode.usage).toStrictEqual({
968 executing: maxMultiplier,
975 history: expect.any(CircularArray)
978 history: expect.any(CircularArray)
982 history: expect.any(CircularArray)
985 history: expect.any(CircularArray)
990 await Promise.all(promises)
991 for (const workerNode of pool.workerNodes) {
992 expect(workerNode.usage).toStrictEqual({
994 executed: maxMultiplier,
1002 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1009 history: expect.any(CircularArray)
1012 history: expect.any(CircularArray)
1017 await pool.destroy()
1020 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1021 const pool = new DynamicThreadPool(
1022 Math.floor(numberOfWorkers / 2),
1024 './tests/worker-files/thread/testWorker.js'
1026 const promises = new Set()
1027 const maxMultiplier = 2
1028 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1029 promises.add(pool.execute())
1031 await Promise.all(promises)
1032 for (const workerNode of pool.workerNodes) {
1033 expect(workerNode.usage).toStrictEqual({
1035 executed: expect.any(Number),
1043 history: expect.any(CircularArray)
1046 history: expect.any(CircularArray)
1050 history: expect.any(CircularArray)
1053 history: expect.any(CircularArray)
1057 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1058 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1059 numberOfWorkers * maxMultiplier
1061 expect(workerNode.usage.runTime.history.length).toBe(0)
1062 expect(workerNode.usage.waitTime.history.length).toBe(0)
1063 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1064 expect(workerNode.usage.elu.active.history.length).toBe(0)
1066 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1067 for (const workerNode of pool.workerNodes) {
1068 expect(workerNode.usage).toStrictEqual({
1078 history: expect.any(CircularArray)
1081 history: expect.any(CircularArray)
1085 history: expect.any(CircularArray)
1088 history: expect.any(CircularArray)
1092 expect(workerNode.usage.runTime.history.length).toBe(0)
1093 expect(workerNode.usage.waitTime.history.length).toBe(0)
1094 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1095 expect(workerNode.usage.elu.active.history.length).toBe(0)
1097 await pool.destroy()
1100 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1101 const pool = new DynamicClusterPool(
1102 Math.floor(numberOfWorkers / 2),
1104 './tests/worker-files/cluster/testWorker.js'
1106 expect(pool.emitter.eventNames()).toStrictEqual([])
1109 pool.emitter.on(PoolEvents.ready, info => {
1113 await waitPoolEvents(pool, PoolEvents.ready, 1)
1114 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1115 expect(poolReady).toBe(1)
1116 expect(poolInfo).toStrictEqual({
1118 type: PoolTypes.dynamic,
1119 worker: WorkerTypes.cluster,
1122 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1123 minSize: expect.any(Number),
1124 maxSize: expect.any(Number),
1125 workerNodes: expect.any(Number),
1126 idleWorkerNodes: expect.any(Number),
1127 busyWorkerNodes: expect.any(Number),
1128 executedTasks: expect.any(Number),
1129 executingTasks: expect.any(Number),
1130 failedTasks: expect.any(Number)
1132 await pool.destroy()
1135 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1136 const pool = new FixedThreadPool(
1138 './tests/worker-files/thread/testWorker.js'
1140 expect(pool.emitter.eventNames()).toStrictEqual([])
1141 const promises = new Set()
1144 pool.emitter.on(PoolEvents.busy, info => {
1148 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1149 for (let i = 0; i < numberOfWorkers * 2; i++) {
1150 promises.add(pool.execute())
1152 await Promise.all(promises)
1153 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1154 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1155 expect(poolBusy).toBe(numberOfWorkers + 1)
1156 expect(poolInfo).toStrictEqual({
1158 type: PoolTypes.fixed,
1159 worker: WorkerTypes.thread,
1162 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1163 minSize: expect.any(Number),
1164 maxSize: expect.any(Number),
1165 workerNodes: expect.any(Number),
1166 idleWorkerNodes: expect.any(Number),
1167 busyWorkerNodes: expect.any(Number),
1168 executedTasks: expect.any(Number),
1169 executingTasks: expect.any(Number),
1170 failedTasks: expect.any(Number)
1172 await pool.destroy()
1175 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1176 const pool = new DynamicThreadPool(
1177 Math.floor(numberOfWorkers / 2),
1179 './tests/worker-files/thread/testWorker.js'
1181 expect(pool.emitter.eventNames()).toStrictEqual([])
1182 const promises = new Set()
1185 pool.emitter.on(PoolEvents.full, info => {
1189 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1190 for (let i = 0; i < numberOfWorkers * 2; i++) {
1191 promises.add(pool.execute())
1193 await Promise.all(promises)
1194 expect(poolFull).toBe(1)
1195 expect(poolInfo).toStrictEqual({
1197 type: PoolTypes.dynamic,
1198 worker: WorkerTypes.thread,
1201 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1202 minSize: expect.any(Number),
1203 maxSize: expect.any(Number),
1204 workerNodes: expect.any(Number),
1205 idleWorkerNodes: expect.any(Number),
1206 busyWorkerNodes: expect.any(Number),
1207 executedTasks: expect.any(Number),
1208 executingTasks: expect.any(Number),
1209 failedTasks: expect.any(Number)
1211 await pool.destroy()
1214 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1215 const pool = new FixedThreadPool(
1217 './tests/worker-files/thread/testWorker.js',
1219 enableTasksQueue: true
1222 stub(pool, 'hasBackPressure').returns(true)
1223 expect(pool.emitter.eventNames()).toStrictEqual([])
1224 const promises = new Set()
1225 let poolBackPressure = 0
1227 pool.emitter.on(PoolEvents.backPressure, info => {
1231 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1232 for (let i = 0; i < numberOfWorkers + 1; i++) {
1233 promises.add(pool.execute())
1235 await Promise.all(promises)
1236 expect(poolBackPressure).toBe(1)
1237 expect(poolInfo).toStrictEqual({
1239 type: PoolTypes.fixed,
1240 worker: WorkerTypes.thread,
1243 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1244 minSize: expect.any(Number),
1245 maxSize: expect.any(Number),
1246 workerNodes: expect.any(Number),
1247 idleWorkerNodes: expect.any(Number),
1248 busyWorkerNodes: expect.any(Number),
1249 executedTasks: expect.any(Number),
1250 executingTasks: expect.any(Number),
1251 maxQueuedTasks: expect.any(Number),
1252 queuedTasks: expect.any(Number),
1254 stolenTasks: expect.any(Number),
1255 failedTasks: expect.any(Number)
1257 expect(pool.hasBackPressure.called).toBe(true)
1258 await pool.destroy()
1261 it('Verify that hasTaskFunction() is working', async () => {
1262 const dynamicThreadPool = new DynamicThreadPool(
1263 Math.floor(numberOfWorkers / 2),
1265 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1267 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1268 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1269 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1272 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1273 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1274 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1275 await dynamicThreadPool.destroy()
1276 const fixedClusterPool = new FixedClusterPool(
1278 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1280 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1281 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1282 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1285 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1286 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1287 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1288 await fixedClusterPool.destroy()
1291 it('Verify that addTaskFunction() is working', async () => {
1292 const dynamicThreadPool = new DynamicThreadPool(
1293 Math.floor(numberOfWorkers / 2),
1295 './tests/worker-files/thread/testWorker.js'
1297 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1299 dynamicThreadPool.addTaskFunction(0, () => {})
1300 ).rejects.toThrowError(new TypeError('name argument must be a string'))
1302 dynamicThreadPool.addTaskFunction('', () => {})
1303 ).rejects.toThrowError(
1304 new TypeError('name argument must not be an empty string')
1307 dynamicThreadPool.addTaskFunction('test', 0)
1308 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1310 dynamicThreadPool.addTaskFunction('test', '')
1311 ).rejects.toThrowError(new TypeError('fn argument must be a function'))
1312 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1316 const echoTaskFunction = data => {
1320 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1321 ).resolves.toBe(true)
1322 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1323 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1326 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1331 const taskFunctionData = { test: 'test' }
1332 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1333 expect(echoResult).toStrictEqual(taskFunctionData)
1334 for (const workerNode of dynamicThreadPool.workerNodes) {
1335 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1337 executed: expect.any(Number),
1344 history: new CircularArray()
1347 history: new CircularArray()
1351 history: new CircularArray()
1354 history: new CircularArray()
1359 await dynamicThreadPool.destroy()
1362 it('Verify that removeTaskFunction() is working', async () => {
1363 const dynamicThreadPool = new DynamicThreadPool(
1364 Math.floor(numberOfWorkers / 2),
1366 './tests/worker-files/thread/testWorker.js'
1368 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1369 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1374 dynamicThreadPool.removeTaskFunction('test')
1375 ).rejects.toThrowError(
1376 new Error('Cannot remove a task function not handled on the pool side')
1378 const echoTaskFunction = data => {
1381 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1382 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1383 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1386 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1391 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1394 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1395 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1396 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1400 await dynamicThreadPool.destroy()
1403 it('Verify that listTaskFunctionNames() is working', async () => {
1404 const dynamicThreadPool = new DynamicThreadPool(
1405 Math.floor(numberOfWorkers / 2),
1407 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1409 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1410 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1412 'jsonIntegerSerialization',
1416 await dynamicThreadPool.destroy()
1417 const fixedClusterPool = new FixedClusterPool(
1419 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1421 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1422 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1424 'jsonIntegerSerialization',
1428 await fixedClusterPool.destroy()
1431 it('Verify that setDefaultTaskFunction() is working', async () => {
1432 const dynamicThreadPool = new DynamicThreadPool(
1433 Math.floor(numberOfWorkers / 2),
1435 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
1437 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1439 dynamicThreadPool.setDefaultTaskFunction(0)
1440 ).rejects.toThrowError(
1442 "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
1446 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1447 ).rejects.toThrowError(
1449 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
1453 dynamicThreadPool.setDefaultTaskFunction('unknown')
1454 ).rejects.toThrowError(
1456 "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1461 'jsonIntegerSerialization',
1466 dynamicThreadPool.setDefaultTaskFunction('factorial')
1467 ).resolves.toBe(true)
1468 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1471 'jsonIntegerSerialization',
1475 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1476 ).resolves.toBe(true)
1477 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1480 'jsonIntegerSerialization',
1485 it('Verify that multiple task functions worker is working', async () => {
1486 const pool = new DynamicClusterPool(
1487 Math.floor(numberOfWorkers / 2),
1489 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1491 const data = { n: 10 }
1492 const result0 = await pool.execute(data)
1493 expect(result0).toStrictEqual({ ok: 1 })
1494 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1495 expect(result1).toStrictEqual({ ok: 1 })
1496 const result2 = await pool.execute(data, 'factorial')
1497 expect(result2).toBe(3628800)
1498 const result3 = await pool.execute(data, 'fibonacci')
1499 expect(result3).toBe(55)
1500 expect(pool.info.executingTasks).toBe(0)
1501 expect(pool.info.executedTasks).toBe(4)
1502 for (const workerNode of pool.workerNodes) {
1503 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1505 'jsonIntegerSerialization',
1509 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1510 for (const name of pool.listTaskFunctionNames()) {
1511 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1513 executed: expect.any(Number),
1520 history: expect.any(CircularArray)
1523 history: expect.any(CircularArray)
1527 history: expect.any(CircularArray)
1530 history: expect.any(CircularArray)
1535 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1536 ).toBeGreaterThan(0)
1539 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1541 workerNode.getTaskFunctionWorkerUsage(
1542 workerNode.info.taskFunctionNames[1]
1546 await pool.destroy()
1549 it('Verify sendKillMessageToWorker()', async () => {
1550 const pool = new DynamicClusterPool(
1551 Math.floor(numberOfWorkers / 2),
1553 './tests/worker-files/cluster/testWorker.js'
1555 const workerNodeKey = 0
1557 pool.sendKillMessageToWorker(workerNodeKey)
1558 ).resolves.toBeUndefined()
1559 await pool.destroy()
1562 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1563 const pool = new DynamicClusterPool(
1564 Math.floor(numberOfWorkers / 2),
1566 './tests/worker-files/cluster/testWorker.js'
1568 const workerNodeKey = 0
1570 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1571 taskFunctionOperation: 'add',
1572 taskFunctionName: 'empty',
1573 taskFunction: (() => {}).toString()
1575 ).resolves.toBe(true)
1577 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1578 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1579 await pool.destroy()
1582 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1583 const pool = new DynamicClusterPool(
1584 Math.floor(numberOfWorkers / 2),
1586 './tests/worker-files/cluster/testWorker.js'
1589 pool.sendTaskFunctionOperationToWorkers({
1590 taskFunctionOperation: 'add',
1591 taskFunctionName: 'empty',
1592 taskFunction: (() => {}).toString()
1594 ).resolves.toBe(true)
1595 for (const workerNode of pool.workerNodes) {
1596 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1602 await pool.destroy()