perf: build worker choice strategies policy and task equirements on
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 09:58:13 +0000 (11:58 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 09:58:13 +0000 (11:58 +0200)
demand

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/selection-strategies/selection-strategies-utils.ts
src/pools/selection-strategies/worker-choice-strategies-context.ts

index c510b2ad99495226456b880ddd83edd760728b10..5e88a7cd3392e98ae9e4757a4d4a00c174b4d471 100644 (file)
@@ -547,9 +547,12 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy,
     workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
+    let requireSync = false
     checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     if (workerChoiceStrategyOptions != null) {
-      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+      requireSync = this.setWorkerChoiceStrategyOptions(
+        workerChoiceStrategyOptions
+      )
     }
     if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
       this.opts.workerChoiceStrategy = workerChoiceStrategy
@@ -557,11 +560,14 @@ export abstract class AbstractPool<
         this.opts.workerChoiceStrategy,
         this.opts.workerChoiceStrategyOptions
       )
+      requireSync = true
+    }
+    if (requireSync) {
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
         this.getWorkerWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
-      for (const [workerNodeKey] of this.workerNodes.entries()) {
+      for (const workerNodeKey of this.workerNodes.keys()) {
         this.sendStatisticsMessageToWorker(workerNodeKey)
       }
     }
@@ -570,14 +576,23 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public setWorkerChoiceStrategyOptions (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
-  ): void {
+  ): boolean {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     if (workerChoiceStrategyOptions != null) {
       this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+      this.workerChoiceStrategiesContext?.setOptions(
+        this.opts.workerChoiceStrategyOptions
+      )
+      this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+        this.getWorkerWorkerChoiceStrategies(),
+        this.opts.workerChoiceStrategyOptions
+      )
+      for (const workerNodeKey of this.workerNodes.keys()) {
+        this.sendStatisticsMessageToWorker(workerNodeKey)
+      }
+      return true
     }
-    this.workerChoiceStrategiesContext?.setOptions(
-      this.opts.workerChoiceStrategyOptions
-    )
+    return false
   }
 
   /** @inheritDoc */
@@ -639,13 +654,13 @@ export abstract class AbstractPool<
   }
 
   private setTaskStealing (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
     }
   }
 
   private unsetTaskStealing (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].off(
         'idle',
         this.handleWorkerNodeIdleEvent
@@ -654,7 +669,7 @@ export abstract class AbstractPool<
   }
 
   private setTasksStealingOnBackPressure (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].on(
         'backPressure',
         this.handleWorkerNodeBackPressureEvent
@@ -663,7 +678,7 @@ export abstract class AbstractPool<
   }
 
   private unsetTasksStealingOnBackPressure (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.workerNodes[workerNodeKey].off(
         'backPressure',
         this.handleWorkerNodeBackPressureEvent
@@ -806,7 +821,7 @@ export abstract class AbstractPool<
           }
         }
       }
-      for (const [workerNodeKey] of this.workerNodes.entries()) {
+      for (const workerNodeKey of this.workerNodes.keys()) {
         this.registerWorkerMessageListener(
           workerNodeKey,
           taskFunctionOperationsListener
@@ -2032,7 +2047,7 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueues (): void {
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
+    for (const workerNodeKey of this.workerNodes.keys()) {
       this.flushTasksQueue(workerNodeKey)
     }
   }
index 82f3d8d91f05fd4f921a7feb17a901a44e84f966..ccb4b5b94ef858ad00c0eb5a28c490651854489e 100644 (file)
@@ -348,10 +348,11 @@ export interface IPool<
    * Sets the worker choice strategy options in this pool.
    *
    * @param workerChoiceStrategyOptions - The worker choice strategy options.
+   * @returns `true` if the worker choice strategy options were set, `false` otherwise.
    */
   readonly setWorkerChoiceStrategyOptions: (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
-  ) => void
+  ) => boolean
   /**
    * Enables/disables the worker node tasks queue in this pool.
    *
index d1e60e0350839893631a9e153ef143521ca2f6b3..777be6e4d4a2be104afd7986bd835e7e04312e96 100644 (file)
@@ -10,6 +10,8 @@ import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strate
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
 import {
   type IWorkerChoiceStrategy,
+  type StrategyPolicy,
+  type TaskStatisticsRequirements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
   type WorkerChoiceStrategyOptions
@@ -98,6 +100,47 @@ export const buildWorkerChoiceStrategyOptions = <
   }
 }
 
+export const buildWorkerChoiceStrategiesPolicy = (
+  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
+): StrategyPolicy => {
+  const policies: StrategyPolicy[] = []
+  for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
+    policies.push(workerChoiceStrategy.strategyPolicy)
+  }
+  return {
+    dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
+    dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
+  }
+}
+
+export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
+  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
+): TaskStatisticsRequirements => {
+  const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
+  for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
+    taskStatisticsRequirements.push(
+      workerChoiceStrategy.taskStatisticsRequirements
+    )
+  }
+  return {
+    runTime: {
+      aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
+      average: taskStatisticsRequirements.some(r => r.runTime.average),
+      median: taskStatisticsRequirements.some(r => r.runTime.median)
+    },
+    waitTime: {
+      aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
+      average: taskStatisticsRequirements.some(r => r.waitTime.average),
+      median: taskStatisticsRequirements.some(r => r.waitTime.median)
+    },
+    elu: {
+      aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
+      average: taskStatisticsRequirements.some(r => r.elu.average),
+      median: taskStatisticsRequirements.some(r => r.elu.median)
+    }
+  }
+}
+
 export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
   workerChoiceStrategy: WorkerChoiceStrategy,
   pool: IPool<Worker, Data, Response>,
index 7e633e164f7c93d96661cd0bd0b6be1226f09e32..d088015acc1ad9b62fd57a1221871e42210418bb 100644 (file)
@@ -9,6 +9,8 @@ import type {
 } from './selection-strategies-types.js'
 import { WorkerChoiceStrategies } from './selection-strategies-types.js'
 import {
+  buildWorkerChoiceStrategiesPolicy,
+  buildWorkerChoiceStrategiesTaskStatisticsRequirements,
   getWorkerChoiceStrategiesRetries,
   getWorkerChoiceStrategy
 } from './selection-strategies-utils.js'
@@ -43,6 +45,16 @@ export class WorkerChoiceStrategiesContext<
   IWorkerChoiceStrategy
   >
 
+  /**
+   * The active worker choice strategies in the context policy.
+   */
+  private workerChoiceStrategiesPolicy: StrategyPolicy
+
+  /**
+   * The active worker choice strategies in the context task statistics requirements.
+   */
+  private workerChoiceStrategiesTaskStatisticsRequirements: TaskStatisticsRequirements
+
   /**
    * The maximum number of worker choice strategies execution retries.
    */
@@ -71,6 +83,13 @@ export class WorkerChoiceStrategiesContext<
     for (const workerChoiceStrategy of workerChoiceStrategies) {
       this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
     }
+    this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
+      this.workerChoiceStrategies
+    )
+    this.workerChoiceStrategiesTaskStatisticsRequirements =
+      buildWorkerChoiceStrategiesTaskStatisticsRequirements(
+        this.workerChoiceStrategies
+      )
     this.retriesCount = 0
     this.retries = getWorkerChoiceStrategiesRetries<Worker, Data, Response>(
       this.pool,
@@ -84,45 +103,16 @@ export class WorkerChoiceStrategiesContext<
    * @returns The strategies policy.
    */
   public getPolicy (): StrategyPolicy {
-    const policies: StrategyPolicy[] = []
-    for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
-      policies.push(workerChoiceStrategy.strategyPolicy)
-    }
-    return {
-      dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
-      dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
-    }
+    return this.workerChoiceStrategiesPolicy
   }
 
   /**
    * Gets the active worker choice strategies in the context task statistics requirements.
    *
-   * @returns The task statistics requirements.
+   * @returns The strategies task statistics requirements.
    */
   public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
-    const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
-    for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
-      taskStatisticsRequirements.push(
-        workerChoiceStrategy.taskStatisticsRequirements
-      )
-    }
-    return {
-      runTime: {
-        aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
-        average: taskStatisticsRequirements.some(r => r.runTime.average),
-        median: taskStatisticsRequirements.some(r => r.runTime.median)
-      },
-      waitTime: {
-        aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
-        average: taskStatisticsRequirements.some(r => r.waitTime.average),
-        median: taskStatisticsRequirements.some(r => r.waitTime.median)
-      },
-      elu: {
-        aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
-        average: taskStatisticsRequirements.some(r => r.elu.average),
-        median: taskStatisticsRequirements.some(r => r.elu.median)
-      }
-    }
+    return this.workerChoiceStrategiesTaskStatisticsRequirements
   }
 
   /**
@@ -155,7 +145,7 @@ export class WorkerChoiceStrategiesContext<
   }
 
   /**
-   * Executes the worker choice strategy in the context algorithm.
+   * Executes the given worker choice strategy in the context algorithm.
    *
    * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
    * @returns The key of the worker node.
@@ -243,6 +233,13 @@ export class WorkerChoiceStrategiesContext<
         this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
       }
     }
+    this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
+      this.workerChoiceStrategies
+    )
+    this.workerChoiceStrategiesTaskStatisticsRequirements =
+      buildWorkerChoiceStrategiesTaskStatisticsRequirements(
+        this.workerChoiceStrategies
+      )
   }
 
   /**