type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
+ * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
new Map<string, PromiseResponseWrapper<Response>>()
/**
- * Worker choice strategy context referencing a worker choice algorithm implementation.
+ * Worker choice strategies context referencing worker choice algorithms implementation.
*/
- protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
+ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
Worker,
Data,
Response
if (this.opts.enableEvents === true) {
this.initializeEventEmitter()
}
- this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
+ this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
Worker,
Data,
Response
>(
this,
- this.opts.workerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ [this.opts.workerChoiceStrategy!],
this.opts.workerChoiceStrategyOptions
)
started: this.started,
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- strategy: this.opts.workerChoiceStrategy!,
- strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
+ defaultStrategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate === true &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
utilization: round(this.utilization)
}),
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate === true && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.waitTime.aggregate === true && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
- this.opts.workerChoiceStrategy
+ this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+ this.opts.workerChoiceStrategy,
+ workerChoiceStrategyOptions
)
- if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- }
- for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
- workerNode.resetUsage()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
}
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
- this.workerChoiceStrategyContext?.setOptions(
+ this.workerChoiceStrategiesContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
}
return []
}
+ /**
+ * Gets task function strategy, if any.
+ *
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ if (name != null) {
+ return this.listTaskFunctionsProperties().find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+ }
+
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
+ const workerNodeKey = this.chooseWorkerNode(
+ this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
+ )
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
task
)
].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
task
)
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
needWorkerChoiceStrategyUpdate = true
}
if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext?.update(workerNodeKey)
+ this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
/**
* Chooses a worker node for the next task.
*
- * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
- *
+ * @param workerChoiceStrategy - The worker choice strategy.
* @returns The chosen worker node key
*/
- private chooseWorkerNode (): number {
+ private chooseWorkerNode (
+ workerChoiceStrategy?: WorkerChoiceStrategy
+ ): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerUsage === true
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
return workerNodeKey
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategyContext!.execute()
+ return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
/**
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerReady === true ||
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerUsage === true
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
+ true ||
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
workerNode.info.ready = true
}
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate ?? false,
elu:
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
- .aggregate ?? false
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate ?? false
}
})
}
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext?.remove(workerNodeKey)
+ this.workerChoiceStrategiesContext?.remove(workerNodeKey)
}
this.checkAndEmitEmptyEvent()
}