private redistributeQueuedTasks (workerNodeKey: number): void {
while (this.tasksQueueSize(workerNodeKey) > 0) {
- let destinationWorkerNodeKey!: number
- let minQueuedTasks = Infinity
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
- if (workerNode.usage.tasks.queued === 0) {
- destinationWorkerNodeKey = workerNodeId
- break
+ const destinationWorkerNodeKey = this.workerNodes.reduce(
+ (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
+ if (!workerNode.info.ready) {
+ return minWorkerNodeKey
}
- if (workerNode.usage.tasks.queued < minQueuedTasks) {
- minQueuedTasks = workerNode.usage.tasks.queued
- destinationWorkerNodeKey = workerNodeId
- }
- }
- }
+ return workerNode.usage.tasks.queued <
+ workerNodes[minWorkerNodeKey].usage.tasks.queued
+ ? workerNodeKey
+ : minWorkerNodeKey
+ },
+ 0
+ )
if (destinationWorkerNodeKey != null) {
const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const task = {
(workerNodeA, workerNodeB) =>
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
- for (const sourceWorkerNode of workerNodes) {
- if (sourceWorkerNode.usage.tasks.queued === 0) {
- break
+ const sourceWorkerNode = workerNodes.find(
+ (workerNode) =>
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued > 0
+ )
+ if (sourceWorkerNode != null) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
}
- if (
- sourceWorkerNode.info.ready &&
- sourceWorkerNode.info.id !== workerId &&
- sourceWorkerNode.usage.tasks.queued > 0
- ) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
- this.updateTaskStolenStatisticsWorkerUsage(
- destinationWorkerNodeKey,
- task.name as string
- )
- break
+ if (this.shallExecuteTask(destinationWorkerNodeKey)) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
}
+ this.updateTaskStolenStatisticsWorkerUsage(
+ destinationWorkerNodeKey,
+ task.name as string
+ )
}
}
private getWorkerNodeVirtualTaskStartTimestamp (
workerNodeKey: number
): number {
+ const virtualTaskEndTimestamp =
+ this.pool.workerNodes[workerNodeKey]?.strategyData
+ ?.virtualTaskEndTimestamp
const now = performance.now()
- return now <
- (this.pool.workerNodes[workerNodeKey]?.strategyData
- ?.virtualTaskEndTimestamp ?? -Infinity)
- ? (this.pool.workerNodes[workerNodeKey]?.strategyData
- ?.virtualTaskEndTimestamp as number)
+ return now < (virtualTaskEndTimestamp ?? -Infinity)
+ ? (virtualTaskEndTimestamp as number)
: now
}
}