import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
)
})
+ },
+ utilization: {
+ average: round(
+ average(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ )
}
}
})
}
if (requireSync) {
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies(),
+ this.getWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
for (const workerNodeKey of this.workerNodes.keys()) {
this.opts.workerChoiceStrategyOptions
)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies(),
+ this.getWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
for (const workerNodeKey of this.workerNodes.keys()) {
})
this.taskFunctions.set(name, fn)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies()
+ this.getWorkerChoiceStrategies()
)
for (const workerNodeKey of this.workerNodes.keys()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
this.taskFunctions.delete(name)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies()
+ this.getWorkerChoiceStrategies()
)
for (const workerNodeKey of this.workerNodes.keys()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
/**
- * Gets task function strategy, if any.
+ * Gets task function worker choice strategy, if any.
*
* @param name - The task function name.
* @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
*/
- private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
name?: string
): WorkerChoiceStrategy | undefined => {
- if (name != null) {
- return this.listTaskFunctionsProperties().find(
- (taskFunctionProperties: TaskFunctionProperties) =>
- taskFunctionProperties.name === name
- )?.strategy
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
}
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
}
/**
*
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
- * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
*/
private readonly getWorkerNodeTaskFunctionPriority = (
workerNodeKey: number,
name?: string
): number | undefined => {
- if (name != null) {
- return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
- (taskFunctionProperties: TaskFunctionProperties) =>
- taskFunctionProperties.name === name
- )?.priority
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
}
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
}
/**
*
* @returns The worker choice strategies.
*/
- private readonly getWorkerWorkerChoiceStrategies =
+ private readonly getWorkerChoiceStrategies =
(): Set<WorkerChoiceStrategy> => {
return new Set([
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return
}
const timestamp = performance.now()
- const taskFunctionStrategy =
- this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
- const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
+ const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
- strategy: taskFunctionStrategy,
+ strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+ workerNodeKey,
+ name
+ ),
transferList,
timestamp,
taskId: randomUUID()
}
/**
- * Chooses a worker node for the next task given the worker choice strategy.
+ * Chooses a worker node for the next task.
*
- * @param workerChoiceStrategy - The worker choice strategy.
- * @returns The chosen worker node key
+ * @param name - The task function name.
+ * @returns The chosen worker node key.
*/
- private chooseWorkerNode (
- workerChoiceStrategy?: WorkerChoiceStrategy
- ): number {
+ private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
+ return this.workerChoiceStrategiesContext!.execute(
+ this.getTaskFunctionWorkerChoiceStrategy(name)
+ )
}
/**
) {
workerNode.usage.runTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
) {
workerNode.usage.waitTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
) {
workerNode.usage.elu.active.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
*/
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
this.checkAndEmitReadyEvent()
}
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
*
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
- tasksQueueBucketSize:
- (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority()
}
)
// Flag the worker node as ready at pool startup.