From b5e75be8f136c28e738b90acb094b031b0517b07 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 18 Nov 2023 10:06:47 +0100 Subject: [PATCH] refactor: cleanup worker node event handler implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 24 ++++++++++++------------ src/pools/worker-node.ts | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 94902356..de37c0f9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -624,7 +624,7 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].addEventListener( - 'emptyqueue', + 'emptyQueue', this.handleEmptyQueueEvent as EventListener ) } @@ -633,7 +633,7 @@ export abstract class AbstractPool< private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].removeEventListener( - 'emptyqueue', + 'emptyQueue', this.handleEmptyQueueEvent as EventListener ) } @@ -642,7 +642,7 @@ export abstract class AbstractPool< private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].addEventListener( - 'backpressure', + 'backPressure', this.handleBackPressureEvent as EventListener ) } @@ -651,7 +651,7 @@ export abstract class AbstractPool< private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].removeEventListener( - 'backpressure', + 'backPressure', this.handleBackPressureEvent as EventListener ) } @@ -1396,13 +1396,13 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].addEventListener( - 'emptyqueue', + 'emptyQueue', this.handleEmptyQueueEvent as EventListener ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].addEventListener( - 'backpressure', + 'backPressure', this.handleBackPressureEvent as EventListener ) } @@ -1476,9 +1476,8 @@ export abstract class AbstractPool< private readonly handleEmptyQueueEvent = ( event: CustomEvent ): void => { - const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( - event.detail.workerId - ) + const { workerId } = event.detail + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) const workerNodes = this.workerNodes .slice() .sort( @@ -1488,7 +1487,7 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( workerNode => workerNode.info.ready && - workerNode.info.id !== event.detail.workerId && + workerNode.info.id !== workerId && workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { @@ -1508,12 +1507,13 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( event: CustomEvent ): void => { + const { workerId } = event.detail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return } const sourceWorkerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)] + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes .slice() .sort( @@ -1524,7 +1524,7 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && - workerNode.info.id !== event.detail.workerId && + workerNode.info.id !== workerId && workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index de65f270..ef0133ac 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -81,7 +81,7 @@ export class WorkerNode if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true this.dispatchEvent( - new CustomEvent('backpressure', { + new CustomEvent('backPressure', { detail: { workerId: this.info.id as number } }) ) @@ -96,7 +96,7 @@ export class WorkerNode if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true this.dispatchEvent( - new CustomEvent('backpressure', { + new CustomEvent('backPressure', { detail: { workerId: this.info.id as number } }) ) @@ -189,7 +189,7 @@ export class WorkerNode } ++this.onEmptyQueueCount this.dispatchEvent( - new CustomEvent('emptyqueue', { + new CustomEvent('emptyQueue', { detail: { workerId: this.info.id as number } }) ) -- 2.34.1