/** @inheritDoc */
public get info (): PoolInfo {
+ const taskStatisticsRequirements =
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
return {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
defaultStrategy: this.opts.workerChoiceStrategy!,
type: this.type,
version,
worker: this.worker,
- ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .runTime.aggregate === true &&
- this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ ...(taskStatisticsRequirements?.runTime.aggregate === true &&
+ taskStatisticsRequirements.waitTime.aggregate && {
utilization: round(this.utilization),
}),
busyWorkerNodes: this.workerNodes.reduce(
0
),
}),
- ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .runTime.aggregate === true && {
+ ...(taskStatisticsRequirements?.runTime.aggregate === true && {
runTime: {
maximum: round(
max(
)
)
),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .runTime.average && {
+ ...(taskStatisticsRequirements.runTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
)
),
}),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .runTime.median && {
+ ...(taskStatisticsRequirements.runTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
}),
},
}),
- ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate === true && {
+ ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
waitTime: {
maximum: round(
max(
)
)
),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .waitTime.average && {
+ ...(taskStatisticsRequirements.waitTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
)
),
}),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .waitTime.median && {
+ ...(taskStatisticsRequirements.waitTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
}),
},
}),
- ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .elu.aggregate === true && {
+ ...(taskStatisticsRequirements?.elu.aggregate === true && {
elu: {
active: {
maximum: round(
)
)
),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .elu.average && {
+ ...(taskStatisticsRequirements.elu.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
)
),
}),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .elu.median && {
+ ...(taskStatisticsRequirements.elu.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
)
)
),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .elu.average && {
+ ...(taskStatisticsRequirements.elu.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
)
),
}),
- ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
- .elu.median && {
+ ...(taskStatisticsRequirements.elu.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
const poolTimeCapacity =
(performance.now() - this.startTimestamp) *
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ if (!Number.isFinite(poolTimeCapacity) || poolTimeCapacity <= 0) {
+ return 0
+ }
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.runTime.aggregate ?? 0),
* @returns Worker nodes back pressure boolean status.
*/
protected internalBackPressure (): boolean {
+ if (this.workerNodes.length === 0) return false
return (
this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
+ if (this.workerNodes.length === 0) return false
return (
this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
(workerNodeA, workerNodeB) =>
workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
)
- for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ for (const workerNode of workerNodes) {
if (sourceWorkerNode.usage.tasks.queued === 0) {
break
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
workerNode.info.backPressureStealing = true
this.stealTask(sourceWorkerNode, workerNodeKey)
workerNode.info.backPressureStealing = false
* @param workerNode - The worker node.
*/
private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
- if (
+ const taskStatisticsRequirements =
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .runTime.aggregate === true
- ) {
+ if (taskStatisticsRequirements?.runTime.aggregate === true) {
workerNode.usage.runTime.aggregate = min(
...this.workerNodes.map(
workerNode =>
)
)
}
- if (
- this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate === true
- ) {
+ if (taskStatisticsRequirements?.waitTime.aggregate === true) {
workerNode.usage.waitTime.aggregate = min(
...this.workerNodes.map(
workerNode =>
)
)
}
- if (
- this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
- .aggregate === true
- ) {
+ if (taskStatisticsRequirements?.elu.aggregate === true) {
workerNode.usage.elu.active.aggregate = min(
...this.workerNodes.map(
workerNode =>
while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return sourceWorkerNodeKey !== workerNodeKey &&
- workerNode.info.ready &&
- workerNode.usage.tasks.queued <
- workerNodes[minWorkerNodeKey].usage.tasks.queued
+ if (workerNodeKey === sourceWorkerNodeKey || !workerNode.info.ready) {
+ return minWorkerNodeKey
+ }
+ if (minWorkerNodeKey === -1) {
+ return workerNodeKey
+ }
+ return workerNode.usage.tasks.queued <
+ workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
: minWorkerNodeKey
},
- 0
+ -1
)
+ if (destinationWorkerNodeKey === -1) {
+ break
+ }
this.handleTask(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
* @param workerNodeKey - The worker node key.
*/
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
+ const taskStatisticsRequirements =
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
this.sendToWorker(workerNodeKey, {
statistics: {
- elu:
- this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .elu.aggregate ?? false,
- runTime:
- this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .runTime.aggregate ?? false,
+ elu: taskStatisticsRequirements?.elu.aggregate ?? false,
+ runTime: taskStatisticsRequirements?.runTime.aggregate ?? false,
},
})
}
message: MessageValue<Data>
): Promise<boolean> {
const targetWorkerNodeKeys = [...this.workerNodes.keys()]
+ const responsesReceived: MessageValue<Response>[] = []
const taskFunctionOperationsListener = (
message: MessageValue<Response>,
resolve: (value: boolean | PromiseLike<boolean>) => void,
reject: (reason?: unknown) => void
): void => {
- const responsesReceived: MessageValue<Response>[] = []
this.checkMessageWorkerId(message)
if (
message.taskFunctionOperationStatus != null &&
}
destinationWorkerNode.info.stealing = true
sourceWorkerNode.info.stolen = true
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()!
+ const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()
+ if (stolenTask == null) {
+ sourceWorkerNode.info.stolen = false
+ destinationWorkerNode.info.stealing = false
+ return
+ }
sourceWorkerNode.info.stolen = false
destinationWorkerNode.info.stealing = false
this.handleTask(destinationWorkerNodeKey, stolenTask)
private readonly workerNodeStealTask = (
workerNodeKey: number
): Task<Data> | undefined => {
+ const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (workerNode == null) return
const workerNodes = this.workerNodes
.slice()
.sort(
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
const sourceWorkerNode = workerNodes.find(
- (sourceWorkerNode, sourceWorkerNodeKey) =>
- sourceWorkerNodeKey !== workerNodeKey &&
+ sourceWorkerNode =>
+ sourceWorkerNode !== workerNode &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {