fix: ensure task function ops sync worker choice strategies
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 28 Apr 2024 21:35:28 +0000 (23:35 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 28 Apr 2024 21:35:28 +0000 (23:35 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
.eslintrc.cjs
.vscode/settings.json
src/pools/abstract-pool.ts
src/pools/selection-strategies/selection-strategies-utils.ts
src/pools/selection-strategies/worker-choice-strategies-context.ts
src/pools/utils.ts
tests/worker/abstract-worker.test.mjs

index ef9e86c091254a8920eecfbe2706ff2c2d71736a..cc22b6a4ec8a5dd66a3cb1ba9ad3d0768d7d78a7 100644 (file)
@@ -45,6 +45,7 @@ module.exports = defineConfig({
           'cpus',
           'cryptographically',
           'ctx',
+          'deduped',
           'deprecations',
           'deque',
           'dequeue',
index b014abdad4a6b7d9bdff169e9a2b2d9676f83d22..b644a303239a1acfafdf54a0ed7662eb0fd8b2f5 100644 (file)
@@ -14,6 +14,7 @@
     "cloneable",
     "codeql",
     "commitlint",
+    "deduped",
     "Dependabot",
     "deque",
     "deregisters",
index 63f1f8595c928a6db6e8daf2fe8180b9685b459a..c510b2ad99495226456b880ddd83edd760728b10 100644 (file)
@@ -548,13 +548,22 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
     checkValidWorkerChoiceStrategy(workerChoiceStrategy)
-    this.opts.workerChoiceStrategy = workerChoiceStrategy
-    this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
-      this.opts.workerChoiceStrategy,
-      workerChoiceStrategyOptions
-    )
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.sendStatisticsMessageToWorker(workerNodeKey)
+    if (workerChoiceStrategyOptions != null) {
+      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+    }
+    if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
+      this.opts.workerChoiceStrategy = workerChoiceStrategy
+      this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+        this.opts.workerChoiceStrategy,
+        this.opts.workerChoiceStrategyOptions
+      )
+      this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+        this.getWorkerWorkerChoiceStrategies(),
+        this.opts.workerChoiceStrategyOptions
+      )
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.sendStatisticsMessageToWorker(workerNodeKey)
+      }
     }
   }
 
@@ -809,17 +818,9 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public hasTaskFunction (name: string): boolean {
-    for (const workerNode of this.workerNodes) {
-      if (
-        Array.isArray(workerNode.info.taskFunctionsProperties) &&
-        workerNode.info.taskFunctionsProperties.some(
-          taskFunctionProperties => taskFunctionProperties.name === name
-        )
-      ) {
-        return true
-      }
-    }
-    return false
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.name === name
+    )
   }
 
   /** @inheritDoc */
@@ -845,6 +846,9 @@ export abstract class AbstractPool<
       taskFunction: fn.taskFunction.toString()
     })
     this.taskFunctions.set(name, fn)
+    this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+      this.getWorkerWorkerChoiceStrategies()
+    )
     return opResult
   }
 
@@ -864,6 +868,9 @@ export abstract class AbstractPool<
     })
     this.deleteTaskFunctionWorkerUsages(name)
     this.taskFunctions.delete(name)
+    this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+      this.getWorkerWorkerChoiceStrategies()
+    )
     return opResult
   }
 
@@ -897,6 +904,27 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Gets the worker choice strategies registered in this pool.
+   *
+   * @returns The worker choice strategies.
+   */
+  private readonly getWorkerWorkerChoiceStrategies =
+    (): Set<WorkerChoiceStrategy> => {
+      return new Set([
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.opts.workerChoiceStrategy!,
+        ...(this.listTaskFunctionsProperties()
+          .map(
+            (taskFunctionProperties: TaskFunctionProperties) =>
+              taskFunctionProperties.strategy
+          )
+          .filter(
+            (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+          ) as WorkerChoiceStrategy[])
+      ])
+    }
+
   /** @inheritDoc */
   public async setDefaultTaskFunction (name: string): Promise<boolean> {
     return await this.sendTaskFunctionOperationToWorkers({
index bed0d624ec4c6d5856baab6b42653f2c521f6093..d1e60e0350839893631a9e153ef143521ca2f6b3 100644 (file)
@@ -17,10 +17,6 @@ import {
 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
 import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
 
-const clone = <T>(object: T): T => {
-  return structuredClone<T>(object)
-}
-
 const estimatedCpuSpeed = (): number => {
   const runs = 150000000
   const begin = performance.now()
@@ -90,7 +86,7 @@ export const buildWorkerChoiceStrategyOptions = <
     pool: IPool<Worker, Data, Response>,
     opts?: WorkerChoiceStrategyOptions
   ): WorkerChoiceStrategyOptions => {
-  opts = clone(opts ?? {})
+  opts = structuredClone(opts ?? {})
   opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
   return {
     ...{
index 6b9d726d14b09c7947cc915039b83d712da79a8c..7e633e164f7c93d96661cd0bd0b6be1226f09e32 100644 (file)
@@ -79,7 +79,7 @@ export class WorkerChoiceStrategiesContext<
   }
 
   /**
-   * Gets the active worker choice strategies policy in the context.
+   * Gets the active worker choice strategies in the context policy.
    *
    * @returns The strategies policy.
    */
@@ -135,8 +135,10 @@ export class WorkerChoiceStrategiesContext<
     workerChoiceStrategy: WorkerChoiceStrategy,
     opts?: WorkerChoiceStrategyOptions
   ): void {
-    this.defaultWorkerChoiceStrategy = workerChoiceStrategy
-    this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+    if (workerChoiceStrategy !== this.defaultWorkerChoiceStrategy) {
+      this.defaultWorkerChoiceStrategy = workerChoiceStrategy
+      this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+    }
   }
 
   /**
@@ -221,6 +223,35 @@ export class WorkerChoiceStrategiesContext<
     }
   }
 
+  /**
+   * Synchronizes the active worker choice strategies in the context with the given worker choice strategies.
+   *
+   * @param workerChoiceStrategies - The worker choice strategies to synchronize.
+   * @param opts - The worker choice strategy options.
+   */
+  public syncWorkerChoiceStrategies (
+    workerChoiceStrategies: Set<WorkerChoiceStrategy>,
+    opts?: WorkerChoiceStrategyOptions
+  ): void {
+    for (const workerChoiceStrategy of this.workerChoiceStrategies.keys()) {
+      if (!workerChoiceStrategies.has(workerChoiceStrategy)) {
+        this.removeWorkerChoiceStrategy(workerChoiceStrategy)
+      }
+    }
+    for (const workerChoiceStrategy of workerChoiceStrategies) {
+      if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
+        this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+      }
+    }
+  }
+
+  /**
+   * Adds a worker choice strategy to the context.
+   *
+   * @param workerChoiceStrategy - The worker choice strategy to add.
+   * @param opts - The worker choice strategy options.
+   * @returns The worker choice strategies.
+   */
   private addWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy,
     pool: IPool<Worker, Data, Response>,
@@ -240,9 +271,15 @@ export class WorkerChoiceStrategiesContext<
     return this.workerChoiceStrategies
   }
 
-  // private removeWorkerChoiceStrategy (
-  //   workerChoiceStrategy: WorkerChoiceStrategy
-  // ): boolean {
-  //   return this.workerChoiceStrategies.delete(workerChoiceStrategy)
-  // }
+  /**
+   * Removes a worker choice strategy from the context.
+   *
+   * @param workerChoiceStrategy - The worker choice strategy to remove.
+   * @returns `true` if the worker choice strategy is removed, `false` otherwise.
+   */
+  private removeWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy
+  ): boolean {
+    return this.workerChoiceStrategies.delete(workerChoiceStrategy)
+  }
 }
index 36bd8939fdf82155c208d966d502d3098f11ffe3..3fa17312d1fe24f54fb121eeda4d9dee6124e0c9 100644 (file)
@@ -89,14 +89,14 @@ export const checkDynamicPoolSize = (
 
 export const checkValidPriority = (priority: number | undefined): void => {
   if (priority != null && !Number.isSafeInteger(priority)) {
-    throw new TypeError(`Invalid priority '${priority}'`)
+    throw new TypeError(`Invalid property 'priority': '${priority}'`)
   }
   if (
     priority != null &&
     Number.isSafeInteger(priority) &&
     (priority < -20 || priority > 19)
   ) {
-    throw new RangeError('Property priority must be between -20 and 19')
+    throw new RangeError("Property 'priority' must be between -20 and 19")
   }
 }
 
index d94cd4400d2cd70cb7f618c4b1af773700d8a6ec..38eaab5bd636b1b2a6414f299c0217e2289a8624 100644 (file)
@@ -174,13 +174,13 @@ describe('Abstract worker test suite', () => {
     )
     expect(
       () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: '' } })
-    ).toThrow(new TypeError("Invalid priority ''"))
+    ).toThrow(new TypeError("Invalid property 'priority': ''"))
     expect(
       () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: -21 } })
-    ).toThrow(new TypeError('Property priority must be between -20 and 19'))
+    ).toThrow(new TypeError("Property 'priority' must be between -20 and 19"))
     expect(
       () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: 20 } })
-    ).toThrow(new RangeError('Property priority must be between -20 and 19'))
+    ).toThrow(new RangeError("Property 'priority' must be between -20 and 19"))
     expect(
       () =>
         new ThreadWorker({