X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c6712d198bc28a9d77c08ae817e5c96c3b9227c6;hb=0fa1d6f0433baf6ff3f67fefdbdf610612a4f3e9;hp=3a1a87807cc065df871bb1ca6d2195619672a64d;hpb=30c799e98a2d088cbca774d347983a04ebc5f5a5;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3a1a8780..c6712d19 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1156,6 +1156,8 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) this.workerNodes[workerNodeKey].onBackPressure = this.tasksStealingOnBackPressure.bind(this) } @@ -1187,42 +1189,77 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { - const workerNodes = this.workerNodes.filter( - (_, workerNodeId) => workerNodeId !== workerNodeKey - ) while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of workerNodes.entries()) { + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { - targetWorkerNodeKey = workerNodeId + if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { + destinationWorkerNodeKey = workerNodeId break } if ( workerNode.info.ready && + workerNodeId !== workerNodeKey && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + destinationWorkerNodeKey = workerNodeId } } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.executeTask(destinationWorkerNodeKey, task) } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + if ( + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + break } } } @@ -1231,7 +1268,7 @@ export abstract class AbstractPool< const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes - .filter((workerNode) => workerNode.info.id !== workerId) + .slice() .sort( (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued @@ -1239,22 +1276,21 @@ export abstract class AbstractPool< for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( workerNode.info.ready && + workerNode.info.id !== workerId && sourceWorkerNode.usage.tasks.queued > 0 && !workerNode.hasBackPressure() ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } if ( workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.executeTask(workerNodeKey, task) } else { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.enqueueTask(workerNodeKey, task) } } }