X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=tests%2Fpools%2Fselection-strategies%2Fselection-strategies.test.js;h=1f99e086fc05d16eb064433f8584013fd44a30d7;hb=086fd84368e031d54e321b3eab5cedb301ca3666;hp=ea5fc04a69dd2c1fe6cf8b124c22b63fd2af3638;hpb=c2d2a1eabaec792f2478e25952cb42d9c96bd167;p=poolifier.git diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index ea5fc04a..1f99e086 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -1,10 +1,12 @@ const { expect } = require('expect') const { - WorkerChoiceStrategies, + DynamicClusterPool, DynamicThreadPool, + FixedClusterPool, FixedThreadPool, - FixedClusterPool + WorkerChoiceStrategies } = require('../../../lib') +const { CircularArray } = require('../../../lib/circular-array') describe('Selection strategies test suite', () => { const min = 0 @@ -14,6 +16,7 @@ describe('Selection strategies test suite', () => { expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED') expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY') + expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU') expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE') expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' @@ -63,6 +66,43 @@ describe('Selection strategies test suite', () => { expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( workerChoiceStrategy ) + expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + await pool.destroy() + } + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new DynamicClusterPool( + min, + max, + './tests/worker-files/cluster/testWorker.js' + ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 }) + expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) + expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + retries: 3, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + retries: 3, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) await pool.destroy() } }) @@ -73,13 +113,17 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { - if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) { - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeId - ).toBe(0) - } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) + if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy @@ -96,8 +140,17 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerVirtualTaskRunTime ).toBe(0) + } else if ( + workerChoiceStrategy === + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + ) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy @@ -108,27 +161,41 @@ describe('Selection strategies test suite', () => { workerChoiceStrategy ).workerVirtualTaskRunTime ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ]) } } await pool.destroy() }) - it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify ROUND_ROBIN strategy default policy', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) await pool.destroy() pool = new DynamicThreadPool( min, @@ -136,15 +203,66 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) // We need to clean up the resources after our test await pool.destroy() }) @@ -156,10 +274,48 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: maxMultiplier, + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).previousWorkerNodeKey + ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test await pool.destroy() }) @@ -172,10 +328,52 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + WorkerChoiceStrategies.ROUND_ROBIN + ).previousWorkerNodeKey + ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test await pool.destroy() }) @@ -213,16 +411,16 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeId - ).toBeDefined() - pool.setWorkerChoiceStrategy(workerChoiceStrategy) + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId + ).previousWorkerNodeKey ).toBe(0) await pool.destroy() pool = new DynamicThreadPool( @@ -231,37 +429,32 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeId - ).toBeDefined() - pool.setWorkerChoiceStrategy(workerChoiceStrategy) + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeId + ).previousWorkerNodeKey ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_USED strategy default policy', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) await pool.destroy() pool = new DynamicThreadPool( min, @@ -269,15 +462,66 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_USED strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) // We need to clean up the resources after our test await pool.destroy() }) @@ -289,9 +533,41 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED } ) // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } // We need to clean up the resources after our test await pool.destroy() @@ -305,30 +581,57 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED } ) // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_BUSY strategy default policy', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) await pool.destroy() pool = new DynamicThreadPool( min, @@ -336,15 +639,66 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: false, + median: false + }, + waitTime: { + aggregate: true, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) // We need to clean up the resources after our test await pool.destroy() }) @@ -356,9 +710,51 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY } ) // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } } // We need to clean up the resources after our test await pool.destroy() @@ -372,30 +768,67 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY } ) // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.waitTime.aggregate == null) { + expect(workerNode.usage.waitTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0) + } } // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { - const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + it('Verify LEAST_ELU strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) await pool.destroy() pool = new DynamicThreadPool( min, @@ -403,94 +836,347 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: false, + median: false + } + }) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => { + it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => { const pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } ) - // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersVirtualTaskEndTimestamp.length - ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => { + it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => { const pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } ) - // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersVirtualTaskEndTimestamp.length - ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => { - const pool = new DynamicThreadPool( + it('Verify FAIR_SHARE strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.js', - { - workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, - workerChoiceStrategyOptions: { - medRunTime: true - } + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: true, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: true, + average: true, + median: false } + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBe(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -501,10 +1187,173 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { - const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE - let pool = new FixedThreadPool( - max, + it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } + ) + // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(pool.workerNodes.length) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, + workerChoiceStrategyOptions: { + runTime: { median: true } + } + } + ) + // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.median == null) { + expect(workerNode.usage.runTime.median).toBeUndefined() + } else { + expect(workerNode.usage.runTime.median).toBeGreaterThan(0) + } + if (workerNode.usage.elu.active.aggregate == null) { + expect(workerNode.usage.elu.active.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.elu.idle.aggregate == null) { + expect(workerNode.usage.elu.idle.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0) + } + if (workerNode.usage.elu.utilization == null) { + expect(workerNode.usage.elu.utilization).toBeUndefined() + } else { + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) + } + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workersVirtualTaskEndTimestamp.length + ).toBe(pool.workerNodes.length) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + let pool = new FixedThreadPool( + max, './tests/worker-files/thread/testWorker.js' ) expect( @@ -528,12 +1377,12 @@ describe('Selection strategies test suite', () => { pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp.length ).toBe(0) await pool.destroy() @@ -563,34 +1412,29 @@ describe('Selection strategies test suite', () => { pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workersVirtualTaskEndTimestamp.length ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) await pool.destroy() pool = new DynamicThreadPool( min, @@ -598,15 +1442,66 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().runTime - ).toBe(true) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime - ).toBe(true) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime - ).toBe(false) + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) // We need to clean up the resources after our test await pool.destroy() }) @@ -618,15 +1513,51 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -650,15 +1581,51 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.average == null) { + expect(workerNode.usage.runTime.average).toBeUndefined() + } else { + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) + } } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -682,20 +1649,56 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN, workerChoiceStrategyOptions: { - medRunTime: true + runTime: { median: true } } } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - await pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksUsage.avgRunTime).toBeDefined() - expect(workerNode.tasksUsage.avgRunTime).toBe(0) - expect(workerNode.tasksUsage.medRunTime).toBeDefined() - expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0) + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + if (workerNode.usage.runTime.aggregate == null) { + expect(workerNode.usage.runTime.aggregate).toBeUndefined() + } else { + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + } + if (workerNode.usage.runTime.median == null) { + expect(workerNode.usage.runTime.median).toBeUndefined() + } else { + expect(workerNode.usage.runTime.median).toBeGreaterThan(0) + } } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -720,7 +1723,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -736,7 +1744,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -745,7 +1758,7 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workerVirtualTaskRunTime ).toBe(0) await pool.destroy() @@ -757,7 +1770,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -773,7 +1791,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).currentWorkerNodeId + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -782,13 +1805,413 @@ describe('Selection strategies test suite', () => { ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + pool.workerChoiceStrategyContext.workerChoiceStrategy ).workerVirtualTaskRunTime ).toBe(0) // We need to clean up the resources after our test await pool.destroy() }) + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + dynamicWorkerUsage: false, + dynamicWorkerReady: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: { + aggregate: false, + average: false, + median: false + } + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } + ) + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + } + ) + // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = new Set() + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + for (const workerNode of pool.workerNodes) { + expect(workerNode.usage).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + } + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toEqual(expect.any(Number)) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).defaultWorkerWeight + ).toBeDefined() + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights + ).toBeDefined() + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).nextWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).previousWorkerNodeKey + ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ).toBeGreaterThan(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).roundWeights + ).toStrictEqual([ + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).defaultWorkerWeight + ]) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify unknown strategy throw error', () => { expect( () =>