- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
- `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
- `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time
+ - `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU) (experimental)
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time
- `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
- `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time
--- /dev/null
+import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import type { IPool } from '../pool'
+import type { IWorker } from '../worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type {
+ IWorkerChoiceStrategy,
+ TaskStatisticsRequirements,
+ WorkerChoiceStrategyOptions
+} from './selection-strategies-types'
+
+/**
+ * Selects the worker with the least ELU.
+ *
+ * @typeParam Worker - Type of worker which manages the strategy.
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export class LeastEluWorkerChoiceStrategy<
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+ >
+ extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
+ implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
+ runTime: false,
+ avgRunTime: false,
+ medRunTime: false,
+ waitTime: false,
+ avgWaitTime: false,
+ medWaitTime: false,
+ elu: true
+ }
+
+ /** @inheritDoc */
+ public constructor (
+ pool: IPool<Worker, Data, Response>,
+ opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ ) {
+ super(pool, opts)
+ this.setTaskStatistics(this.opts)
+ }
+
+ /** @inheritDoc */
+ public reset (): boolean {
+ return true
+ }
+
+ /** @inheritDoc */
+ public update (): boolean {
+ return true
+ }
+
+ /** @inheritDoc */
+ public choose (): number {
+ let minWorkerElu = Infinity
+ let leastEluWorkerNodeKey!: number
+ for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+ const workerUsage = workerNode.workerUsage
+ const workerElu = workerUsage.elu?.utilization ?? 0
+ if (workerElu === 0) {
+ return workerNodeKey
+ } else if (workerElu < minWorkerElu) {
+ minWorkerElu = workerElu
+ leastEluWorkerNodeKey = workerNodeKey
+ }
+ }
+ return leastEluWorkerNodeKey
+ }
+
+ /** @inheritDoc */
+ public remove (): boolean {
+ return true
+ }
+}
* Least busy worker selection strategy.
*/
LEAST_BUSY: 'LEAST_BUSY',
+ /**
+ * Least ELU worker selection strategy.
+ *
+ * @experimental
+ */
+ LEAST_ELU: 'LEAST_ELU',
/**
* Fair share worker selection strategy.
*/
import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy'
import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy'
+import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
opts
)
],
+ [
+ WorkerChoiceStrategies.LEAST_ELU,
+ new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
+ pool,
+ opts
+ )
+ ],
[
WorkerChoiceStrategies.FAIR_SHARE,
new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
FixedClusterPool
} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
+const TestUtils = require('../../test-utils')
describe('Selection strategies test suite', () => {
const min = 0
expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
+ expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
'WEIGHTED_ROUND_ROBIN'
await pool.destroy()
})
+ it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: false,
+ avgRunTime: false,
+ medRunTime: false,
+ waitTime: false,
+ avgWaitTime: false,
+ medWaitTime: false,
+ elu: true
+ })
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy }
+ )
+ expect(
+ pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ).toStrictEqual({
+ runTime: false,
+ avgRunTime: false,
+ medRunTime: false,
+ waitTime: false,
+ avgWaitTime: false,
+ medWaitTime: false,
+ elu: true
+ })
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
+ )
+ // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ await pool.execute()
+ if (i !== max * maxMultiplier - 1) await TestUtils.sleep(500)
+ }
+ for (const workerNode of pool.workerNodes) {
+ const expectedWorkerUsage = {
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ }
+ }
+ if (workerNode.workerUsage.elu === undefined) {
+ expect(workerNode.workerUsage).toStrictEqual({
+ ...expectedWorkerUsage,
+ elu: undefined
+ })
+ } else {
+ expect(workerNode.workerUsage).toStrictEqual({
+ ...expectedWorkerUsage,
+ elu: {
+ active: expect.any(Number),
+ idle: 0,
+ utilization: 1
+ }
+ })
+ }
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ }
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
let pool = new FixedThreadPool(
const {
LeastBusyWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/least-busy-worker-choice-strategy')
+const {
+ LeastEluWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/least-elu-worker-choice-strategy')
const {
FairShareWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
)
})
+ it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ fixedPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(
+ workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ )
+ ).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
+ expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ workerChoiceStrategy
+ )
+ })
+
+ it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => {
+ const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ dynamicPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(
+ workerChoiceStrategyContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ )
+ ).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
+ expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ workerChoiceStrategy
+ )
+ })
+
it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(