From: Jérôme Benoit Date: Wed, 23 Aug 2023 12:31:29 +0000 (+0200) Subject: refactor: cleanup tasks recheduling code X-Git-Tag: v2.6.32~2 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=4de3d785b5e8987cab449269a73b7dfa42aca0cc;p=poolifier.git refactor: cleanup tasks recheduling code Signed-off-by: Jérôme Benoit --- diff --git a/src/deque.ts b/src/deque.ts index 49373cef..4a763fc7 100644 --- a/src/deque.ts +++ b/src/deque.ts @@ -154,6 +154,12 @@ export class Deque { } } + /** + * Returns an backward iterator for the deque. + * + * @returns An backward iterator for the deque. + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols + */ backward (): Iterable { return { [Symbol.iterator]: (): Iterator => { diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e61991f2..d1bb5bc4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1180,27 +1180,25 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + const workerNodes = this.workerNodes.filter( + (_, workerNodeId) => workerNodeId !== workerNodeKey + ) while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + for (const [workerNodeId, workerNode] of workerNodes.entries()) { if ( this.workerNodes[workerNodeId].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if ( - workerNodeId !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued === 0 - ) { + if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { targetWorkerNodeKey = workerNodeId break } if ( - workerNodeId !== workerNodeKey && workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { @@ -1235,25 +1233,22 @@ export abstract class AbstractPool< if ( workerNode.info.ready && sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + !workerNode.hasBackPressure() ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) - } else if ( - workerNode.info.ready && - sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing >= + if ( + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) - ) { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } } } }