X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fselection-strategies%2Fselection-strategies.test.js;h=9f1d3a09f42f74f8742e320eec6d6cca2f26dbd0;hb=a1c82d5d3cc4e6c932a065018b20ed716e9c3051;hp=d4d2ec1b233c3cc0eda0c78621733cc9441478d9;hpb=d710242dd39f5dd418b0a89536a9ad88c147fe3b;p=poolifier.git diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index d4d2ec1b..9f1d3a09 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -4,7 +4,9 @@ const { DynamicThreadPool, FixedThreadPool, FixedClusterPool -} = require('../../../lib/index') +} = require('../../../lib') +const { CircularArray } = require('../../../lib/circular-array') +const TestUtils = require('../../test-utils') describe('Selection strategies test suite', () => { const min = 0 @@ -12,12 +14,16 @@ describe('Selection strategies test suite', () => { it('Verify that WorkerChoiceStrategies enumeration provides string values', () => { expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') - expect(WorkerChoiceStrategies.LESS_USED).toBe('LESS_USED') - expect(WorkerChoiceStrategies.LESS_BUSY).toBe('LESS_BUSY') + expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED') + expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY') + expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU') expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE') expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' ) + expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe( + 'INTERLEAVED_WEIGHTED_ROUND_ROBIN' + ) }) it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => { @@ -33,75 +39,119 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify ROUND_ROBIN strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId - ).toBe(0) - // We need to clean up the resources after our test - await pool.destroy() + it('Verify available strategies are taken at pool creation', async () => { + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + await pool.destroy() + } }) - it('Verify ROUND_ROBIN strategy can be set after pool creation', async () => { - const pool = new DynamicThreadPool( - min, + it('Verify available strategies can be set after pool creation', async () => { + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + await pool.destroy() + } + }) + + it('Verify available strategies default internals at pool creation', async () => { + const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - // We need to clean up the resources after our test + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeId + ).toBe(0) + } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp + ).toBeInstanceOf(Array) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(0) + } else if ( + workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBe(0) + } + } await pool.destroy() }) it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) // We need to clean up the resources after our test await pool.destroy() }) @@ -113,11 +163,40 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeId + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) @@ -130,36 +209,72 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeId + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) it('Verify ROUND_ROBIN strategy runtime behavior', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedClusterPool( max, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.js', + { workerChoiceStrategy } ) let results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorkerNode()[1].worker.id) + results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.id) } expect(results.size).toBe(max) await pool.destroy() - pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js') + pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorkerNode()[1].worker.threadId) + results.add(pool.workerNodes[pool.chooseWorkerNode()].worker.threadId) } expect(results.size).toBe(max) await pool.destroy() }) it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', @@ -167,10 +282,10 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).nextWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -185,10 +300,10 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy ).nextWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -198,358 +313,878 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LESS_USED strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( + it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED + let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_USED strategy can be set after pool creation', async () => { + it('Verify LEAST_USED strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED } ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_USED + // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + } + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED } ) + // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + + elu: undefined + }) + } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_USED strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: false, + medRunTime: false, + waitTime: true, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: false, + medRunTime: false, + waitTime: true, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_USED strategy can be run in a fixed pool', async () => { + it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY } ) - // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual( + 0 + ) + expect( + workerNode.workerUsage.waitTime.aggregation + ).toBeGreaterThanOrEqual(0) + } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_USED strategy can be run in a dynamic pool', async () => { + it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => { const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY } ) - // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: expect.any(Number), + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.waitTime.aggregation).toBeGreaterThan(0) + } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_BUSY strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( + it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy } ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: true + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_BUSY strategy can be set after pool creation', async () => { + it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LESS_BUSY + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + await pool.execute() + if (i !== max * maxMultiplier - 1) await TestUtils.sleep(500) + } + for (const workerNode of pool.workerNodes) { + const expectedWorkerUsage = { + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + } + } + if (workerNode.workerUsage.elu === undefined) { + expect(workerNode.workerUsage).toStrictEqual({ + ...expectedWorkerUsage, + elu: undefined + }) + } else { + expect(workerNode.workerUsage).toStrictEqual({ + ...expectedWorkerUsage, + elu: { + active: expect.any(Number), + idle: 0, + utilization: 1 + } + }) + } + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_BUSY strategy default tasks usage statistics requirements', async () => { + it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: true, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: true, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_BUSY strategy can be run in a fixed pool', async () => { + it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LESS_BUSY strategy can be run in a dynamic pool', async () => { + it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => { const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } + { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( + it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => { + const pool = new DynamicThreadPool( + min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE + { + workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, + workerChoiceStrategyOptions: { + medRunTime: true + } + } ) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).start - ).toBe(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).end - ).toBe(0) + // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: 0, + median: expect.any(Number), + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( + it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.FAIR_SHARE + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp + ).toBeInstanceOf(Array) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(0) + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp[0] = performance.now() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(1) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp + ).toBeInstanceOf(Array) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(0) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp + ).toBeInstanceOf(Array) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(0) + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp[0] = performance.now() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(1) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp + ).toBeInstanceOf(Array) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: true, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: true, + avgRunTime: true, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + ) + // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual( + 0 + ) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) + } expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => { - const pool = new FixedThreadPool( + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) - // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: expect.any(Number), + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workerLastVirtualTaskTimestamp.size - ).toBe(pool.workerNodes.length) + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => { const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { + workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN, + workerChoiceStrategyOptions: { + medRunTime: true + } + } ) - // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` - const promises = [] + // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + promises.add(pool.execute()) } await Promise.all(promises) - // if (process.platform !== 'win32') { - // expect( - // pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - // pool.workerChoiceStrategyContext.workerChoiceStrategy - // ).workerLastVirtualTaskTimestamp.size - // ).toBe(pool.workerNodes.length) - // } + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: expect.any(Number), + average: 0, + median: expect.any(Number), + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0) + expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE - ).workerLastVirtualTaskTimestamp + workerChoiceStrategy + ).currentWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).start - ).toBe(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).end - ).toBe(0) - } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBe(0) await pool.destroy() pool = new DynamicThreadPool( min, @@ -558,40 +1193,20 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.FAIR_SHARE - ).workerLastVirtualTaskTimestamp + workerChoiceStrategy + ).currentWorkerNodeId ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).start - ).toBe(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workerLastVirtualTaskTimestamp.get(workerNodeKey).end - ).toBe(0) - } - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } - ) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -602,145 +1217,224 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategy ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).weight - ).toBeGreaterThan(0) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).runTime - ).toBe(0) - } - // We need to clean up the resources after our test - await pool.destroy() - }) - - it('Verify WEIGHTED_ROUND_ROBIN strategy can be set after pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.js' - ) - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime + ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: false, + avgRunTime: false, + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false, + elu: false + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } ) - // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` - const promises = [] - for (let i = 0; i < max * 2; i++) { - promises.push(pool.execute()) + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) } await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) + } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersTaskRunTime.size - ).toBe(pool.workerNodes.length) + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentRoundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } ) - // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` - const promises = [] - const maxMultiplier = - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).defaultWorkerWeight * 50 + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + promises.add(pool.execute()) } await Promise.all(promises) - if (process.platform !== 'win32') { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersTaskRunTime.size - ).toBe(pool.workerNodes.length) + for (const workerNode of pool.workerNodes) { + expect(workerNode.workerUsage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + elu: undefined + }) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentRoundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).currentWorkerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js' ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy + ).currentRoundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).defaultWorkerWeight ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).workersTaskRunTime + workerChoiceStrategy + ).roundWeights ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).currentRoundId + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -751,15 +1445,15 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategy ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).runTime - ).toBe(0) - } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) await pool.destroy() pool = new DynamicThreadPool( min, @@ -768,20 +1462,25 @@ describe('Selection strategies test suite', () => { ) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy + ).currentRoundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + workerChoiceStrategy ).defaultWorkerWeight ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).workersTaskRunTime + workerChoiceStrategy + ).roundWeights ).toBeDefined() - pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -792,20 +1491,20 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategy ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.keys()) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies - .get(pool.workerChoiceStrategyContext.workerChoiceStrategy) - .workersTaskRunTime.get(workerNodeKey).runTime - ).toBe(0) - } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify unknown strategies throw error', () => { + it('Verify unknown strategy throw error', () => { expect( () => new DynamicThreadPool( @@ -814,8 +1513,6 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy: 'UNKNOWN_STRATEGY' } ) - ).toThrowError( - new Error("Invalid worker choice strategy 'UNKNOWN_STRATEGY'") - ) + ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'") }) })