## [Unreleased]
+### Added
+
+- Add `LESS_BUSY` worker choice strategy.
+
### Changed
- Optimize worker storage in pool.
- Optimize worker alive status check.
-- Rename worker choice strategy `LESS_RECENTLY_USED to `LESS_USED` .
+- Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
- Optimize `LESS_USED` worker choice strategy.
### Fixed
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robbin fashion
- `WorkerChoiceStrategies.LESS_USED`: Submit tasks to the less used worker
+ - `WorkerChoiceStrategies.LESS_BUSY`: Submit tasks to the less busy worker
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time
- `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker using a fair share tasks scheduling algorithm based on tasks execution time
const {
dynamicClusterTest,
dynamicClusterTestFairShare,
- dynamicClusterTestLessRecentlyUsed,
+ dynamicClusterTestLessUsed,
dynamicClusterTestWeightedRoundRobin
} = require('./cluster/dynamic')
const {
fixedClusterTest,
fixedClusterTestFairShare,
- fixedClusterTestLessRecentlyUsed,
+ fixedClusterTestLessUsed,
fixedClusterTestWeightedRoundRobin
} = require('./cluster/fixed')
const {
dynamicThreadTest,
dynamicThreadTestFairShare,
- dynamicThreadTestLessRecentlyUsed,
+ dynamicThreadTestLessUsed,
dynamicThreadTestWeightedRoundRobin
} = require('./thread/dynamic')
const {
fixedThreadTest,
fixedThreadTestFairShare,
- fixedThreadTestLessRecentlyUsed,
+ fixedThreadTestLessUsed,
fixedThreadTestWeightedRoundRobin
} = require('./thread/fixed')
Benchmark.add('Poolifier:Fixed:ThreadPool', async () => {
await fixedThreadTest()
}),
- Benchmark.add('Poolifier:Fixed:ThreadPool:LessRecentlyUsed', async () => {
- await fixedThreadTestLessRecentlyUsed()
+ Benchmark.add('Poolifier:Fixed:ThreadPool:LessUsed', async () => {
+ await fixedThreadTestLessUsed()
}),
Benchmark.add('Poolifier:Fixed:ThreadPool:WeightedRoundRobin', async () => {
await fixedThreadTestWeightedRoundRobin()
Benchmark.add('Poolifier:Dynamic:ThreadPool', async () => {
await dynamicThreadTest()
}),
- Benchmark.add('Poolifier:Dynamic:ThreadPool:LessRecentlyUsed', async () => {
- await dynamicThreadTestLessRecentlyUsed()
+ Benchmark.add('Poolifier:Dynamic:ThreadPool:LessUsed', async () => {
+ await dynamicThreadTestLessUsed()
}),
Benchmark.add('Poolifier:Dynamic:ThreadPool:WeightedRoundRobin', async () => {
await dynamicThreadTestWeightedRoundRobin()
Benchmark.add('Poolifier:Fixed:ClusterPool', async () => {
await fixedClusterTest()
}),
- Benchmark.add('Poolifier:Fixed:ClusterPool:LessRecentlyUsed', async () => {
- await fixedClusterTestLessRecentlyUsed()
+ Benchmark.add('Poolifier:Fixed:ClusterPool:LessUsed', async () => {
+ await fixedClusterTestLessUsed()
}),
Benchmark.add('Poolifier:Fixed:ClusterPool:WeightedRoundRobin', async () => {
await fixedClusterTestWeightedRoundRobin
Benchmark.add('Poolifier:Dynamic:ClusterPool', async () => {
await dynamicClusterTest()
}),
- Benchmark.add('Poolifier:Dynamic:ClusterPool:LessRecentlyUsed', async () => {
- await dynamicClusterTestLessRecentlyUsed()
+ Benchmark.add('Poolifier:Dynamic:ClusterPool:LessUsed', async () => {
+ await dynamicClusterTestLessUsed()
}),
Benchmark.add(
'Poolifier:Dynamic:ClusterPool:WeightedRoundRobin',
'./benchmarks/internal/cluster/worker.js'
)
-const dynamicPoolLessRecentlyUsed = new DynamicClusterPool(
+const dynamicPoolLessUsed = new DynamicClusterPool(
size / 2,
size * 3,
'./benchmarks/internal/cluster/worker.js',
return runPoolifierTest(dynamicPool, { tasks, workerData })
}
-async function dynamicClusterTestLessRecentlyUsed (
+async function dynamicClusterTestLessUsed (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
- return runPoolifierTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
+ return runPoolifierTest(dynamicPoolLessUsed, { tasks, workerData })
}
async function dynamicClusterTestWeightedRoundRobin (
module.exports = {
dynamicClusterTest,
- dynamicClusterTestLessRecentlyUsed,
+ dynamicClusterTestLessUsed,
dynamicClusterTestWeightedRoundRobin,
dynamicClusterTestFairShare
}
'./benchmarks/internal/cluster/worker.js'
)
-const fixedPoolLessRecentlyUsed = new FixedClusterPool(
+const fixedPoolLessUsed = new FixedClusterPool(
size,
'./benchmarks/internal/cluster/worker.js',
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
return runPoolifierTest(fixedPool, { tasks, workerData })
}
-async function fixedClusterTestLessRecentlyUsed (
+async function fixedClusterTestLessUsed (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
- return runPoolifierTest(fixedPoolLessRecentlyUsed, { tasks, workerData })
+ return runPoolifierTest(fixedPoolLessUsed, { tasks, workerData })
}
async function fixedClusterTestWeightedRoundRobin (
module.exports = {
fixedClusterTest,
- fixedClusterTestLessRecentlyUsed,
+ fixedClusterTestLessUsed,
fixedClusterTestWeightedRoundRobin,
fixedClusterTestFairShare
}
'./benchmarks/internal/thread/worker.js'
)
-const dynamicPoolLessRecentlyUsed = new DynamicThreadPool(
+const dynamicPoolLessUsed = new DynamicThreadPool(
size / 2,
size * 3,
'./benchmarks/internal/thread/worker.js',
return runPoolifierTest(dynamicPool, { tasks, workerData })
}
-async function dynamicThreadTestLessRecentlyUsed (
+async function dynamicThreadTestLessUsed (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
- return runPoolifierTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
+ return runPoolifierTest(dynamicPoolLessUsed, { tasks, workerData })
}
async function dynamicThreadTestWeightedRoundRobin (
module.exports = {
dynamicThreadTest,
- dynamicThreadTestLessRecentlyUsed,
+ dynamicThreadTestLessUsed,
dynamicThreadTestWeightedRoundRobin,
dynamicThreadTestFairShare
}
'./benchmarks/internal/thread/worker.js'
)
-const fixedPoolLessRecentlyUsed = new FixedThreadPool(
+const fixedPoolLessUsed = new FixedThreadPool(
size,
'./benchmarks/internal/thread/worker.js',
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
return runPoolifierTest(fixedPool, { tasks, workerData })
}
-async function fixedThreadTestLessRecentlyUsed (
+async function fixedThreadTestLessUsed (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
- return runPoolifierTest(fixedPoolLessRecentlyUsed, { tasks, workerData })
+ return runPoolifierTest(fixedPoolLessUsed, { tasks, workerData })
}
async function fixedThreadTestWeightedRoundRobin (
module.exports = {
fixedThreadTest,
- fixedThreadTestLessRecentlyUsed,
+ fixedThreadTestLessUsed,
fixedThreadTestWeightedRoundRobin,
fixedThreadTestFairShare
}
--- /dev/null
+import type { IPoolWorker } from '../pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type { RequiredStatistics } from './selection-strategies-types'
+
+/**
+ * Selects the less busy worker.
+ *
+ * @typeParam Worker - Type of worker which manages the strategy.
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of response of execution. This can only be serializable data.
+ */
+export class LessBusyWorkerChoiceStrategy<
+ Worker extends IPoolWorker,
+ Data,
+ Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /** {@inheritDoc} */
+ public readonly requiredStatistics: RequiredStatistics = {
+ runTime: true
+ }
+
+ /** {@inheritDoc} */
+ public reset (): boolean {
+ return true
+ }
+
+ /** {@inheritDoc} */
+ public choose (): Worker {
+ let minRunTime = Infinity
+ let lessBusyWorker!: Worker
+ for (const value of this.pool.workers.values()) {
+ const worker = value.worker
+ const workerRunTime = this.pool.getWorkerTasksUsage(worker)
+ ?.runTime as number
+ if (!this.isDynamicPool && workerRunTime === 0) {
+ return worker
+ } else if (workerRunTime < minRunTime) {
+ minRunTime = workerRunTime
+ lessBusyWorker = worker
+ }
+ }
+ return lessBusyWorker
+ }
+}
/** {@inheritDoc} */
public choose (): Worker {
let minNumberOfTasks = Infinity
- // A worker is always found because it picks the one with fewer tasks
- let lessRecentlyUsedWorker!: Worker
+ let lessUsedWorker!: Worker
for (const value of this.pool.workers.values()) {
const worker = value.worker
const tasksUsage = this.pool.getWorkerTasksUsage(worker)
return worker
} else if (workerTasks < minNumberOfTasks) {
minNumberOfTasks = workerTasks
- lessRecentlyUsedWorker = worker
+ lessUsedWorker = worker
}
}
- return lessRecentlyUsedWorker
+ return lessUsedWorker
}
}
* Less used worker selection strategy.
*/
LESS_USED: 'LESS_USED',
+ /**
+ * Less busy worker selection strategy.
+ */
+ LESS_BUSY: 'LESS_BUSY',
/**
* Fair share worker selection strategy.
*/
import type { IPoolInternal } from '../pool-internal'
import type { IPoolWorker } from '../pool-worker'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
+import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy'
import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
import type {
return new RoundRobinWorkerChoiceStrategy(pool)
case WorkerChoiceStrategies.LESS_USED:
return new LessUsedWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.LESS_BUSY:
+ return new LessBusyWorkerChoiceStrategy(pool)
case WorkerChoiceStrategies.FAIR_SHARE:
return new FairShareWorkerChoiceStrategy(pool)
case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
const {
LessUsedWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/less-used-worker-choice-strategy')
+const {
+ LessBusyWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/less-busy-worker-choice-strategy')
const {
FairShareWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
expect(strategy).toBeInstanceOf(LessUsedWorkerChoiceStrategy)
})
+ it('Verify that getWorkerChoiceStrategy() can return LESS_BUSY strategy', () => {
+ const strategy = getWorkerChoiceStrategy(
+ pool,
+ WorkerChoiceStrategies.LESS_BUSY
+ )
+ expect(strategy).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
+ })
+
it('Verify that getWorkerChoiceStrategy() can return FAIR_SHARE strategy', () => {
const strategy = getWorkerChoiceStrategy(
pool,
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
expect(WorkerChoiceStrategies.LESS_USED).toBe('LESS_USED')
+ expect(WorkerChoiceStrategies.LESS_BUSY).toBe('LESS_BUSY')
expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
'WEIGHTED_ROUND_ROBIN'
'./tests/worker-files/thread/testWorker.js',
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
)
- // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+ // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
const promises = []
for (let i = 0; i < max * 2; i++) {
promises.push(pool.execute())
'./tests/worker-files/thread/testWorker.js',
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
)
- // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+ // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute())
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LESS_BUSY strategy is taken at pool creation', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
+ )
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.LESS_BUSY
+ )
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LESS_BUSY strategy can be set after pool creation', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.LESS_BUSY
+ )
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LESS_BUSY strategy default tasks usage statistics requirements', async () => {
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LESS_BUSY strategy can be run in a fixed pool', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
+ )
+ // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute())
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LESS_BUSY strategy can be run in a dynamic pool', async () => {
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
+ )
+ // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
const promises = []
for (let i = 0; i < max * 2; i++) {
promises.push(pool.execute())
const {
LessUsedWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/less-used-worker-choice-strategy')
+const {
+ LessBusyWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/less-busy-worker-choice-strategy')
const {
FairShareWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
).toBeInstanceOf(LessUsedWorkerChoiceStrategy)
})
+ it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and fixed pool', () => {
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ fixedPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LESS_BUSY
+ )
+ expect(
+ workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ ).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
+ })
+
+ it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and dynamic pool', () => {
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ dynamicPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LESS_BUSY
+ )
+ expect(
+ workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ ).toBeInstanceOf(DynamicPoolWorkerChoiceStrategy)
+ expect(
+ workerChoiceStrategyContext.getWorkerChoiceStrategy().workerChoiceStrategy
+ ).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
+ })
+
it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
fixedPool