import { cpus } from 'node:os'
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ getDefaultInternalWorkerChoiceStrategyOptions
} from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import type {
IWorkerChoiceStrategy,
+ InternalWorkerChoiceStrategyOptions,
MeasurementStatisticsRequirements,
StrategyPolicy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
*/
public constructor (
protected readonly pool: IPool<Worker, Data, Response>,
- protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ protected opts: InternalWorkerChoiceStrategyOptions
) {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ this.setOptions(this.opts)
this.choose = this.choose.bind(this)
}
protected setTaskStatisticsRequirements (
- opts: WorkerChoiceStrategyOptions
+ opts: InternalWorkerChoiceStrategyOptions
): void {
this.toggleMedianMeasurementStatisticsRequirements(
this.taskStatisticsRequirements.runTime,
public abstract remove (workerNodeKey: number): boolean
/** @inheritDoc */
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ public setOptions (opts: InternalWorkerChoiceStrategyOptions): void {
+ this.opts = {
+ ...getDefaultInternalWorkerChoiceStrategyOptions(this.pool.info.maxSize),
+ ...opts
+ }
this.setTaskStatisticsRequirements(this.opts)
}
+ /** @inheritDoc */
+ public hasPoolWorkerNodesReady (): boolean {
+ return this.pool.workerNodes.some(workerNode => workerNode.info.ready)
+ }
+
/**
* Whether the worker node is ready or not.
*
* @param workerNodeKey - The worker node key.
* @returns Whether the worker node is ready or not.
*/
- private isWorkerNodeReady (workerNodeKey: number): boolean {
- return this.pool.workerNodes[workerNodeKey].info.ready
+ protected isWorkerNodeReady (workerNodeKey: number): boolean {
+ return this.pool.workerNodes[workerNodeKey]?.info?.ready ?? false
}
/**
- * Whether the worker node has back pressure or not (i.e. its tasks queue is full).
- *
- * @param workerNodeKey - The worker node key.
- * @returns `true` if the worker node has back pressure, `false` otherwise.
+ * Check the next worker node readiness.
*/
- private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
- return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
- }
-
- /**
- * Whether the worker node is eligible or not.
- * A worker node is eligible if it is ready and does not have back pressure.
- *
- * @param workerNodeKey - The worker node key.
- * @returns `true` if the worker node is eligible, `false` otherwise.
- * @see {@link isWorkerNodeReady}
- * @see {@link hasWorkerNodeBackPressure}
- */
- protected isWorkerNodeEligible (workerNodeKey: number): boolean {
- return (
- this.isWorkerNodeReady(workerNodeKey) &&
- !this.hasWorkerNodeBackPressure(workerNodeKey)
- )
+ protected checkNextWorkerNodeReadiness (): void {
+ if (!this.isWorkerNodeReady(this.nextWorkerNodeKey as number)) {
+ delete this.nextWorkerNodeKey
+ }
}
/**
this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
}
- /**
- * Check the next worker node eligibility.
- */
- protected checkNextWorkerNodeEligibility (): void {
- if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) {
- delete this.nextWorkerNodeKey
- }
- }
-
protected computeDefaultWorkerWeight (): number {
let cpusCycleTimeWeight = 0
for (const cpu of cpus()) {