// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
- // this.workerNodes[workerNodeKey].onEmptyQueue =
- // this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
this.workerNodes[workerNodeKey].onBackPressure =
this.tasksStealingOnBackPressure.bind(this)
}
}
private redistributeQueuedTasks (workerNodeKey: number): void {
- const workerNodes = this.workerNodes.filter(
- (workerNode, workerNodeId) =>
- workerNode.info.ready && workerNodeId !== workerNodeKey
- )
while (this.tasksQueueSize(workerNodeKey) > 0) {
let destinationWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
- for (const [workerNodeId, workerNode] of workerNodes.entries()) {
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
if (
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ (this.opts.tasksQueueOptions?.concurrency as number)
) {
executeTask = true
}
- if (workerNode.usage.tasks.queued === 0) {
+ if (
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
destinationWorkerNodeKey = workerNodeId
break
}
- if (workerNode.usage.tasks.queued < minQueuedTasks) {
+ if (
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
minQueuedTasks = workerNode.usage.tasks.queued
destinationWorkerNodeKey = workerNodeId
}
}
private taskStealingOnEmptyQueue (workerId: number): void {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const workerNodes = this.workerNodes
- .filter(
- (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
- )
+ .slice()
.sort(
(workerNodeA, workerNodeB) =>
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
- const destinationWorkerNode = workerNodes[destinationWorkerNodeKey]
for (const sourceWorkerNode of workerNodes) {
- if (sourceWorkerNode.usage.tasks.queued > 0) {
+ if (
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNode.info.id !== workerId &&
+ sourceWorkerNode.usage.tasks.queued > 0
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
if (
- destinationWorkerNode?.usage?.tasks?.executing <
+ destinationWorkerNode.usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
this.executeTask(destinationWorkerNodeKey, task)
} else {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
this.enqueueTask(destinationWorkerNodeKey, task)
}
break
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
- .filter(
- (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
- )
+ .slice()
.sort(
(workerNodeA, workerNodeB) =>
workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
)
for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
if (
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
sourceWorkerNode.usage.tasks.queued > 0 &&
!workerNode.hasBackPressure()
) {