From 4de3d785b5e8987cab449269a73b7dfa42aca0cc Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 23 Aug 2023 14:31:29 +0200 Subject: [PATCH] refactor: cleanup tasks recheduling code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/deque.ts | 6 ++++++ src/pools/abstract-pool.ts | 43 +++++++++++++++++--------------------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/deque.ts b/src/deque.ts index 49373cef..4a763fc7 100644 --- a/src/deque.ts +++ b/src/deque.ts @@ -154,6 +154,12 @@ export class Deque { } } + /** + * Returns an backward iterator for the deque. + * + * @returns An backward iterator for the deque. + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols + */ backward (): Iterable { return { [Symbol.iterator]: (): Iterator => { diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e61991f2..d1bb5bc4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1180,27 +1180,25 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + const workerNodes = this.workerNodes.filter( + (_, workerNodeId) => workerNodeId !== workerNodeKey + ) while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + for (const [workerNodeId, workerNode] of workerNodes.entries()) { if ( this.workerNodes[workerNodeId].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if ( - workerNodeId !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued === 0 - ) { + if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { targetWorkerNodeKey = workerNodeId break } if ( - workerNodeId !== workerNodeKey && workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { @@ -1235,25 +1233,22 @@ export abstract class AbstractPool< if ( workerNode.info.ready && sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + !workerNode.hasBackPressure() ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) - } else if ( - workerNode.info.ready && - sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing >= + if ( + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) - ) { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } } } } -- 2.34.1