]> Piment Noir Git Repositories - poolifier.git/commitdiff
feat: task function worker node affinity (#2269)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 16 Feb 2026 15:52:42 +0000 (16:52 +0100)
committerGitHub <noreply@github.com>
Mon, 16 Feb 2026 15:52:42 +0000 (16:52 +0100)
* feat: task function worker node affinity

closes #778

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: cleanup worker node affinity namespace

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: validate worker nodes affinity

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: refine worke nodes affinity validation

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: check worker node keys affinity is part of pool worker nodes

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: fix default task function properties handling

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: improve task function objects coverage

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: cleanup tests

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: fix mismerge

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: silence linter

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code reformatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code formatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: merge eslint-plugin-perfectionist reformatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: reenabled lint-staged

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: fix mismerge

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code formatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* chore: fix mismerge

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* perf: lighter worker node keys validation on hot path

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: cleanups

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes

* refactor: early exit in worker choice strategies

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: align unique worker node affinity handling

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* [autofix.ci] apply automated fixes

* refactor: remove uneeded comment

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* perf: reorder conditions

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: remove uneeded conditions

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* perf: add missing optimization

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: cleanup worker nodes array validation code

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: add defensive checks for worker node affinity in selection strategies

- Add bounded loops to prevent infinite loops when no valid worker in affinity set
- Add empty/single-key checks in all strategies
- Add comprehensive tests for workerNodeKeys affinity

* fix: add empty array guard and fix iteration in worker node affinity strategies

* refactor(selection-strategies): extract getSingleWorkerNodeKey helper to reduce duplication

Consolidate the early-return pattern for single-element workerNodeKeys
array into a reusable protected method in AbstractWorkerChoiceStrategy.
All 7 selection strategies now use this helper instead of duplicating
the same check and isWorkerNodeReady call.

* fix(selection-strategies): correct affinity handling and optimize to O(1) lookup

- Fix WeightedRoundRobin: check affinity membership before using workerNodeKey
- Fix InterleavedWeightedRR: add wrap-around to find valid workers behind current position
- Optimize: change checkWorkerNodeKeys() to return Set<number> for O(1) .has() lookup
- Update all 7 strategies to use Set.has() instead of Array.includes()

* refactor(iwrr): remove unnecessary wrap-around logic

The retry mechanism in executeStrategy() already handles the case where
choose() returns undefined. Each call advances the position via
interleavedWeightedRoundRobinNextWorkerNodeId(), eventually covering
the entire (roundId, workerNodeId) space.

Removed the interleavedWeightedRoundRobinChoose() helper method and
inlined the logic directly in choose() for simplicity.

* refactor(api): rename workerNodes to workerNodeKeys for clarity

Rename the task function affinity property from 'workerNodes' to
'workerNodeKeys' to avoid confusion with pool.workerNodes (IWorkerNode[])
and align with internal naming conventions where keys refer to indices.

* test(selection-strategies): add workerNodeKeys affinity tests for all strategies

Add comprehensive tests for worker node keys affinity in choose() method:
- Empty workerNodeKeys returns undefined
- Single workerNodeKey returns that key if ready
- Multiple workerNodeKeys respects affinity constraint

Covers: RoundRobin, LeastUsed, LeastBusy, LeastElu, FairShare,
WeightedRoundRobin, InterleavedWeightedRoundRobin strategies

* test(pool): add workerNodeKeys validation and affinity tests

- Add checkValidWorkerNodes edge case tests
- Add addTaskFunction workerNodeKeys validation tests
- Add integration test for execute() workerNodeKeys affinity

* docs: improve workerNodeKeys JSDoc and error messages

- Expand JSDoc for workerNodeKeys parameter across selection strategies
- Improve error message clarity for invalid worker node keys
- Update tests to match improved error messages

* docs: harmonize workerNodeKeys JSDoc at getSingleWorkerNodeKey

* fix(weighted-round-robin): set nextWorkerNodeKey when staying on same worker

* fix(pool): check worker node affinity before returning dynamic worker

* docs: clarify worker node affinity validation behavior

* refactor: rename checkValidWorkerNodes to checkValidWorkerNodeKeys

Align function name, parameter name, and error messages with actual
semantics: the function validates worker node keys (numeric indices),
not worker node objects.

* refactor: eliminate unnecessary Set/Array allocations in worker node affinity hot paths

- Remove workerNodeKeys getter from IPool interface and AbstractPool
- Replace new Set(this.workerNodeKeys) validation with O(1) range checks
- Rename getTaskFunctionWorkerNodes() to getTaskFunctionWorkerNodeKeysSet()
  returning Set<number> directly from source
- Propagate Set<number> through execute() → executeStrategy() → choose()
  eliminating intermediate array-to-Set conversions
- Replace new Set(this.workerNodes.keys()) in sendTaskFunctionOperationToWorkers()
  with direct workerNodes.length/keys() usage

* fix(wrr): update state in single-element affinity shortcut

Reset nextWorkerNodeKey and workerNodeVirtualTaskExecutionTime in the
size === 1 path to maintain consistent round-robin state.

* fix: address Copilot review comments

- Check affinity before creating dynamic worker
- Add null to workerNodeKeys signature
- Fix anchor link and document length constraint

* chore: disable biome formatter in opencode

* perf(strategies): fast path for round-robin without affinity

* test: consolidate worker choice strategies test suite

* test: verify workerNodeKeys transmission from worker constructor

* fix: address Copilot review concerns for worker node affinity

- Snapshot workerNodeKeys in sendTaskFunctionOperationToWorkers to prevent race conditions
- Use ReadonlySet<number> for workerNodeKeysSet in selection strategies for type safety
- Compute workerNodeKeysSet lazily instead of storing in TaskFunctionProperties
- Allow null in workerNodeKeys type to explicitly disable affinity

* fix: improve error message grammar in addTaskFunction validation

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
* fix: validate worker node keys against max pool size and fix single-key state update

- Validate workerNodeKeys against maximumNumberOfWorkers instead of current
  pool size, allowing dynamic pools to reference future worker indices
- Only update nextWorkerNodeKey in RR/WRR single-key path when worker is
  ready, preventing state inconsistency on retry
- Clarify TaskFunctionObject.workerNodeKeys doc regarding null vs undefined

* feat: create dynamic workers to satisfy worker node affinity constraints

In dynamic pools, when a task's workerNodeKeys affinity references
worker nodes beyond the current pool size, automatically create
dynamic workers up to the maximum pool size to satisfy the affinity.

* test: harmonize variable naming in dynamic worker affinity test

* refactor: consolidate worker node keys validation into helper and enforce error types in tests

* fix: remove unused null from workerNodeKeys type and restore test suite title

* docs: mention worker node affinity feature in README

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
* refactor: use project max helper instead of Math.max

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
---------

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
27 files changed:
.opencode/opencode.jsonc
README.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategies-context.ts
src/pools/utils.ts
src/utility-types.ts
src/utils.ts
src/worker/abstract-worker.ts
src/worker/task-functions.ts
src/worker/utils.ts
tests/pools/abstract-pool.test.mjs
tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.mjs
tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs
tests/pools/utils.test.mjs
tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs
tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs
tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs

index 8a9a6fc3b90ac71528688fe87c8f967a34f36fe4..0f2cb97c63fecc6814ca4c98d53279a9d617816e 100644 (file)
@@ -3,6 +3,9 @@
   "formatter": {
     "prettier": {
       "disabled": true
+    },
+    "biome": {
+      "disabled": true
     }
   }
 }
index 2bd85571e03ddbce0aeba049a56837c22cd5f372..d10d2db7b3337df8236d9ab54375ab1ef5c0e51e 100644 (file)
--- a/README.md
+++ b/README.md
@@ -48,6 +48,7 @@ Please consult our [general guidelines](#general-guidelines).
 - Support for sync and async task function :white_check_mark:
 - Support for abortable task function :white_check_mark:
 - Support for multiple task functions with per task function queuing priority and tasks distribution strategy :white_check_mark:
+- Support for worker node affinity per task function :white_check_mark:
 - Support for task functions [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) operations at runtime :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
 - Error handling out of the box :white_check_mark:
index 1d877dc106c0aac6e14932c071a35acbd698968c..40d2fb4acb39222a25cd2a50f73a79d9bbaebb0f 100644 (file)
@@ -73,7 +73,14 @@ This method is available on both pool implementations and returns a boolean.
 ### `pool.addTaskFunction(name, fn)`
 
 `name` (mandatory) The task function name.  
-`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`. Priority range is the same as Unix nice levels.
+`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity).
+
+#### Worker Node Affinity Notes
+
+- Worker node keys are validated at registration time against the pool's maximum size (`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
+- The number of worker node keys cannot exceed the pool's maximum size (`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
+- In dynamic pools, you can reference worker node keys up to the maximum pool size. Workers that don't exist yet are automatically created when a task targeting them is executed.
+- At execution time, if no specified worker is ready, selection retries until one becomes available or retries are exhausted.
 
 This method is available on both pool implementations and returns a boolean promise.
 
@@ -158,7 +165,7 @@ An object with these properties:
 
 ### `class YourWorker extends ThreadWorker/ClusterWorker`
 
-`taskFunctions` (mandatory) The task function or task functions object `Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }>` that you want to execute on the worker. Priority range is the same as Unix nice levels.  
+`taskFunctions` (mandatory) The task function or task functions object `Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }>` that you want to execute on the worker. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity). See [Worker Node Affinity Notes](#worker-node-affinity-notes) above for validation behavior.  
 `opts` (optional) An object with these properties:
 
 - `killBehavior` (optional) - Dictates if your worker will be deleted in case a task is active on it.  
@@ -185,7 +192,7 @@ This method is available on both worker implementations and returns `{ status: b
 #### `YourWorker.addTaskFunction(name, fn)`
 
 `name` (mandatory) The task function name.  
-`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`. Priority range is the same as Unix nice levels.
+`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity). See [Worker Node Affinity Notes](#worker-node-affinity-notes) above for validation behavior.
 
 This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
 
index 507bb151e3f9adf1fb5d1b2a3435355bfb6b9145..d7de872c25642d13c686ae7f0909d4d13c094784 100644 (file)
@@ -61,6 +61,7 @@ import {
   checkValidPriority,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
+  checkValidWorkerNodeKeys,
   getDefaultTasksQueueOptions,
   updateEluWorkerUsage,
   updateRunTimeWorkerUsage,
@@ -590,6 +591,10 @@ export abstract class AbstractPool<
     }
     checkValidPriority(fn.priority)
     checkValidWorkerChoiceStrategy(fn.strategy)
+    checkValidWorkerNodeKeys(
+      fn.workerNodeKeys,
+      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+    )
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunction: fn.taskFunction.toString(),
       taskFunctionOperation: 'add',
@@ -1583,7 +1588,20 @@ export abstract class AbstractPool<
    * @returns The chosen worker node key.
    */
   private chooseWorkerNode (name?: string): number {
-    if (this.shallCreateDynamicWorker()) {
+    const workerNodeKeysSet = this.getTaskFunctionWorkerNodeKeysSet(name)
+    if (workerNodeKeysSet != null) {
+      const maxPoolSize =
+        this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+      const targetSize = max(...workerNodeKeysSet) + 1
+      while (
+        this.started &&
+        !this.destroying &&
+        this.workerNodes.length < targetSize &&
+        this.workerNodes.length < maxPoolSize
+      ) {
+        this.createAndSetupDynamicWorkerNode()
+      }
+    } else if (this.shallCreateDynamicWorker()) {
       const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
         this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
@@ -1594,7 +1612,8 @@ export abstract class AbstractPool<
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     return this.workerChoiceStrategiesContext!.execute(
-      this.getTaskFunctionWorkerChoiceStrategy(name)
+      this.getTaskFunctionWorkerChoiceStrategy(name),
+      workerNodeKeysSet
     )
   }
 
@@ -1703,6 +1722,26 @@ export abstract class AbstractPool<
     )?.strategy
   }
 
+  /**
+   * Gets task function worker node keys affinity set, if any.
+   * @param name - The task function name.
+   * @returns The task function worker node keys affinity set, or `undefined` if not defined.
+   */
+  private readonly getTaskFunctionWorkerNodeKeysSet = (
+    name?: string
+  ): ReadonlySet<number> | undefined => {
+    name = name ?? DEFAULT_TASK_NAME
+    const taskFunctionsProperties = this.listTaskFunctionsProperties()
+    if (name === DEFAULT_TASK_NAME) {
+      name = taskFunctionsProperties[1]?.name
+    }
+    const workerNodeKeys = taskFunctionsProperties.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.workerNodeKeys
+    return workerNodeKeys != null ? new Set(workerNodeKeys) : undefined
+  }
+
   private getTasksQueuePriority (): boolean {
     return this.listTaskFunctionsProperties().some(
       taskFunctionProperties => taskFunctionProperties.priority != null
@@ -2321,8 +2360,8 @@ export abstract class AbstractPool<
   private async sendTaskFunctionOperationToWorkers (
     message: MessageValue<Data>
   ): Promise<boolean> {
-    const targetWorkerNodeKeys = [...this.workerNodes.keys()]
-    if (targetWorkerNodeKeys.length === 0) {
+    const targetWorkerNodeCount = this.workerNodes.length
+    if (targetWorkerNodeCount === 0) {
       return true
     }
     const responsesReceived: MessageValue<Response>[] = []
@@ -2332,14 +2371,14 @@ export abstract class AbstractPool<
       reject: (reason?: unknown) => void
     ): void => {
       this.checkMessageWorkerId(message)
+      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
       if (
         message.taskFunctionOperationStatus != null &&
-        targetWorkerNodeKeys.includes(
-          this.getWorkerNodeKeyByWorkerId(message.workerId)
-        )
+        workerNodeKey >= 0 &&
+        workerNodeKey < targetWorkerNodeCount
       ) {
         responsesReceived.push(message)
-        if (responsesReceived.length >= targetWorkerNodeKeys.length) {
+        if (responsesReceived.length >= targetWorkerNodeCount) {
           if (
             responsesReceived.every(
               msg => msg.taskFunctionOperationStatus === true
@@ -2361,19 +2400,20 @@ export abstract class AbstractPool<
       }
     }
     let listener: ((message: MessageValue<Response>) => void) | undefined
+    const workerNodeKeys = [...this.workerNodes.keys()]
     try {
       return await new Promise<boolean>((resolve, reject) => {
         listener = (message: MessageValue<Response>) => {
           taskFunctionOperationsListener(message, resolve, reject)
         }
-        for (const workerNodeKey of targetWorkerNodeKeys) {
+        for (const workerNodeKey of workerNodeKeys) {
           this.registerWorkerMessageListener(workerNodeKey, listener)
           this.sendToWorker(workerNodeKey, message)
         }
       })
     } finally {
       if (listener != null) {
-        for (const workerNodeKey of targetWorkerNodeKeys) {
+        for (const workerNodeKey of workerNodeKeys) {
           this.deregisterWorkerMessageListener(workerNodeKey, listener)
         }
       }
index e2c825cfe1140c986df191ffc6ad380c0014183a..2abc13599f52ce29b294a099cca434894c56e996 100644 (file)
@@ -72,7 +72,9 @@ export abstract class AbstractWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public abstract choose (): number | undefined
+  public abstract choose (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined
 
   /** @inheritDoc */
   public abstract remove (workerNodeKey: number): boolean
@@ -110,6 +112,28 @@ export abstract class AbstractWorkerChoiceStrategy<
     return workerNodeKey
   }
 
+  /**
+   * Gets the next worker node key in a round-robin fashion.
+   * @returns The next worker node key.
+   */
+  protected getRoundRobinNextWorkerNodeKey (): number {
+    return this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+      ? 0
+      : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+  }
+
+  /**
+   * Gets the worker node key from a single-element affinity set.
+   * @param workerNodeKeysSet - The worker node keys affinity set.
+   * @returns The worker node key if ready, `undefined` otherwise.
+   */
+  protected getSingleWorkerNodeKey (
+    workerNodeKeysSet: ReadonlySet<number>
+  ): number | undefined {
+    const [workerNodeKey] = workerNodeKeysSet
+    return this.isWorkerNodeReady(workerNodeKey) ? workerNodeKey : undefined
+  }
+
   /**
    * Gets the worker node task ELU.
    * If the task statistics require the average ELU, the average ELU is returned.
@@ -149,6 +173,22 @@ export abstract class AbstractWorkerChoiceStrategy<
       : (this.pool.workerNodes[workerNodeKey]?.usage.waitTime.average ?? 0)
   }
 
+  /**
+   * Whether the worker node is eligible for selection (ready and in affinity set).
+   * @param workerNodeKey - The worker node key.
+   * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
+   * @returns Whether the worker node is eligible.
+   */
+  protected isWorkerNodeEligible (
+    workerNodeKey: number,
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): boolean {
+    return (
+      this.isWorkerNodeReady(workerNodeKey) &&
+      (workerNodeKeysSet == null || workerNodeKeysSet.has(workerNodeKey))
+    )
+  }
+
   /**
    * Whether the worker node is ready or not.
    * @param workerNodeKey - The worker node key.
index 602a29c6687647df91ebbcd98d3d36227d34778f..35f61adbb412daf307c3c5e084947aef924c50d6 100644 (file)
@@ -58,9 +58,9 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey()
+    this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey(workerNodeKeysSet)
     return this.nextWorkerNodeKey
   }
 
@@ -111,20 +111,20 @@ export class FairShareWorkerChoiceStrategy<
     )
   }
 
-  private fairShareNextWorkerNodeKey (): number | undefined {
+  private fairShareNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet?.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet?.size === 1) {
+      return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+    }
     const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
       (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
-        if (!this.isWorkerNodeReady(workerNodeKey)) {
+        if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
           return minWorkerNodeKey
         }
-        if (minWorkerNodeKey === -1) {
-          workerNode.strategyData = {
-            ...workerNode.strategyData,
-            virtualTaskEndTimestamp:
-              this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
-          }
-          return workerNodeKey
-        }
         if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
           workerNode.strategyData = {
             ...workerNode.strategyData,
@@ -132,6 +132,9 @@ export class FairShareWorkerChoiceStrategy<
               this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
           }
         }
+        if (minWorkerNodeKey === -1) {
+          return workerNodeKey
+        }
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         return workerNode.strategyData.virtualTaskEndTimestamp! <
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
index 0eb680db248bbc364a9ecf22caf28153103aaff5..99dcb006c44af46b1abde510832c84c416532173 100644 (file)
@@ -72,7 +72,13 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
+    if (workerNodeKeysSet?.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet?.size === 1) {
+      return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+    }
     for (
       let roundIndex = this.roundId;
       roundIndex < this.roundWeights.length;
@@ -94,7 +100,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         const workerWeight = this.opts!.weights![workerNodeKey]
         if (
-          this.isWorkerNodeReady(workerNodeKey) &&
+          this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet) &&
           workerWeight >= this.roundWeights[roundIndex] &&
           this.workerNodeVirtualTaskExecutionTime < workerWeight
         ) {
@@ -106,6 +112,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
           return this.nextWorkerNodeKey
         }
       }
+      this.workerNodeId = 0
     }
     this.interleavedWeightedRoundRobinNextWorkerNodeId()
     return undefined
index 7bfeca79789986411c34c2b110da581631025e40..792166c632b6d4741dde8304358258d58ec509f1 100644 (file)
@@ -53,9 +53,9 @@ export class LeastBusyWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.nextWorkerNodeKey = this.leastBusyNextWorkerNodeKey()
+    this.nextWorkerNodeKey = this.leastBusyNextWorkerNodeKey(workerNodeKeysSet)
     return this.nextWorkerNodeKey
   }
 
@@ -74,10 +74,18 @@ export class LeastBusyWorkerChoiceStrategy<
     return true
   }
 
-  private leastBusyNextWorkerNodeKey (): number | undefined {
+  private leastBusyNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet?.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet?.size === 1) {
+      return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+    }
     const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
       (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
-        if (!this.isWorkerNodeReady(workerNodeKey)) {
+        if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
           return minWorkerNodeKey
         }
         if (minWorkerNodeKey === -1) {
index e72d57267bcfe7784268a6c9293cd9883d20480c..1a560f180d17e133781e536ead6c1774ec29fa12 100644 (file)
@@ -53,9 +53,9 @@ export class LeastEluWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.nextWorkerNodeKey = this.leastEluNextWorkerNodeKey()
+    this.nextWorkerNodeKey = this.leastEluNextWorkerNodeKey(workerNodeKeysSet)
     return this.nextWorkerNodeKey
   }
 
@@ -74,10 +74,18 @@ export class LeastEluWorkerChoiceStrategy<
     return true
   }
 
-  private leastEluNextWorkerNodeKey (): number | undefined {
+  private leastEluNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet?.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet?.size === 1) {
+      return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+    }
     const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
       (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
-        if (!this.isWorkerNodeReady(workerNodeKey)) {
+        if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
           return minWorkerNodeKey
         }
         if (minWorkerNodeKey === -1) {
index 02405f031c7cb3e276efe88f033ac7275fd3e9ae..705d02bd40133cb29c2cb17528153d073d03efed 100644 (file)
@@ -34,9 +34,9 @@ export class LeastUsedWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.nextWorkerNodeKey = this.leastUsedNextWorkerNodeKey()
+    this.nextWorkerNodeKey = this.leastUsedNextWorkerNodeKey(workerNodeKeysSet)
     return this.nextWorkerNodeKey
   }
 
@@ -55,10 +55,18 @@ export class LeastUsedWorkerChoiceStrategy<
     return true
   }
 
-  private leastUsedNextWorkerNodeKey (): number | undefined {
+  private leastUsedNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet?.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet?.size === 1) {
+      return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+    }
     const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
       (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
-        if (!this.isWorkerNodeReady(workerNodeKey)) {
+        if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
           return minWorkerNodeKey
         }
         if (minWorkerNodeKey === -1) {
index 705be62734b4c55c770e85840fbc01672a53a532..4d64e11bab60114c72275f71f164c41420ffff56 100644 (file)
@@ -35,14 +35,17 @@ export class RoundRobinWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.roundRobinNextWorkerNodeKey()
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+    const chosenWorkerNodeKey =
+      this.roundRobinNextWorkerNodeKey(workerNodeKeysSet)
+    if (chosenWorkerNodeKey == null) {
       return undefined
     }
-    return this.checkWorkerNodeKey(this.nextWorkerNodeKey)
+    if (!this.isWorkerNodeEligible(chosenWorkerNodeKey, workerNodeKeysSet)) {
+      return undefined
+    }
+    return this.checkWorkerNodeKey(chosenWorkerNodeKey)
   }
 
   /** @inheritDoc */
@@ -75,11 +78,31 @@ export class RoundRobinWorkerChoiceStrategy<
     return true
   }
 
-  private roundRobinNextWorkerNodeKey (): number | undefined {
-    this.nextWorkerNodeKey =
-      this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
-        ? 0
-        : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
-    return this.nextWorkerNodeKey
+  private roundRobinNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet == null) {
+      this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+      return this.nextWorkerNodeKey
+    }
+    if (workerNodeKeysSet.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet.size === 1) {
+      const selectedWorkerNodeKey =
+        this.getSingleWorkerNodeKey(workerNodeKeysSet)
+      if (selectedWorkerNodeKey != null) {
+        this.nextWorkerNodeKey = selectedWorkerNodeKey
+      }
+      return selectedWorkerNodeKey
+    }
+    const workerNodesCount = this.pool.workerNodes.length
+    for (let i = 0; i < workerNodesCount; i++) {
+      this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+      if (workerNodeKeysSet.has(this.nextWorkerNodeKey)) {
+        return this.nextWorkerNodeKey
+      }
+    }
+    return undefined
   }
 }
index 2947c9824528eff739e4d64f8fea053bdf9af0cc..0eedee041d88da58289c18ca2079e96a5148d7de 100644 (file)
@@ -66,11 +66,13 @@ export const Measurements: Readonly<{
 export interface IWorkerChoiceStrategy {
   /**
    * Chooses a worker node in the pool and returns its key.
-   * If no worker nodes are not eligible, `undefined` is returned.
-   * If `undefined` is returned, the caller retry.
+   * If no worker nodes are eligible, `undefined` is returned and the caller retries.
+   * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
    * @returns The worker node key or `undefined`.
    */
-  readonly choose: () => number | undefined
+  readonly choose: (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ) => number | undefined
   /**
    * The worker choice strategy name.
    */
index fdcedc06e2b1444c8b2b7f84463b1a97e47f3b5b..b9e39079126303fca6ba2f91ef27acc7a00d0c33 100644 (file)
@@ -60,14 +60,17 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public choose (): number | undefined {
+  public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
     this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
-    this.weightedRoundRobinNextWorkerNodeKey()
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+    const chosenWorkerNodeKey =
+      this.weightedRoundRobinNextWorkerNodeKey(workerNodeKeysSet)
+    if (chosenWorkerNodeKey == null) {
+      return undefined
+    }
+    if (!this.isWorkerNodeEligible(chosenWorkerNodeKey, workerNodeKeysSet)) {
       return undefined
     }
-    return this.checkWorkerNodeKey(this.nextWorkerNodeKey)
+    return this.checkWorkerNodeKey(chosenWorkerNodeKey)
   }
 
   /** @inheritDoc */
@@ -104,19 +107,64 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     return true
   }
 
-  private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
+  private findEligibleWorkerNodeKey (
+    workerNodeKeysSet: ReadonlySet<number>
+  ): number | undefined {
+    const workerNodesCount = this.pool.workerNodes.length
+    for (let i = 0; i < workerNodesCount; i++) {
+      this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+      if (workerNodeKeysSet.has(this.nextWorkerNodeKey)) {
+        return this.nextWorkerNodeKey
+      }
+    }
+    return undefined
+  }
+
+  private weightedRoundRobinNextWorkerNodeKey (
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number | undefined {
+    if (workerNodeKeysSet == null) {
+      const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      const workerWeight = this.opts!.weights![workerNodeKey]
+      if (this.workerNodeVirtualTaskExecutionTime < workerWeight) {
+        this.nextWorkerNodeKey = workerNodeKey
+        this.workerNodeVirtualTaskExecutionTime +=
+          this.getWorkerNodeTaskWaitTime(workerNodeKey) +
+          this.getWorkerNodeTaskRunTime(workerNodeKey)
+      } else {
+        this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+        this.workerNodeVirtualTaskExecutionTime = 0
+      }
+      return this.nextWorkerNodeKey
+    }
+    if (workerNodeKeysSet.size === 0) {
+      return undefined
+    }
+    if (workerNodeKeysSet.size === 1) {
+      const selectedWorkerNodeKey =
+        this.getSingleWorkerNodeKey(workerNodeKeysSet)
+      if (selectedWorkerNodeKey != null) {
+        this.nextWorkerNodeKey = selectedWorkerNodeKey
+        this.workerNodeVirtualTaskExecutionTime = 0
+      }
+      return selectedWorkerNodeKey
+    }
     const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+    if (!workerNodeKeysSet.has(workerNodeKey)) {
+      this.nextWorkerNodeKey = this.findEligibleWorkerNodeKey(workerNodeKeysSet)
+      this.workerNodeVirtualTaskExecutionTime = 0
+      return this.nextWorkerNodeKey
+    }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     const workerWeight = this.opts!.weights![workerNodeKey]
     if (this.workerNodeVirtualTaskExecutionTime < workerWeight) {
+      this.nextWorkerNodeKey = workerNodeKey
       this.workerNodeVirtualTaskExecutionTime +=
         this.getWorkerNodeTaskWaitTime(workerNodeKey) +
         this.getWorkerNodeTaskRunTime(workerNodeKey)
     } else {
-      this.nextWorkerNodeKey =
-        this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
-          ? 0
-          : workerNodeKey + 1
+      this.nextWorkerNodeKey = this.findEligibleWorkerNodeKey(workerNodeKeysSet)
       this.workerNodeVirtualTaskExecutionTime = 0
     }
     return this.nextWorkerNodeKey
index 8c75842aec592a7def4a20fe15b25698bd0cc73c..0348b7106071184a6788de0ccaab09b82d8d31e5 100644 (file)
@@ -93,19 +93,21 @@ export class WorkerChoiceStrategiesContext<
   }
 
   /**
-   * Executes the given worker choice strategy in the context algorithm.
-   * @param workerChoiceStrategy - The worker choice strategy algorithm to execute.
-   * @defaultValue this.defaultWorkerChoiceStrategy
+   * Executes the given worker choice strategy.
+   * @param workerChoiceStrategy - The worker choice strategy.
+   * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
    * @returns The key of the worker node.
    * @throws {Error} If after computed retries the worker node key is null or undefined.
    */
   public execute (
     workerChoiceStrategy: WorkerChoiceStrategy = this
-      .defaultWorkerChoiceStrategy
+      .defaultWorkerChoiceStrategy,
+    workerNodeKeysSet?: ReadonlySet<number>
   ): number {
     return this.executeStrategy(
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      this.workerChoiceStrategies.get(workerChoiceStrategy)!
+      this.workerChoiceStrategies.get(workerChoiceStrategy)!,
+      workerNodeKeysSet
     )
   }
 
@@ -240,17 +242,22 @@ export class WorkerChoiceStrategiesContext<
   }
 
   /**
-   * Executes the given worker choice strategy.
-   * @param workerChoiceStrategy - The worker choice strategy.
+   * Executes the given worker choice strategy in the context algorithm.
+   * @param workerChoiceStrategy - The worker choice strategy algorithm to execute.
+   * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
    * @returns The key of the worker node.
    * @throws {Error} If after computed retries the worker node key is null or undefined.
    */
-  private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
-    let workerNodeKey: number | undefined = workerChoiceStrategy.choose()
+  private executeStrategy (
+    workerChoiceStrategy: IWorkerChoiceStrategy,
+    workerNodeKeysSet?: ReadonlySet<number>
+  ): number {
+    let workerNodeKey: number | undefined =
+      workerChoiceStrategy.choose(workerNodeKeysSet)
     let retriesCount = 0
     while (workerNodeKey == null && retriesCount < this.retries) {
       retriesCount++
-      workerNodeKey = workerChoiceStrategy.choose()
+      workerNodeKey = workerChoiceStrategy.choose(workerNodeKeysSet)
     }
     workerChoiceStrategy.retriesCount = retriesCount
     if (workerNodeKey == null) {
index 5c5bbdef85fc8c098351d06b6a1aa7cdd8152a17..a599f1debdb2ce371a104d5023f093cbe18a4be0 100644 (file)
@@ -118,6 +118,48 @@ export const checkValidWorkerChoiceStrategy = (
   }
 }
 
+export const checkValidWorkerNodeKeys = (
+  workerNodeKeys: null | number[] | undefined,
+  maxPoolSize?: number
+): void => {
+  if (workerNodeKeys != null && !Array.isArray(workerNodeKeys)) {
+    throw new TypeError('Invalid worker node keys: must be an array')
+  }
+  if (workerNodeKeys?.length === 0) {
+    throw new RangeError('Invalid worker node keys: must not be an empty array')
+  }
+  if (workerNodeKeys != null) {
+    for (const workerNodeKey of workerNodeKeys) {
+      if (!Number.isSafeInteger(workerNodeKey) || workerNodeKey < 0) {
+        throw new TypeError(
+          `Invalid worker node key '${workerNodeKey.toString()}': must be a non-negative safe integer`
+        )
+      }
+    }
+  }
+  if (
+    workerNodeKeys != null &&
+    new Set(workerNodeKeys).size !== workerNodeKeys.length
+  ) {
+    throw new TypeError('Invalid worker node keys: must not contain duplicates')
+  }
+  if (maxPoolSize != null && workerNodeKeys != null) {
+    if (workerNodeKeys.length > maxPoolSize) {
+      throw new RangeError(
+        'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+      )
+    }
+    const invalidWorkerNodeKeys = workerNodeKeys.filter(
+      workerNodeKey => workerNodeKey >= maxPoolSize
+    )
+    if (invalidWorkerNodeKeys.length > 0) {
+      throw new RangeError(
+        `Cannot add a task function with invalid worker node keys: ${invalidWorkerNodeKeys.toString()}. Valid keys are: 0..${(maxPoolSize - 1).toString()}`
+      )
+    }
+  }
+}
+
 export const checkValidTasksQueueOptions = (
   tasksQueueOptions: TasksQueueOptions | undefined
 ): void => {
index b3bdebc394e1f9d958acba50776ad0c64b6c885a..9c17112b14042bbbf7ab82ae39e006af6f1d0dc0 100644 (file)
@@ -160,6 +160,13 @@ export interface TaskFunctionProperties {
    * Task function worker choice strategy.
    */
   readonly strategy?: WorkerChoiceStrategy
+  /**
+   * Task function worker node keys affinity.
+   * Restricts task execution to specified worker nodes by their indices.
+   * Must contain valid indices within [0, pool max size - 1].
+   * If undefined, task can execute on any worker node.
+   */
+  readonly workerNodeKeys?: number[]
 }
 
 /**
index d15ffbb759dad334f0fc003b14fd8bab50a7ae04..c38e119a704de560a0e22c39b05162dc85ec4dc5 100644 (file)
@@ -231,5 +231,8 @@ export const buildTaskFunctionProperties = <Data, Response>(
     ...(taskFunctionObject?.strategy != null && {
       strategy: taskFunctionObject.strategy,
     }),
+    ...(taskFunctionObject?.workerNodeKeys != null && {
+      workerNodeKeys: taskFunctionObject.workerNodeKeys,
+    }),
   }
 }
index 28ab193a8ff0f58cc5558c263956933c82b5ab0a..ac2e55b4605823d713c44d60a2c8109b69767297 100644 (file)
@@ -346,6 +346,9 @@ export abstract class AbstractWorker<
           ...(taskFunctionProperties.strategy != null && {
             strategy: taskFunctionProperties.strategy,
           }),
+          ...(taskFunctionProperties.workerNodeKeys != null && {
+            workerNodeKeys: taskFunctionProperties.workerNodeKeys,
+          }),
         })
         break
       case 'default':
index 7bbcbf9383165c2c16f95726d12f620ddc6068ec..8d02f33bcffe88a1878bd35c64a572de6175bcf9 100644 (file)
@@ -40,6 +40,15 @@ export interface TaskFunctionObject<Data = unknown, Response = unknown> {
    * Task function.
    */
   taskFunction: TaskFunction<Data, Response>
+  /**
+   * Task function worker node keys affinity.
+   * Restricts task execution to specified worker nodes by their indices.
+   * Must contain valid indices within [0, pool max size - 1].
+   * If undefined, task can execute on any worker node.
+   * @remarks `null` is not accepted here. Use `null` only via
+   * {@link TaskFunctionProperties.workerNodeKeys} to clear affinity at runtime.
+   */
+  workerNodeKeys?: number[]
 }
 
 /**
index 28822508f260a75eb69871792b31852038c016b8..34f6d256db2ba9d13337136442dc6d79d351136b 100644 (file)
@@ -3,6 +3,7 @@ import type { TaskFunctionObject } from './task-functions.js'
 import {
   checkValidPriority,
   checkValidWorkerChoiceStrategy,
+  checkValidWorkerNodeKeys,
 } from '../pools/utils.js'
 import { isPlainObject } from '../utils.js'
 import { KillBehaviors, type WorkerOptions } from './worker-options.js'
@@ -50,6 +51,7 @@ export const checkValidTaskFunctionObjectEntry = <
   }
   checkValidPriority(fnObj.priority)
   checkValidWorkerChoiceStrategy(fnObj.strategy)
+  checkValidWorkerNodeKeys(fnObj.workerNodeKeys)
 }
 
 export const checkTaskFunctionName = (name: string): void => {
index 952e599f88cd8249f8a868bea142990e56b544ba..487d6ee27ffc28deacb1fbd8727427d01be31550 100644 (file)
@@ -1717,6 +1717,181 @@ describe('Abstract pool test suite', () => {
     await dynamicThreadPool.destroy()
   })
 
+  it('Verify that addTaskFunction() with workerNodeKeys is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    const poolWorkerNodeKeys = [...dynamicThreadPool.workerNodes.keys()]
+
+    // Test with valid workerNodeKeys
+    const echoTaskFunction = data => {
+      return data
+    }
+    await expect(
+      dynamicThreadPool.addTaskFunction('affinityEcho', {
+        taskFunction: echoTaskFunction,
+        workerNodeKeys: [poolWorkerNodeKeys[0]],
+      })
+    ).resolves.toBe(true)
+    expect(dynamicThreadPool.taskFunctions.get('affinityEcho')).toStrictEqual({
+      taskFunction: echoTaskFunction,
+      workerNodeKeys: [poolWorkerNodeKeys[0]],
+    })
+
+    // Test with invalid workerNodeKeys (out of range)
+    await expect(
+      dynamicThreadPool.addTaskFunction('invalidKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: [999],
+      })
+    ).rejects.toThrow(
+      new RangeError(
+        'Cannot add a task function with invalid worker node keys: 999. Valid keys are: 0..1'
+      )
+    )
+
+    // Test with empty array workerNodeKeys
+    await expect(
+      dynamicThreadPool.addTaskFunction('emptyKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: [],
+      })
+    ).rejects.toThrow(
+      new RangeError('Invalid worker node keys: must not be an empty array')
+    )
+
+    // Test exceeding max workers
+    const tooManyKeys = Array.from({ length: numberOfWorkers + 1 }, (_, i) => i)
+    await expect(
+      dynamicThreadPool.addTaskFunction('tooManyKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: tooManyKeys,
+      })
+    ).rejects.toThrow(
+      new RangeError(
+        'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+      )
+    )
+
+    // Test with duplicate workerNodeKeys
+    await expect(
+      dynamicThreadPool.addTaskFunction('duplicateKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: [poolWorkerNodeKeys[0], poolWorkerNodeKeys[0]],
+      })
+    ).rejects.toThrow(
+      new TypeError('Invalid worker node keys: must not contain duplicates')
+    )
+
+    // Test with non-integer values
+    await expect(
+      dynamicThreadPool.addTaskFunction('nonIntegerKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: [1.5],
+      })
+    ).rejects.toThrow(
+      new TypeError(
+        "Invalid worker node key '1.5': must be a non-negative safe integer"
+      )
+    )
+
+    // Test with negative values
+    await expect(
+      dynamicThreadPool.addTaskFunction('negativeKeys', {
+        taskFunction: () => {},
+        workerNodeKeys: [-1],
+      })
+    ).rejects.toThrow(
+      new TypeError(
+        "Invalid worker node key '-1': must be a non-negative safe integer"
+      )
+    )
+
+    await dynamicThreadPool.destroy()
+  })
+
+  it('Verify that execute() respects workerNodeKeys affinity', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    const poolWorkerNodeKeys = [...dynamicThreadPool.workerNodes.keys()]
+
+    // Add task function with affinity to first worker only
+    const affinityTaskFunction = data => {
+      return data
+    }
+    await dynamicThreadPool.addTaskFunction('affinityTask', {
+      taskFunction: affinityTaskFunction,
+      workerNodeKeys: [poolWorkerNodeKeys[0]],
+    })
+
+    // Reset task counts to track new executions
+    for (const workerNode of dynamicThreadPool.workerNodes) {
+      workerNode.usage.tasks.executed = 0
+    }
+
+    // Execute multiple tasks with affinity
+    const numTasks = 5
+    const tasks = []
+    for (let i = 0; i < numTasks; i++) {
+      tasks.push(dynamicThreadPool.execute({ test: i }, 'affinityTask'))
+    }
+    await Promise.all(tasks)
+
+    // Verify that only the affinity worker received the tasks
+    const affinityWorkerNode =
+      dynamicThreadPool.workerNodes[poolWorkerNodeKeys[0]]
+    expect(affinityWorkerNode.usage.tasks.executed).toBe(numTasks)
+
+    // Other workers should have 0 tasks from affinityTask
+    for (let i = 0; i < dynamicThreadPool.workerNodes.length; i++) {
+      if (i !== poolWorkerNodeKeys[0]) {
+        expect(dynamicThreadPool.workerNodes[i].usage.tasks.executed).toBe(0)
+      }
+    }
+
+    await dynamicThreadPool.destroy()
+  })
+
+  it('Verify that execute() creates dynamic workers for workerNodeKeys affinity', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      1,
+      4,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    expect(dynamicThreadPool.workerNodes.length).toBe(1)
+
+    await dynamicThreadPool.addTaskFunction('affinityBeyondMin', {
+      taskFunction: data => data,
+      workerNodeKeys: [2, 3],
+    })
+
+    for (const workerNode of dynamicThreadPool.workerNodes) {
+      workerNode.usage.tasks.executed = 0
+    }
+
+    const tasks = []
+    for (let i = 0; i < 4; i++) {
+      tasks.push(dynamicThreadPool.execute({ test: i }, 'affinityBeyondMin'))
+    }
+    await Promise.all(tasks)
+
+    expect(dynamicThreadPool.workerNodes.length).toBeGreaterThanOrEqual(4)
+    const executedOnAffinity =
+      dynamicThreadPool.workerNodes[2].usage.tasks.executed +
+      dynamicThreadPool.workerNodes[3].usage.tasks.executed
+    expect(executedOnAffinity).toBe(4)
+
+    await dynamicThreadPool.destroy()
+  })
+
   it('Verify that removeTaskFunction() is working', async () => {
     const dynamicThreadPool = new DynamicThreadPool(
       Math.floor(numberOfWorkers / 2),
@@ -1777,9 +1952,9 @@ describe('Abstract pool test suite', () => {
     )
     await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
-      { name: DEFAULT_TASK_NAME },
-      { name: 'factorial' },
-      { name: 'fibonacci' },
+      { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+      { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+      { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
       { name: 'jsonIntegerSerialization' },
     ])
     await dynamicThreadPool.destroy()
@@ -1789,9 +1964,9 @@ describe('Abstract pool test suite', () => {
     )
     await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
     expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
-      { name: DEFAULT_TASK_NAME },
-      { name: 'factorial' },
-      { name: 'fibonacci' },
+      { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+      { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+      { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
       { name: 'jsonIntegerSerialization' },
     ])
     await fixedClusterPool.destroy()
@@ -1825,27 +2000,27 @@ describe('Abstract pool test suite', () => {
       )
     )
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
-      { name: DEFAULT_TASK_NAME },
-      { name: 'factorial' },
-      { name: 'fibonacci' },
+      { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+      { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+      { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
       { name: 'jsonIntegerSerialization' },
     ])
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('factorial')
     ).resolves.toBe(true)
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
-      { name: DEFAULT_TASK_NAME },
-      { name: 'factorial' },
-      { name: 'fibonacci' },
+      { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+      { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+      { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
       { name: 'jsonIntegerSerialization' },
     ])
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('fibonacci')
     ).resolves.toBe(true)
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
-      { name: DEFAULT_TASK_NAME },
-      { name: 'fibonacci' },
-      { name: 'factorial' },
+      { name: DEFAULT_TASK_NAME, priority: 2, workerNodeKeys: [0, 1] },
+      { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
+      { name: 'factorial', priority: 1, workerNodeKeys: [0] },
       { name: 'jsonIntegerSerialization' },
     ])
     await dynamicThreadPool.destroy()
@@ -1869,15 +2044,18 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.executingTasks).toBe(0)
     expect(pool.info.executedTasks).toBe(4)
     for (const workerNode of pool.workerNodes) {
+      if (workerNode.info.taskFunctionsProperties == null) {
+        continue
+      }
       expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
-        { name: DEFAULT_TASK_NAME },
-        { name: 'factorial' },
-        { name: 'fibonacci' },
+        { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+        { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+        { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
         { name: 'jsonIntegerSerialization' },
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
       expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
-      expect(workerNode.tasksQueue.enablePriority).toBe(false)
+      expect(workerNode.tasksQueue.enablePriority).toBe(true)
       for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
         expect(
           workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
@@ -2023,10 +2201,13 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.executingTasks).toBe(0)
     expect(pool.info.executedTasks).toBe(4)
     for (const workerNode of pool.workerNodes) {
+      if (workerNode.info.taskFunctionsProperties == null) {
+        continue
+      }
       expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
-        { name: DEFAULT_TASK_NAME },
-        { name: 'factorial' },
-        { name: 'fibonacci', priority: -5 },
+        { name: DEFAULT_TASK_NAME, workerNodeKeys: [0] },
+        { name: 'factorial', workerNodeKeys: [0] },
+        { name: 'fibonacci', priority: -5, workerNodeKeys: [0, 1] },
         { name: 'jsonIntegerSerialization' },
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
index 7bfeb4b006ea54f780ce532027d433c85c3bc9de..7b7b0118350ea46c619a2de2250d2e05e2ca1801 100644 (file)
@@ -2,11 +2,15 @@ import { expect } from '@std/expect'
 import { randomInt } from 'node:crypto'
 
 import { FixedThreadPool } from '../../../lib/index.cjs'
+import { FairShareWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy.cjs'
 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.cjs'
+import { LeastBusyWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-busy-worker-choice-strategy.cjs'
+import { LeastEluWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-elu-worker-choice-strategy.cjs'
+import { LeastUsedWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-used-worker-choice-strategy.cjs'
+import { RoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/round-robin-worker-choice-strategy.cjs'
 import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.cjs'
 
-describe('Weighted round robin strategy worker choice strategy test suite', () => {
-  // const min = 1
+describe('Weighted round robin worker choice strategy test suite', () => {
   const max = 3
   let pool
 
@@ -54,4 +58,118 @@ describe('Weighted round robin strategy worker choice strategy test suite', () =
     expect(strategy.workerNodeId).toBe(0)
     expect(strategy.workerNodeVirtualTaskExecutionTime).toBe(0)
   })
+
+  it('Verify that RoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that RoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that RoundRobin choose() respects workerNodeKeys affinity', () => {
+    const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+  })
+
+  it('Verify that LeastUsed choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that LeastUsed choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that LeastUsed choose() respects workerNodeKeys affinity', () => {
+    const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+  })
+
+  it('Verify that LeastBusy choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that LeastBusy choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that LeastBusy choose() respects workerNodeKeys affinity', () => {
+    const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+  })
+
+  it('Verify that LeastElu choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new LeastEluWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that LeastElu choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new LeastEluWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that LeastElu choose() respects workerNodeKeys affinity', () => {
+    const strategy = new LeastEluWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+  })
+
+  it('Verify that FairShare choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new FairShareWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that FairShare choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new FairShareWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that FairShare choose() respects workerNodeKeys affinity', () => {
+    const strategy = new FairShareWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+  })
+
+  it('Verify that WeightedRoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that WeightedRoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that WeightedRoundRobin choose() respects workerNodeKeys affinity', () => {
+    const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    const result = strategy.choose(workerNodeKeysSet)
+    expect(result === undefined || workerNodeKeysSet.has(result)).toBe(true)
+  })
+
+  it('Verify that InterleavedWeightedRoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+    const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set())).toBe(undefined)
+  })
+
+  it('Verify that InterleavedWeightedRoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+    const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+    expect(strategy.choose(new Set([0]))).toBe(0)
+  })
+
+  it('Verify that InterleavedWeightedRoundRobin choose() respects workerNodeKeys affinity', () => {
+    const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+    const workerNodeKeysSet = new Set([1, 2])
+    const result = strategy.choose(workerNodeKeysSet)
+    expect(result === undefined || workerNodeKeysSet.has(result)).toBe(true)
+  })
 })
index 8bec9a952112e22678c2906d13fee0f4a1bdb815..3e132dcabf27243093a93223f520b340921a8031 100644 (file)
@@ -1,5 +1,5 @@
 import { expect } from '@std/expect'
-import { restore, stub } from 'sinon'
+import { createStubInstance, restore, stub } from 'sinon'
 
 import {
   DynamicThreadPool,
@@ -504,4 +504,104 @@ describe('Worker choice strategies context test suite', () => {
         .median
     ).toBe(true)
   })
+
+  it('Verify that execute() passes workerNodeKeysSet to strategy choose()', () => {
+    const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+      fixedPool
+    )
+    const workerChoiceStrategyStub = createStubInstance(
+      RoundRobinWorkerChoiceStrategy,
+      {
+        choose: stub().returns(1),
+      }
+    )
+    workerChoiceStrategiesContext.workerChoiceStrategies.set(
+      workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+      workerChoiceStrategyStub
+    )
+    const workerNodeKeys = [1, 2]
+    const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+      undefined,
+      new Set(workerNodeKeys)
+    )
+    expect(
+      workerChoiceStrategiesContext.workerChoiceStrategies.get(
+        workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+      ).choose.calledOnce
+    ).toBe(true)
+    // Verify it was called with a Set containing the same elements
+    const callArg = workerChoiceStrategiesContext.workerChoiceStrategies.get(
+      workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+    ).choose.firstCall.args[0]
+    expect(callArg).toBeInstanceOf(Set)
+    expect([...callArg]).toStrictEqual(workerNodeKeys)
+    expect(chosenWorkerKey).toBe(1)
+  })
+
+  it('Verify that execute() with workerNodeKeys affinity filters worker selection', () => {
+    const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+      fixedPool
+    )
+    // Stub returns the first valid key from workerNodeKeysSet
+    const workerChoiceStrategyStub = createStubInstance(
+      RoundRobinWorkerChoiceStrategy,
+      {
+        choose: stub().callsFake(workerNodeKeysSet => {
+          if (workerNodeKeysSet != null && workerNodeKeysSet.size > 0) {
+            return [...workerNodeKeysSet][0]
+          }
+          return 0
+        }),
+      }
+    )
+    workerChoiceStrategiesContext.workerChoiceStrategies.set(
+      workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+      workerChoiceStrategyStub
+    )
+    // Test with specific worker node affinity
+    const workerNodeKeys = [2]
+    const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+      undefined,
+      new Set(workerNodeKeys)
+    )
+    expect(chosenWorkerKey).toBe(2)
+    // Verify it was called with a Set containing the same elements
+    const callArg = workerChoiceStrategiesContext.workerChoiceStrategies.get(
+      workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+    ).choose.firstCall.args[0]
+    expect(callArg).toBeInstanceOf(Set)
+    expect([...callArg]).toStrictEqual(workerNodeKeys)
+  })
+
+  it('Verify that execute() retries with workerNodes until valid worker found', () => {
+    const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+      fixedPool
+    )
+    const workerChoiceStrategyStub = createStubInstance(
+      RoundRobinWorkerChoiceStrategy,
+      {
+        choose: stub()
+          .onCall(0)
+          .returns(undefined)
+          .onCall(1)
+          .returns(undefined)
+          .onCall(2)
+          .returns(1),
+      }
+    )
+    workerChoiceStrategiesContext.workerChoiceStrategies.set(
+      workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+      workerChoiceStrategyStub
+    )
+    const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+      undefined,
+      new Set([0, 1, 2])
+    )
+    expect(
+      workerChoiceStrategiesContext.workerChoiceStrategies.get(
+        workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+      ).choose.callCount
+    ).toBe(3)
+    expect(chosenWorkerKey).toBe(1)
+  })
 })
index dc3964160735f331e6a099e0ed62c61f16961060..003dfbb8454f5df68793eba847707940fd44c6ba 100644 (file)
@@ -5,6 +5,7 @@ import { Worker as ThreadWorker } from 'node:worker_threads'
 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
 import { WorkerTypes } from '../../lib/index.cjs'
 import {
+  checkValidWorkerNodeKeys,
   createWorker,
   DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
   getDefaultTasksQueueOptions,
@@ -159,4 +160,92 @@ describe('Pool utils test suite', () => {
       type: WorkerTypes.cluster,
     })
   })
+
+  it('Verify checkValidWorkerNodeKeys() behavior', () => {
+    // Should not throw for undefined
+    expect(() => checkValidWorkerNodeKeys(undefined)).not.toThrow()
+    // Should not throw for null
+    expect(() => checkValidWorkerNodeKeys(null)).not.toThrow()
+    // Should not throw for valid array with elements
+    expect(() => checkValidWorkerNodeKeys([0, 1, 2])).not.toThrow()
+    // Should throw TypeError for non-array
+    expect(() => checkValidWorkerNodeKeys('not an array')).toThrow(
+      new TypeError('Invalid worker node keys: must be an array')
+    )
+    expect(() => checkValidWorkerNodeKeys(123)).toThrow(
+      new TypeError('Invalid worker node keys: must be an array')
+    )
+    expect(() => checkValidWorkerNodeKeys({})).toThrow(
+      new TypeError('Invalid worker node keys: must be an array')
+    )
+    // Should throw RangeError for empty array
+    expect(() => checkValidWorkerNodeKeys([])).toThrow(
+      new RangeError('Invalid worker node keys: must not be an empty array')
+    )
+    // Should throw TypeError for non-integer values
+    expect(() => checkValidWorkerNodeKeys([1.5])).toThrow(
+      new TypeError(
+        "Invalid worker node key '1.5': must be a non-negative safe integer"
+      )
+    )
+    expect(() => checkValidWorkerNodeKeys([0, 1.5, 2])).toThrow(
+      new TypeError(
+        "Invalid worker node key '1.5': must be a non-negative safe integer"
+      )
+    )
+    // Should throw TypeError for negative values
+    expect(() => checkValidWorkerNodeKeys([-1])).toThrow(
+      new TypeError(
+        "Invalid worker node key '-1': must be a non-negative safe integer"
+      )
+    )
+    expect(() => checkValidWorkerNodeKeys([0, -1, 2])).toThrow(
+      new TypeError(
+        "Invalid worker node key '-1': must be a non-negative safe integer"
+      )
+    )
+    // Should throw TypeError for NaN
+    expect(() => checkValidWorkerNodeKeys([NaN])).toThrow(
+      new TypeError(
+        "Invalid worker node key 'NaN': must be a non-negative safe integer"
+      )
+    )
+    // Should throw TypeError for Infinity
+    expect(() => checkValidWorkerNodeKeys([Infinity])).toThrow(
+      new TypeError(
+        "Invalid worker node key 'Infinity': must be a non-negative safe integer"
+      )
+    )
+    expect(() => checkValidWorkerNodeKeys([-Infinity])).toThrow(
+      new TypeError(
+        "Invalid worker node key '-Infinity': must be a non-negative safe integer"
+      )
+    )
+    // Should throw TypeError for duplicate keys
+    expect(() => checkValidWorkerNodeKeys([0, 0, 1])).toThrow(
+      new TypeError('Invalid worker node keys: must not contain duplicates')
+    )
+    expect(() => checkValidWorkerNodeKeys([1, 2, 1])).toThrow(
+      new TypeError('Invalid worker node keys: must not contain duplicates')
+    )
+    // Should not throw with maxPoolSize when keys are in range
+    expect(() => checkValidWorkerNodeKeys([0, 1, 2], 4)).not.toThrow()
+    // Should throw RangeError when keys exceed maxPoolSize count
+    expect(() => checkValidWorkerNodeKeys([0, 1, 2, 3, 4], 4)).toThrow(
+      new RangeError(
+        'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+      )
+    )
+    // Should throw RangeError when a key is out of range
+    expect(() => checkValidWorkerNodeKeys([0, 4], 4)).toThrow(
+      new RangeError(
+        'Cannot add a task function with invalid worker node keys: 4. Valid keys are: 0..3'
+      )
+    )
+    expect(() => checkValidWorkerNodeKeys([999], 4)).toThrow(
+      new RangeError(
+        'Cannot add a task function with invalid worker node keys: 999. Valid keys are: 0..3'
+      )
+    )
+  })
 })
index 6f1ebadc53dcb1e5141959ecaf088e34eb4b3d6a..3ce4f7c53e2f6c8f9a2e2a75747d3d2eda03499d 100644 (file)
@@ -8,8 +8,16 @@ const {
 
 module.exports = new ClusterWorker(
   {
-    factorial: data => factorial(data.n),
-    fibonacci: data => fibonacci(data.n),
+    factorial: {
+      priority: 1,
+      taskFunction: data => factorial(data.n),
+      workerNodeKeys: [0],
+    },
+    fibonacci: {
+      priority: 2,
+      taskFunction: data => fibonacci(data.n),
+      workerNodeKeys: [0, 1],
+    },
     jsonIntegerSerialization: data => jsonIntegerSerialization(data.n),
   },
   {
index e94097284be807ff504b9e555316c1d26d3afbc4..f78d3180a97cf4990b97ebf751f362297487c37f 100644 (file)
@@ -7,8 +7,16 @@ import {
 
 export default new ThreadWorker(
   {
-    factorial: data => factorial(data.n),
-    fibonacci: data => fibonacci(data.n),
+    factorial: {
+      priority: 1,
+      taskFunction: data => factorial(data.n),
+      workerNodeKeys: [0],
+    },
+    fibonacci: {
+      priority: 2,
+      taskFunction: data => fibonacci(data.n),
+      workerNodeKeys: [0, 1],
+    },
     jsonIntegerSerialization: data => jsonIntegerSerialization(data.n),
   },
   {
index 566f436b269d31ce279acee3764250829c8ff1bb..70251dd283879b416b18dfdfc8f6218c6a9a3754 100644 (file)
@@ -7,8 +7,15 @@ import {
 
 export default new ThreadWorker(
   {
-    factorial: { taskFunction: data => factorial(data.n) },
-    fibonacci: { priority: -5, taskFunction: data => fibonacci(data.n) },
+    factorial: {
+      taskFunction: data => factorial(data.n),
+      workerNodeKeys: [0],
+    },
+    fibonacci: {
+      priority: -5,
+      taskFunction: data => fibonacci(data.n),
+      workerNodeKeys: [0, 1],
+    },
     jsonIntegerSerialization: {
       taskFunction: data => jsonIntegerSerialization(data.n),
     },