From 594bfb844b5eb66cf533715a6ddad1799b1f5f8f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 14 Apr 2023 17:13:51 +0200 Subject: [PATCH] test: add tasks queuing tests MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- tests/pools/abstract/abstract-pool.test.js | 2 +- tests/pools/cluster/fixed.test.js | 37 ++ .../selection-strategies.test.js | 349 +++++++----------- .../worker-choice-strategy-context.test.js | 118 +++--- tests/pools/thread/fixed.test.js | 37 ++ 5 files changed, 254 insertions(+), 289 deletions(-) diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 61188975..1a5a75a0 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -263,7 +263,7 @@ describe('Abstract pool test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) - // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers. + // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolFull).toBe(numberOfWorkers + 1) await pool.destroy() diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index b9c7d676..ddcca105 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -12,6 +12,17 @@ describe('Fixed cluster pool test suite', () => { errorHandler: e => console.error(e) } ) + const queuePool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 2 + }, + errorHandler: e => console.error(e) + } + ) const emptyPool = new FixedClusterPool( numberOfWorkers, './tests/worker-files/cluster/emptyWorker.js', @@ -47,6 +58,7 @@ describe('Fixed cluster pool test suite', () => { await errorPool.destroy() await asyncErrorPool.destroy() await emptyPool.destroy() + await queuePool.destroy() }) it('Verify that the function is executed in a worker cluster', async () => { @@ -76,6 +88,31 @@ describe('Fixed cluster pool test suite', () => { expect(poolBusy).toBe(numberOfWorkers + 1) }) + it('Verify that tasks queuing is working', async () => { + const maxMultiplier = 10 + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + queuePool.execute() + } + for (const workerNode of queuePool.workerNodes) { + expect(workerNode.tasksUsage.running).toBeLessThanOrEqual( + queuePool.opts.tasksQueueOptions.concurrency + ) + expect(workerNode.tasksUsage.run).toBe(0) + expect(workerNode.tasksQueue.length).toBeGreaterThan(0) + } + // FIXME: wait for ongoing tasks to be executed + const promises = [] + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + promises.push(queuePool.execute()) + } + await Promise.all(promises) + for (const workerNode of queuePool.workerNodes) { + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.run).toBeGreaterThan(0) + expect(workerNode.tasksQueue.length).toBe(0) + } + }) + it('Verify that is possible to have a worker that return undefined', async () => { const result = await emptyPool.execute() expect(result).toBeUndefined() diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index d4d2ec1b..3292ce56 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -33,49 +33,103 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify ROUND_ROBIN strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId - ).toBe(0) - // We need to clean up the resources after our test - await pool.destroy() + it('Verify available strategies are taken at pool creation', async () => { + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + await pool.destroy() + } }) - it('Verify ROUND_ROBIN strategy can be set after pool creation', async () => { - const pool = new DynamicThreadPool( - min, + it('Verify available strategies can be set after pool creation', async () => { + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + await pool.destroy() + } + }) + + it('Verify available strategies default internals at pool creation', async () => { + const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - // We need to clean up the resources after our test + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeId + ).toBe(0) + } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workerLastVirtualTaskTimestamp.keys()) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workerLastVirtualTaskTimestamp.get(workerNodeKey).start + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workerLastVirtualTaskTimestamp.get(workerNodeKey).end + ).toBe(0) + } + } else if ( + workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workersTaskRunTime.keys()) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workersTaskRunTime.get(workerNodeKey).weight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies + .get(workerChoiceStrategy) + .workersTaskRunTime.get(workerNodeKey).runTime + ).toBe(0) + } + } + } await pool.destroy() }) it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -91,7 +145,7 @@ describe('Selection strategies test suite', () => { min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -140,9 +194,11 @@ describe('Selection strategies test suite', () => { }) it('Verify ROUND_ROBIN strategy runtime behavior', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedClusterPool( max, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.js', + { workerChoiceStrategy } ) let results = new Set() for (let i = 0; i < max; i++) { @@ -150,7 +206,11 @@ describe('Selection strategies test suite', () => { } expect(results.size).toBe(max) await pool.destroy() - pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js') + pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) results = new Set() for (let i = 0; i < max; i++) { results.add(pool.chooseWorkerNode()[1].worker.threadId) @@ -160,6 +220,7 @@ describe('Selection strategies test suite', () => { }) it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', @@ -167,10 +228,10 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).nextWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -185,10 +246,10 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).nextWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -198,43 +259,12 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LESS_USED strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify LESS_USED strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - it('Verify LESS_USED strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -250,7 +280,7 @@ describe('Selection strategies test suite', () => { min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -298,43 +328,12 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LESS_BUSY strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify LESS_BUSY strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - it('Verify LESS_BUSY strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -350,7 +349,7 @@ describe('Selection strategies test suite', () => { min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -398,57 +397,12 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify FAIR_SHARE strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).start - ).toBe(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).end - ).toBe(0) - } - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify FAIR_SHARE strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -464,7 +418,7 @@ describe('Selection strategies test suite', () => { min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -526,16 +480,17 @@ describe('Selection strategies test suite', () => { }) it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ).workerLastVirtualTaskTimestamp ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) .workerLastVirtualTaskTimestamp.keys()) { @@ -558,10 +513,10 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ).workerLastVirtualTaskTimestamp ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) .workerLastVirtualTaskTimestamp.keys()) { @@ -580,67 +535,12 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId - ).toBe(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).defaultWorkerWeight - ).toBeGreaterThan(0) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).weight - ).toBeGreaterThan(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).runTime - ).toBe(0) - } - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify WEIGHTED_ROUND_ROBIN strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - // We need to clean up the resources after our test - await pool.destroy() - }) - it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -656,7 +556,7 @@ describe('Selection strategies test suite', () => { min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) expect( pool.workerChoiceStrategyContext.getRequiredStatistics().runTime @@ -721,26 +621,27 @@ describe('Selection strategies test suite', () => { }) it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).defaultWorkerWeight ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).workersTaskRunTime ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -768,20 +669,20 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).defaultWorkerWeight ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).workersTaskRunTime ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js index c571478c..23d954c9 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js @@ -112,238 +112,228 @@ describe('Worker choice strategy context test suite', () => { }) it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).isDynamicPool ).toBe(false) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).isDynamicPool ).toBe(true) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with LESS_USED and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ).isDynamicPool ).toBe(false) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.LESS_USED - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ) ).toBeInstanceOf(LessUsedWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with LESS_USED and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ).isDynamicPool ).toBe(true) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.LESS_USED - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ) ).toBeInstanceOf(LessUsedWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ).isDynamicPool ).toBe(false) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.LESS_BUSY - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ) ).toBeInstanceOf(LessBusyWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ).isDynamicPool ).toBe(true) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.LESS_BUSY - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ) ).toBeInstanceOf(LessBusyWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ).isDynamicPool ).toBe(false) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.FAIR_SHARE - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ) ).toBeInstanceOf(FairShareWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ).isDynamicPool ).toBe(true) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.FAIR_SHARE - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ) ).toBeInstanceOf(FairShareWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).isDynamicPool ).toBe(false) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ) }) it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).isDynamicPool ).toBe(true) - workerChoiceStrategyContext.setWorkerChoiceStrategy( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ) ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy) expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ) }) }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index cf5c3175..7b4b9bf5 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -12,6 +12,17 @@ describe('Fixed thread pool test suite', () => { errorHandler: e => console.error(e) } ) + const queuePool = new FixedThreadPool( + numberOfThreads, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 2 + }, + errorHandler: e => console.error(e) + } + ) const emptyPool = new FixedThreadPool( numberOfThreads, './tests/worker-files/thread/emptyWorker.js', @@ -47,6 +58,7 @@ describe('Fixed thread pool test suite', () => { await errorPool.destroy() await asyncErrorPool.destroy() await emptyPool.destroy() + await queuePool.destroy() }) it('Verify that the function is executed in a worker thread', async () => { @@ -76,6 +88,31 @@ describe('Fixed thread pool test suite', () => { expect(poolBusy).toBe(numberOfThreads + 1) }) + it('Verify that tasks queuing is working', async () => { + const maxMultiplier = 10 + for (let i = 0; i < numberOfThreads * maxMultiplier; i++) { + queuePool.execute() + } + for (const workerNode of queuePool.workerNodes) { + expect(workerNode.tasksUsage.running).toBeLessThanOrEqual( + queuePool.opts.tasksQueueOptions.concurrency + ) + expect(workerNode.tasksUsage.run).toBe(0) + expect(workerNode.tasksQueue.length).toBeGreaterThan(0) + } + // FIXME: wait for ongoing tasks to be executed + const promises = [] + for (let i = 0; i < numberOfThreads * maxMultiplier; i++) { + promises.push(queuePool.execute()) + } + await Promise.all(promises) + for (const workerNode of queuePool.workerNodes) { + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.run).toBeGreaterThan(0) + expect(workerNode.tasksQueue.length).toBe(0) + } + }) + it('Verify that is possible to have a worker that return undefined', async () => { const result = await emptyPool.execute() expect(result).toBeUndefined() -- 2.34.1