refactor: cleanup worker node event handler implementation
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 18 Nov 2023 09:06:47 +0000 (10:06 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 18 Nov 2023 09:06:47 +0000 (10:06 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/worker-node.ts

index 94902356f44a7ac180b1282bdeeda70590a39a01..de37c0f9266cc6d6b6ca37fdd7c174ccd94afaec 100644 (file)
@@ -624,7 +624,7 @@ export abstract class AbstractPool<
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].addEventListener(
-        'emptyqueue',
+        'emptyQueue',
         this.handleEmptyQueueEvent as EventListener
       )
     }
@@ -633,7 +633,7 @@ export abstract class AbstractPool<
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].removeEventListener(
-        'emptyqueue',
+        'emptyQueue',
         this.handleEmptyQueueEvent as EventListener
       )
     }
@@ -642,7 +642,7 @@ export abstract class AbstractPool<
   private setTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].addEventListener(
-        'backpressure',
+        'backPressure',
         this.handleBackPressureEvent as EventListener
       )
     }
@@ -651,7 +651,7 @@ export abstract class AbstractPool<
   private unsetTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].removeEventListener(
-        'backpressure',
+        'backPressure',
         this.handleBackPressureEvent as EventListener
       )
     }
@@ -1396,13 +1396,13 @@ export abstract class AbstractPool<
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
         this.workerNodes[workerNodeKey].addEventListener(
-          'emptyqueue',
+          'emptyQueue',
           this.handleEmptyQueueEvent as EventListener
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
         this.workerNodes[workerNodeKey].addEventListener(
-          'backpressure',
+          'backPressure',
           this.handleBackPressureEvent as EventListener
         )
       }
@@ -1476,9 +1476,8 @@ export abstract class AbstractPool<
   private readonly handleEmptyQueueEvent = (
     event: CustomEvent<WorkerNodeEventDetail>
   ): void => {
-    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
-      event.detail.workerId
-    )
+    const { workerId } = event.detail
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1488,7 +1487,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       workerNode =>
         workerNode.info.ready &&
-        workerNode.info.id !== event.detail.workerId &&
+        workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
@@ -1508,12 +1507,13 @@ export abstract class AbstractPool<
   private readonly handleBackPressureEvent = (
     event: CustomEvent<WorkerNodeEventDetail>
   ): void => {
+    const { workerId } = event.detail
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
       return
     }
     const sourceWorkerNode =
-      this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1524,7 +1524,7 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
-        workerNode.info.id !== event.detail.workerId &&
+        workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
index de65f27012634c493d697a0ae9e3972a7f5234db..ef0133ac716b2a01087799154f3ff414518309cc 100644 (file)
@@ -81,7 +81,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
       this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
+        new CustomEvent<WorkerNodeEventDetail>('backPressure', {
           detail: { workerId: this.info.id as number }
         })
       )
@@ -96,7 +96,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
       this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
+        new CustomEvent<WorkerNodeEventDetail>('backPressure', {
           detail: { workerId: this.info.id as number }
         })
       )
@@ -189,7 +189,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
     ++this.onEmptyQueueCount
     this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('emptyqueue', {
+      new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
         detail: { workerId: this.info.id as number }
       })
     )