X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f0119f4ed3c8135e596d171a0f6ec093d7b29935;hb=c3eca146177be73305c08b03e16dccf531855e04;hp=1926a4a426fbcb84724ab55749778ff95c5910ee;hpb=f5d7178c9bf5e9c77fb69d191a2eadaf08539839;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1926a4a4..f0119f4e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1142,6 +1142,10 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1175,24 +1179,23 @@ export abstract class AbstractPool< let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued === 0 ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } targetWorkerNodeKey = workerNodeId break } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued @@ -1202,12 +1205,48 @@ export abstract class AbstractPool< if (executeTask) { this.executeTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task ) } else { this.enqueueTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task + ) + } + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .filter((workerNode) => workerNode.info.id !== workerId) + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task ) } } @@ -1387,6 +1426,10 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].dequeueTask() } + private popTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].popTask() + } + private tasksQueueSize (workerNodeKey: number): number { return this.workerNodes[workerNodeKey].tasksQueueSize() }