const { expect } = require('expect')
const {
- WorkerChoiceStrategies,
DynamicThreadPool,
+ FixedClusterPool,
FixedThreadPool,
- FixedClusterPool
+ WorkerChoiceStrategies
} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
+ expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
'WEIGHTED_ROUND_ROBIN'
)
+ expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
+ 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
+ )
})
it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
} else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
expect(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).currentWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
await pool.destroy()
})
- it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+ it('Verify ROUND_ROBIN strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
let pool = new FixedThreadPool(
max,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: false,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: false,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ WorkerChoiceStrategies.ROUND_ROBIN
+ ).nextWorkerNodeKey
+ ).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ WorkerChoiceStrategies.ROUND_ROBIN
+ ).nextWorkerNodeKey
+ ).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
})
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
await pool.destroy()
pool = new DynamicThreadPool(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => {
+ it('Verify LEAST_USED strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
let pool = new FixedThreadPool(
max,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: false,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: false,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
)
// TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
}
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
)
// TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
}
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => {
+ it('Verify LEAST_BUSY strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
let pool = new FixedThreadPool(
max,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: true,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: false,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: true,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
)
// TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
}
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
)
// TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
+ const promises = new Set()
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ }
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_ELU strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: true,
+ average: false,
+ median: false
+ }
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: true,
+ average: false,
+ median: false
+ }
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
+ )
+ // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.elu.utilization == null) {
+ expect(workerNode.usage.elu.utilization).toBeUndefined()
+ } else {
+ expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
+ }
}
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
+ it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
+ )
+ // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
+ const promises = new Set()
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ }
+ })
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.elu.utilization == null) {
+ expect(workerNode.usage.elu.utilization).toBeUndefined()
+ } else {
+ expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
+ }
+ }
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify FAIR_SHARE strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: false
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
let pool = new FixedThreadPool(
max,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: true,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: true,
+ average: true,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: true,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: true,
+ average: true,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ }
})
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.elu.utilization == null) {
+ expect(workerNode.usage.elu.utilization).toBeUndefined()
+ } else {
+ expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: max * maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ }
})
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.elu.utilization == null) {
+ expect(workerNode.usage.elu.utilization).toBeUndefined()
+ } else {
+ expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
{
workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
workerChoiceStrategyOptions: {
- medRunTime: true
+ runTime: { median: true }
}
}
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: max * maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: expect.any(Number),
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toMatchObject({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ }
})
- expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.median == null) {
+ expect(workerNode.usage.runTime.median).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.elu.utilization == null) {
+ expect(workerNode.usage.elu.utilization).toBeUndefined()
+ } else {
+ expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
await pool.destroy()
})
- it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+ it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
let pool = new FixedThreadPool(
max,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: true,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getRequiredStatistics()
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
- runTime: true,
- avgRunTime: true,
- medRunTime: false,
- waitTime: false,
- avgWaitTime: false,
- medWaitTime: false
+ runTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: max * maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
})
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
{ workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: max * maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregate: expect.any(Number),
+ maximum: expect.any(Number),
+ minimum: expect.any(Number),
+ average: expect.any(Number),
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
})
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
{
workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
workerChoiceStrategyOptions: {
- medRunTime: true
+ runTime: { median: true }
}
}
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- await pool.execute()
+ promises.add(pool.execute())
}
+ await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- run: max * maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: expect.any(Number),
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregate: expect.any(Number),
+ maximum: expect.any(Number),
+ minimum: expect.any(Number),
+ median: expect.any(Number),
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
})
- expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).currentWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).currentWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).currentWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).currentWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
await pool.destroy()
})
+ it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
+ const workerChoiceStrategy =
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ useDynamicWorker: true
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
+ const workerChoiceStrategy =
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ waitTime: {
+ aggregate: false,
+ average: false,
+ median: false
+ },
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ workerChoiceStrategy:
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ }
+ )
+ // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeGreaterThan(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).roundId
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).roundWeights
+ ).toStrictEqual([
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ])
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ workerChoiceStrategy:
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ }
+ )
+ // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
+ const promises = new Set()
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.usage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeGreaterThan(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).roundId
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).roundWeights
+ ).toStrictEqual([
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ])
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
+ const workerChoiceStrategy =
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundId
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights
+ ).toBeDefined()
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundId
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeGreaterThan(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights
+ ).toStrictEqual([
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ])
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundId
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeDefined()
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights
+ ).toBeDefined()
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).nextWorkerNodeKey
+ ).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ).toBeGreaterThan(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights
+ ).toStrictEqual([
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).defaultWorkerWeight
+ ])
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify unknown strategy throw error', () => {
expect(
() =>