fix: ensure a dynamic scheduled for removal can't be used
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 17 Oct 2023 14:42:34 +0000 (16:42 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 17 Oct 2023 14:42:34 +0000 (16:42 +0200)
closes #1468 and #1468

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
12 files changed:
README.md
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/thread/fixed.ts
src/pools/worker.ts

index 6db467fdbef07444b65082a53740b37f8a5d06b8..9e441e23b24e591b1e5d498422c09b871de2d8c8 100644 (file)
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@
 [![Javascript Standard Style Guide](<https://badgen.net/static/code style/standard/green>)](https://standardjs.com)
 [![Discord](https://badgen.net/discord/online-members/vXxZhyb3b6?icon=discord&label=discord&color=green)](https://discord.gg/vXxZhyb3b6)
 [![Open Collective](https://opencollective.com/poolifier/tiers/badge.svg)](https://opencollective.com/poolifier)
-[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](http://makeapullrequest.com)
+[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](https://makeapullrequest.com)
 [![No Dependencies](<https://badgen.net/static/dependencies/no dependencies/green>)](<https://badgen.net/static/dependencies/no dependencies/green>)
 
 </div>
index 8e0a259af8ea3e6f0edad24f29dbb9b4e81821fe..9601564fa6458ba4c10fff62bbc265660c9ec26a 100644 (file)
@@ -1276,6 +1276,8 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
+        // Flag the worker as not ready immediately
+        this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
         this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
           this.emitter?.emit(PoolEvents.error, error)
         })
@@ -1641,6 +1643,10 @@ export abstract class AbstractPool<
     }
   }
 
+  protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+    this.getWorkerInfo(workerNodeKey).ready = false
+  }
+
   /** @inheritDoc */
   public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
     return (
index 68ae50050ba24d38ac1cd2b6119af626d248e9c6..5f067bd7fb4a716f20297f2c907c7632a9607a1b 100644 (file)
@@ -61,6 +61,7 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flagWorkerNodeAsNotReady(workerNodeKey)
     this.flushTasksQueue(workerNodeKey)
     // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
index 68e09238b204c35e4f6bb87392021f314e3e2923..75c7450d8ccc5f8b73d74b5404f4410f4376a46e 100644 (file)
@@ -122,8 +122,8 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @param workerNodeKey - The worker node key.
    * @returns Whether the worker node is ready or not.
    */
-  private isWorkerNodeReady (workerNodeKey: number): boolean {
-    return this.pool.workerNodes[workerNodeKey]?.info.ready
+  protected isWorkerNodeReady (workerNodeKey: number): boolean {
+    return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false
   }
 
   /**
@@ -132,26 +132,10 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @param workerNodeKey - The worker node key.
    * @returns `true` if the worker node has back pressure, `false` otherwise.
    */
-  private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+  protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
     return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
   }
 
-  /**
-   * Whether the worker node is eligible or not.
-   * A worker node is eligible if it is ready and does not have back pressure.
-   *
-   * @param workerNodeKey - The worker node key.
-   * @returns `true` if the worker node is eligible, `false` otherwise.
-   * @see {@link isWorkerNodeReady}
-   * @see {@link hasWorkerNodeBackPressure}
-   */
-  protected isWorkerNodeEligible (workerNodeKey: number): boolean {
-    return (
-      this.isWorkerNodeReady(workerNodeKey) &&
-      !this.hasWorkerNodeBackPressure(workerNodeKey)
-    )
-  }
-
   /**
    * Gets the worker node task runtime.
    * If the task statistics require the average runtime, the average runtime is returned.
@@ -203,15 +187,6 @@ export abstract class AbstractWorkerChoiceStrategy<
     this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
   }
 
-  /**
-   * Check the next worker node eligibility.
-   */
-  protected checkNextWorkerNodeEligibility (): void {
-    if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) {
-      delete this.nextWorkerNodeKey
-    }
-  }
-
   protected computeDefaultWorkerWeight (): number {
     let cpusCycleTimeWeight = 0
     for (const cpu of cpus()) {
index a337278ba9ce75a074c1b32d751d8f928ec86496..f47a1e6e9ba7724fa665235a2a9204219ed2220e 100644 (file)
@@ -89,9 +89,10 @@ export class FairShareWorkerChoiceStrategy<
               this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey)
           }
         }
-        return (workerNode.strategyData.virtualTaskEndTimestamp as number) <
-          ((workerNodes[minWorkerNodeKey].strategyData as StrategyData)
-            .virtualTaskEndTimestamp as number)
+        return this.isWorkerNodeReady(workerNodeKey) &&
+          (workerNode.strategyData.virtualTaskEndTimestamp as number) <
+            ((workerNodes[minWorkerNodeKey].strategyData as StrategyData)
+              .virtualTaskEndTimestamp as number)
           ? workerNodeKey
           : minWorkerNodeKey
       },
index 984340b552f458c1f2d9704f2767dc39e329d649..81cadde164c9d4371179546fe6fff1a275bba5a7 100644 (file)
@@ -74,10 +74,11 @@ export class LeastBusyWorkerChoiceStrategy<
   private leastBusyNextWorkerNodeKey (): number | undefined {
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-        return (workerNode.usage.runTime.aggregate ?? 0) +
-          (workerNode.usage.waitTime.aggregate ?? 0) <
-          (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) +
-            (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0)
+        return this.isWorkerNodeReady(workerNodeKey) &&
+          (workerNode.usage.runTime.aggregate ?? 0) +
+            (workerNode.usage.waitTime.aggregate ?? 0) <
+            (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) +
+              (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0)
           ? workerNodeKey
           : minWorkerNodeKey
       },
index 0fe9cbec0275513ee8d6feed443915cf77536f07..2ad4e90d965bb3aec2b372e417a052a4c13f43ed 100644 (file)
@@ -70,8 +70,9 @@ export class LeastEluWorkerChoiceStrategy<
   private leastEluNextWorkerNodeKey (): number | undefined {
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-        return (workerNode.usage.elu.active.aggregate ?? 0) <
-          (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
+        return this.isWorkerNodeReady(workerNodeKey) &&
+          (workerNode.usage.elu.active.aggregate ?? 0) <
+            (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
           ? workerNodeKey
           : minWorkerNodeKey
       },
index dc249e6523b9b792d52063185b1dffc3e8e37f6b..1bd8d059fd5f9bc40484db9bbece55d319eddcbc 100644 (file)
@@ -55,12 +55,13 @@ export class LeastUsedWorkerChoiceStrategy<
   private leastUsedNextWorkerNodeKey (): number | undefined {
     return this.pool.workerNodes.reduce(
       (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-        return workerNode.usage.tasks.executed +
-          workerNode.usage.tasks.executing +
-          workerNode.usage.tasks.queued <
-          workerNodes[minWorkerNodeKey].usage.tasks.executed +
-            workerNodes[minWorkerNodeKey].usage.tasks.executing +
-            workerNodes[minWorkerNodeKey].usage.tasks.queued
+        return this.isWorkerNodeReady(workerNodeKey) &&
+          workerNode.usage.tasks.executed +
+            workerNode.usage.tasks.executing +
+            workerNode.usage.tasks.queued <
+            workerNodes[minWorkerNodeKey].usage.tasks.executed +
+              workerNodes[minWorkerNodeKey].usage.tasks.executing +
+              workerNodes[minWorkerNodeKey].usage.tasks.queued
           ? workerNodeKey
           : minWorkerNodeKey
       },
index aa3ee108ced7d528f1d5ea90204c4306f841226c..19dd3872c116723756fb146a4a57506b10523ab0 100644 (file)
@@ -70,10 +70,12 @@ export class RoundRobinWorkerChoiceStrategy<
   }
 
   private roundRobinNextWorkerNodeKey (): number | undefined {
-    this.nextWorkerNodeKey =
-      this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
-        ? 0
-        : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+    do {
+      this.nextWorkerNodeKey =
+        this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+          ? 0
+          : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+    } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
     return this.nextWorkerNodeKey
   }
 }
index b9b74b6cb96634f665628d1032fd0b37a83ddef9..22bf458febd3734a7f147c27172b486acdde5169 100644 (file)
@@ -95,23 +95,25 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
-    const workerWeight =
-      this.opts.weights?.[
-        this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
-      ] ?? this.defaultWorkerWeight
-    if (this.workerNodeVirtualTaskRunTime < workerWeight) {
-      this.workerNodeVirtualTaskRunTime =
-        this.workerNodeVirtualTaskRunTime +
-        this.getWorkerNodeTaskRunTime(
+    do {
+      const workerWeight =
+        this.opts.weights?.[
           this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
-        )
-    } else {
-      this.nextWorkerNodeKey =
-        this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
-          ? 0
-          : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
-      this.workerNodeVirtualTaskRunTime = 0
-    }
+        ] ?? this.defaultWorkerWeight
+      if (this.workerNodeVirtualTaskRunTime < workerWeight) {
+        this.workerNodeVirtualTaskRunTime =
+          this.workerNodeVirtualTaskRunTime +
+          this.getWorkerNodeTaskRunTime(
+            this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+          )
+      } else {
+        this.nextWorkerNodeKey =
+          this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+            ? 0
+            : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+        this.workerNodeVirtualTaskRunTime = 0
+      }
+    } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey as number))
     return this.nextWorkerNodeKey
   }
 }
index 7d47e81ae937abe695a87841babed4011873fc7a..bf111d39482528c4078f13e5c76557d565895f7f 100644 (file)
@@ -58,6 +58,7 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flagWorkerNodeAsNotReady(workerNodeKey)
     this.flushTasksQueue(workerNodeKey)
     // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
@@ -80,7 +81,7 @@ export class FixedThreadPool<
     transferList?: TransferListItem[]
   ): void {
     (
-      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+      this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
     ).port1.postMessage(
       { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
       transferList
@@ -108,7 +109,7 @@ export class FixedThreadPool<
     listener: (message: MessageValue<Message>) => void
   ): void {
     (
-      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+      this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
     ).port1.on('message', listener)
   }
 
@@ -118,7 +119,7 @@ export class FixedThreadPool<
     listener: (message: MessageValue<Message>) => void
   ): void {
     (
-      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+      this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
     ).port1.once('message', listener)
   }
 
@@ -128,7 +129,7 @@ export class FixedThreadPool<
     listener: (message: MessageValue<Message>) => void
   ): void {
     (
-      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+      this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
     ).port1.off('message', listener)
   }
 
index 43e7fc608cb9be8d569ade1acc7dc72fe2194cad..03cecce849a2cb9d71ad04a566795d3588238509 100644 (file)
@@ -240,7 +240,7 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
   readonly usage: WorkerUsage
   /**
    * Worker choice strategy data.
-   * This is used to store data that is specific to the worker choice strategy.
+   * This is used to store data that are specific to the worker choice strategy.
    */
   strategyData?: StrategyData
   /**