workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
+ let requireSync = false
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ requireSync = this.setWorkerChoiceStrategyOptions(
+ workerChoiceStrategyOptions
+ )
}
if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
this.opts.workerChoiceStrategy = workerChoiceStrategy
this.opts.workerChoiceStrategy,
this.opts.workerChoiceStrategyOptions
)
+ requireSync = true
+ }
+ if (requireSync) {
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
}
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
- ): void {
+ ): boolean {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategiesContext?.setOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return true
}
- this.workerChoiceStrategiesContext?.setOptions(
- this.opts.workerChoiceStrategyOptions
- )
+ return false
}
/** @inheritDoc */
}
private setTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'idle',
this.handleWorkerNodeIdleEvent
}
private setTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
this.handleWorkerNodeBackPressureEvent
}
private unsetTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
this.handleWorkerNodeBackPressureEvent
}
}
}
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(
workerNodeKey,
taskFunctionOperationsListener
}
private flushTasksQueues (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.flushTasksQueue(workerNodeKey)
}
}
* Sets the worker choice strategy options in this pool.
*
* @param workerChoiceStrategyOptions - The worker choice strategy options.
+ * @returns `true` if the worker choice strategy options were set, `false` otherwise.
*/
readonly setWorkerChoiceStrategyOptions: (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
- ) => void
+ ) => boolean
/**
* Enables/disables the worker node tasks queue in this pool.
*
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
import {
type IWorkerChoiceStrategy,
+ type StrategyPolicy,
+ type TaskStatisticsRequirements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
}
}
+export const buildWorkerChoiceStrategiesPolicy = (
+ workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
+): StrategyPolicy => {
+ const policies: StrategyPolicy[] = []
+ for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
+ policies.push(workerChoiceStrategy.strategyPolicy)
+ }
+ return {
+ dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
+ dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
+ }
+}
+
+export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
+ workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
+): TaskStatisticsRequirements => {
+ const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
+ for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
+ taskStatisticsRequirements.push(
+ workerChoiceStrategy.taskStatisticsRequirements
+ )
+ }
+ return {
+ runTime: {
+ aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
+ average: taskStatisticsRequirements.some(r => r.runTime.average),
+ median: taskStatisticsRequirements.some(r => r.runTime.median)
+ },
+ waitTime: {
+ aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
+ average: taskStatisticsRequirements.some(r => r.waitTime.average),
+ median: taskStatisticsRequirements.some(r => r.waitTime.median)
+ },
+ elu: {
+ aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
+ average: taskStatisticsRequirements.some(r => r.elu.average),
+ median: taskStatisticsRequirements.some(r => r.elu.median)
+ }
+ }
+}
+
export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
workerChoiceStrategy: WorkerChoiceStrategy,
pool: IPool<Worker, Data, Response>,
} from './selection-strategies-types.js'
import { WorkerChoiceStrategies } from './selection-strategies-types.js'
import {
+ buildWorkerChoiceStrategiesPolicy,
+ buildWorkerChoiceStrategiesTaskStatisticsRequirements,
getWorkerChoiceStrategiesRetries,
getWorkerChoiceStrategy
} from './selection-strategies-utils.js'
IWorkerChoiceStrategy
>
+ /**
+ * The active worker choice strategies in the context policy.
+ */
+ private workerChoiceStrategiesPolicy: StrategyPolicy
+
+ /**
+ * The active worker choice strategies in the context task statistics requirements.
+ */
+ private workerChoiceStrategiesTaskStatisticsRequirements: TaskStatisticsRequirements
+
/**
* The maximum number of worker choice strategies execution retries.
*/
for (const workerChoiceStrategy of workerChoiceStrategies) {
this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
}
+ this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
+ this.workerChoiceStrategies
+ )
+ this.workerChoiceStrategiesTaskStatisticsRequirements =
+ buildWorkerChoiceStrategiesTaskStatisticsRequirements(
+ this.workerChoiceStrategies
+ )
this.retriesCount = 0
this.retries = getWorkerChoiceStrategiesRetries<Worker, Data, Response>(
this.pool,
* @returns The strategies policy.
*/
public getPolicy (): StrategyPolicy {
- const policies: StrategyPolicy[] = []
- for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
- policies.push(workerChoiceStrategy.strategyPolicy)
- }
- return {
- dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
- dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
- }
+ return this.workerChoiceStrategiesPolicy
}
/**
* Gets the active worker choice strategies in the context task statistics requirements.
*
- * @returns The task statistics requirements.
+ * @returns The strategies task statistics requirements.
*/
public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
- const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
- for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
- taskStatisticsRequirements.push(
- workerChoiceStrategy.taskStatisticsRequirements
- )
- }
- return {
- runTime: {
- aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
- average: taskStatisticsRequirements.some(r => r.runTime.average),
- median: taskStatisticsRequirements.some(r => r.runTime.median)
- },
- waitTime: {
- aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
- average: taskStatisticsRequirements.some(r => r.waitTime.average),
- median: taskStatisticsRequirements.some(r => r.waitTime.median)
- },
- elu: {
- aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
- average: taskStatisticsRequirements.some(r => r.elu.average),
- median: taskStatisticsRequirements.some(r => r.elu.median)
- }
- }
+ return this.workerChoiceStrategiesTaskStatisticsRequirements
}
/**
}
/**
- * Executes the worker choice strategy in the context algorithm.
+ * Executes the given worker choice strategy in the context algorithm.
*
* @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
* @returns The key of the worker node.
this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
}
}
+ this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
+ this.workerChoiceStrategies
+ )
+ this.workerChoiceStrategiesTaskStatisticsRequirements =
+ buildWorkerChoiceStrategiesTaskStatisticsRequirements(
+ this.workerChoiceStrategies
+ )
}
/**