"formatter": {
"prettier": {
"disabled": true
+ },
+ "biome": {
+ "disabled": true
}
}
}
- Support for sync and async task function :white_check_mark:
- Support for abortable task function :white_check_mark:
- Support for multiple task functions with per task function queuing priority and tasks distribution strategy :white_check_mark:
+- Support for worker node affinity per task function :white_check_mark:
- Support for task functions [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) operations at runtime :white_check_mark:
- General guidelines on pool choice :white_check_mark:
- Error handling out of the box :white_check_mark:
### `pool.addTaskFunction(name, fn)`
`name` (mandatory) The task function name.
-`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`. Priority range is the same as Unix nice levels.
+`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity).
+
+#### Worker Node Affinity Notes
+
+- Worker node keys are validated at registration time against the pool's maximum size (`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
+- The number of worker node keys cannot exceed the pool's maximum size (`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
+- In dynamic pools, you can reference worker node keys up to the maximum pool size. Workers that don't exist yet are automatically created when a task targeting them is executed.
+- At execution time, if no specified worker is ready, selection retries until one becomes available or retries are exhausted.
This method is available on both pool implementations and returns a boolean promise.
### `class YourWorker extends ThreadWorker/ClusterWorker`
-`taskFunctions` (mandatory) The task function or task functions object `Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }>` that you want to execute on the worker. Priority range is the same as Unix nice levels.
+`taskFunctions` (mandatory) The task function or task functions object `Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }>` that you want to execute on the worker. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity). See [Worker Node Affinity Notes](#worker-node-affinity-notes) above for validation behavior.
`opts` (optional) An object with these properties:
- `killBehavior` (optional) - Dictates if your worker will be deleted in case a task is active on it.
#### `YourWorker.addTaskFunction(name, fn)`
`name` (mandatory) The task function name.
-`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`. Priority range is the same as Unix nice levels.
+`fn` (mandatory) The task function `(data?: Data) => Response | Promise<Response>` or task function object `{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`. Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of worker node keys to restrict task execution to specific workers (worker node affinity). See [Worker Node Affinity Notes](#worker-node-affinity-notes) above for validation behavior.
This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
+ checkValidWorkerNodeKeys,
getDefaultTasksQueueOptions,
updateEluWorkerUsage,
updateRunTimeWorkerUsage,
}
checkValidPriority(fn.priority)
checkValidWorkerChoiceStrategy(fn.strategy)
+ checkValidWorkerNodeKeys(
+ fn.workerNodeKeys,
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ )
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunction: fn.taskFunction.toString(),
taskFunctionOperation: 'add',
* @returns The chosen worker node key.
*/
private chooseWorkerNode (name?: string): number {
- if (this.shallCreateDynamicWorker()) {
+ const workerNodeKeysSet = this.getTaskFunctionWorkerNodeKeysSet(name)
+ if (workerNodeKeysSet != null) {
+ const maxPoolSize =
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ const targetSize = max(...workerNodeKeysSet) + 1
+ while (
+ this.started &&
+ !this.destroying &&
+ this.workerNodes.length < targetSize &&
+ this.workerNodes.length < maxPoolSize
+ ) {
+ this.createAndSetupDynamicWorkerNode()
+ }
+ } else if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this.workerChoiceStrategiesContext!.execute(
- this.getTaskFunctionWorkerChoiceStrategy(name)
+ this.getTaskFunctionWorkerChoiceStrategy(name),
+ workerNodeKeysSet
)
}
)?.strategy
}
+ /**
+ * Gets task function worker node keys affinity set, if any.
+ * @param name - The task function name.
+ * @returns The task function worker node keys affinity set, or `undefined` if not defined.
+ */
+ private readonly getTaskFunctionWorkerNodeKeysSet = (
+ name?: string
+ ): ReadonlySet<number> | undefined => {
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
+ }
+ const workerNodeKeys = taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.workerNodeKeys
+ return workerNodeKeys != null ? new Set(workerNodeKeys) : undefined
+ }
+
private getTasksQueuePriority (): boolean {
return this.listTaskFunctionsProperties().some(
taskFunctionProperties => taskFunctionProperties.priority != null
private async sendTaskFunctionOperationToWorkers (
message: MessageValue<Data>
): Promise<boolean> {
- const targetWorkerNodeKeys = [...this.workerNodes.keys()]
- if (targetWorkerNodeKeys.length === 0) {
+ const targetWorkerNodeCount = this.workerNodes.length
+ if (targetWorkerNodeCount === 0) {
return true
}
const responsesReceived: MessageValue<Response>[] = []
reject: (reason?: unknown) => void
): void => {
this.checkMessageWorkerId(message)
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
if (
message.taskFunctionOperationStatus != null &&
- targetWorkerNodeKeys.includes(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
- )
+ workerNodeKey >= 0 &&
+ workerNodeKey < targetWorkerNodeCount
) {
responsesReceived.push(message)
- if (responsesReceived.length >= targetWorkerNodeKeys.length) {
+ if (responsesReceived.length >= targetWorkerNodeCount) {
if (
responsesReceived.every(
msg => msg.taskFunctionOperationStatus === true
}
}
let listener: ((message: MessageValue<Response>) => void) | undefined
+ const workerNodeKeys = [...this.workerNodes.keys()]
try {
return await new Promise<boolean>((resolve, reject) => {
listener = (message: MessageValue<Response>) => {
taskFunctionOperationsListener(message, resolve, reject)
}
- for (const workerNodeKey of targetWorkerNodeKeys) {
+ for (const workerNodeKey of workerNodeKeys) {
this.registerWorkerMessageListener(workerNodeKey, listener)
this.sendToWorker(workerNodeKey, message)
}
})
} finally {
if (listener != null) {
- for (const workerNodeKey of targetWorkerNodeKeys) {
+ for (const workerNodeKey of workerNodeKeys) {
this.deregisterWorkerMessageListener(workerNodeKey, listener)
}
}
}
/** @inheritDoc */
- public abstract choose (): number | undefined
+ public abstract choose (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined
/** @inheritDoc */
public abstract remove (workerNodeKey: number): boolean
return workerNodeKey
}
+ /**
+ * Gets the next worker node key in a round-robin fashion.
+ * @returns The next worker node key.
+ */
+ protected getRoundRobinNextWorkerNodeKey (): number {
+ return this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+ ? 0
+ : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+ }
+
+ /**
+ * Gets the worker node key from a single-element affinity set.
+ * @param workerNodeKeysSet - The worker node keys affinity set.
+ * @returns The worker node key if ready, `undefined` otherwise.
+ */
+ protected getSingleWorkerNodeKey (
+ workerNodeKeysSet: ReadonlySet<number>
+ ): number | undefined {
+ const [workerNodeKey] = workerNodeKeysSet
+ return this.isWorkerNodeReady(workerNodeKey) ? workerNodeKey : undefined
+ }
+
/**
* Gets the worker node task ELU.
* If the task statistics require the average ELU, the average ELU is returned.
: (this.pool.workerNodes[workerNodeKey]?.usage.waitTime.average ?? 0)
}
+ /**
+ * Whether the worker node is eligible for selection (ready and in affinity set).
+ * @param workerNodeKey - The worker node key.
+ * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
+ * @returns Whether the worker node is eligible.
+ */
+ protected isWorkerNodeEligible (
+ workerNodeKey: number,
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): boolean {
+ return (
+ this.isWorkerNodeReady(workerNodeKey) &&
+ (workerNodeKeysSet == null || workerNodeKeysSet.has(workerNodeKey))
+ )
+ }
+
/**
* Whether the worker node is ready or not.
* @param workerNodeKey - The worker node key.
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey()
+ this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey(workerNodeKeysSet)
return this.nextWorkerNodeKey
}
)
}
- private fairShareNextWorkerNodeKey (): number | undefined {
+ private fairShareNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet?.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet?.size === 1) {
+ return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ }
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
- if (!this.isWorkerNodeReady(workerNodeKey)) {
+ if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
return minWorkerNodeKey
}
- if (minWorkerNodeKey === -1) {
- workerNode.strategyData = {
- ...workerNode.strategyData,
- virtualTaskEndTimestamp:
- this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
- }
- return workerNodeKey
- }
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
workerNode.strategyData = {
...workerNode.strategyData,
this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
}
}
+ if (minWorkerNodeKey === -1) {
+ return workerNodeKey
+ }
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return workerNode.strategyData.virtualTaskEndTimestamp! <
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
+ if (workerNodeKeysSet?.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet?.size === 1) {
+ return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ }
for (
let roundIndex = this.roundId;
roundIndex < this.roundWeights.length;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const workerWeight = this.opts!.weights![workerNodeKey]
if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet) &&
workerWeight >= this.roundWeights[roundIndex] &&
this.workerNodeVirtualTaskExecutionTime < workerWeight
) {
return this.nextWorkerNodeKey
}
}
+ this.workerNodeId = 0
}
this.interleavedWeightedRoundRobinNextWorkerNodeId()
return undefined
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.nextWorkerNodeKey = this.leastBusyNextWorkerNodeKey()
+ this.nextWorkerNodeKey = this.leastBusyNextWorkerNodeKey(workerNodeKeysSet)
return this.nextWorkerNodeKey
}
return true
}
- private leastBusyNextWorkerNodeKey (): number | undefined {
+ private leastBusyNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet?.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet?.size === 1) {
+ return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ }
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
- if (!this.isWorkerNodeReady(workerNodeKey)) {
+ if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.nextWorkerNodeKey = this.leastEluNextWorkerNodeKey()
+ this.nextWorkerNodeKey = this.leastEluNextWorkerNodeKey(workerNodeKeysSet)
return this.nextWorkerNodeKey
}
return true
}
- private leastEluNextWorkerNodeKey (): number | undefined {
+ private leastEluNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet?.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet?.size === 1) {
+ return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ }
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
- if (!this.isWorkerNodeReady(workerNodeKey)) {
+ if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.nextWorkerNodeKey = this.leastUsedNextWorkerNodeKey()
+ this.nextWorkerNodeKey = this.leastUsedNextWorkerNodeKey(workerNodeKeysSet)
return this.nextWorkerNodeKey
}
return true
}
- private leastUsedNextWorkerNodeKey (): number | undefined {
+ private leastUsedNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet?.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet?.size === 1) {
+ return this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ }
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
- if (!this.isWorkerNodeReady(workerNodeKey)) {
+ if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.roundRobinNextWorkerNodeKey()
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+ const chosenWorkerNodeKey =
+ this.roundRobinNextWorkerNodeKey(workerNodeKeysSet)
+ if (chosenWorkerNodeKey == null) {
return undefined
}
- return this.checkWorkerNodeKey(this.nextWorkerNodeKey)
+ if (!this.isWorkerNodeEligible(chosenWorkerNodeKey, workerNodeKeysSet)) {
+ return undefined
+ }
+ return this.checkWorkerNodeKey(chosenWorkerNodeKey)
}
/** @inheritDoc */
return true
}
- private roundRobinNextWorkerNodeKey (): number | undefined {
- this.nextWorkerNodeKey =
- this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
- ? 0
- : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
- return this.nextWorkerNodeKey
+ private roundRobinNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet == null) {
+ this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+ return this.nextWorkerNodeKey
+ }
+ if (workerNodeKeysSet.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet.size === 1) {
+ const selectedWorkerNodeKey =
+ this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ if (selectedWorkerNodeKey != null) {
+ this.nextWorkerNodeKey = selectedWorkerNodeKey
+ }
+ return selectedWorkerNodeKey
+ }
+ const workerNodesCount = this.pool.workerNodes.length
+ for (let i = 0; i < workerNodesCount; i++) {
+ this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+ if (workerNodeKeysSet.has(this.nextWorkerNodeKey)) {
+ return this.nextWorkerNodeKey
+ }
+ }
+ return undefined
}
}
export interface IWorkerChoiceStrategy {
/**
* Chooses a worker node in the pool and returns its key.
- * If no worker nodes are not eligible, `undefined` is returned.
- * If `undefined` is returned, the caller retry.
+ * If no worker nodes are eligible, `undefined` is returned and the caller retries.
+ * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
* @returns The worker node key or `undefined`.
*/
- readonly choose: () => number | undefined
+ readonly choose: (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ) => number | undefined
/**
* The worker choice strategy name.
*/
}
/** @inheritDoc */
- public choose (): number | undefined {
+ public choose (workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
- this.weightedRoundRobinNextWorkerNodeKey()
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+ const chosenWorkerNodeKey =
+ this.weightedRoundRobinNextWorkerNodeKey(workerNodeKeysSet)
+ if (chosenWorkerNodeKey == null) {
+ return undefined
+ }
+ if (!this.isWorkerNodeEligible(chosenWorkerNodeKey, workerNodeKeysSet)) {
return undefined
}
- return this.checkWorkerNodeKey(this.nextWorkerNodeKey)
+ return this.checkWorkerNodeKey(chosenWorkerNodeKey)
}
/** @inheritDoc */
return true
}
- private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
+ private findEligibleWorkerNodeKey (
+ workerNodeKeysSet: ReadonlySet<number>
+ ): number | undefined {
+ const workerNodesCount = this.pool.workerNodes.length
+ for (let i = 0; i < workerNodesCount; i++) {
+ this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+ if (workerNodeKeysSet.has(this.nextWorkerNodeKey)) {
+ return this.nextWorkerNodeKey
+ }
+ }
+ return undefined
+ }
+
+ private weightedRoundRobinNextWorkerNodeKey (
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number | undefined {
+ if (workerNodeKeysSet == null) {
+ const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const workerWeight = this.opts!.weights![workerNodeKey]
+ if (this.workerNodeVirtualTaskExecutionTime < workerWeight) {
+ this.nextWorkerNodeKey = workerNodeKey
+ this.workerNodeVirtualTaskExecutionTime +=
+ this.getWorkerNodeTaskWaitTime(workerNodeKey) +
+ this.getWorkerNodeTaskRunTime(workerNodeKey)
+ } else {
+ this.nextWorkerNodeKey = this.getRoundRobinNextWorkerNodeKey()
+ this.workerNodeVirtualTaskExecutionTime = 0
+ }
+ return this.nextWorkerNodeKey
+ }
+ if (workerNodeKeysSet.size === 0) {
+ return undefined
+ }
+ if (workerNodeKeysSet.size === 1) {
+ const selectedWorkerNodeKey =
+ this.getSingleWorkerNodeKey(workerNodeKeysSet)
+ if (selectedWorkerNodeKey != null) {
+ this.nextWorkerNodeKey = selectedWorkerNodeKey
+ this.workerNodeVirtualTaskExecutionTime = 0
+ }
+ return selectedWorkerNodeKey
+ }
const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ if (!workerNodeKeysSet.has(workerNodeKey)) {
+ this.nextWorkerNodeKey = this.findEligibleWorkerNodeKey(workerNodeKeysSet)
+ this.workerNodeVirtualTaskExecutionTime = 0
+ return this.nextWorkerNodeKey
+ }
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const workerWeight = this.opts!.weights![workerNodeKey]
if (this.workerNodeVirtualTaskExecutionTime < workerWeight) {
+ this.nextWorkerNodeKey = workerNodeKey
this.workerNodeVirtualTaskExecutionTime +=
this.getWorkerNodeTaskWaitTime(workerNodeKey) +
this.getWorkerNodeTaskRunTime(workerNodeKey)
} else {
- this.nextWorkerNodeKey =
- this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
- ? 0
- : workerNodeKey + 1
+ this.nextWorkerNodeKey = this.findEligibleWorkerNodeKey(workerNodeKeysSet)
this.workerNodeVirtualTaskExecutionTime = 0
}
return this.nextWorkerNodeKey
}
/**
- * Executes the given worker choice strategy in the context algorithm.
- * @param workerChoiceStrategy - The worker choice strategy algorithm to execute.
- * @defaultValue this.defaultWorkerChoiceStrategy
+ * Executes the given worker choice strategy.
+ * @param workerChoiceStrategy - The worker choice strategy.
+ * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
* @returns The key of the worker node.
* @throws {Error} If after computed retries the worker node key is null or undefined.
*/
public execute (
workerChoiceStrategy: WorkerChoiceStrategy = this
- .defaultWorkerChoiceStrategy
+ .defaultWorkerChoiceStrategy,
+ workerNodeKeysSet?: ReadonlySet<number>
): number {
return this.executeStrategy(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.workerChoiceStrategies.get(workerChoiceStrategy)!
+ this.workerChoiceStrategies.get(workerChoiceStrategy)!,
+ workerNodeKeysSet
)
}
}
/**
- * Executes the given worker choice strategy.
- * @param workerChoiceStrategy - The worker choice strategy.
+ * Executes the given worker choice strategy in the context algorithm.
+ * @param workerChoiceStrategy - The worker choice strategy algorithm to execute.
+ * @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
* @returns The key of the worker node.
* @throws {Error} If after computed retries the worker node key is null or undefined.
*/
- private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
- let workerNodeKey: number | undefined = workerChoiceStrategy.choose()
+ private executeStrategy (
+ workerChoiceStrategy: IWorkerChoiceStrategy,
+ workerNodeKeysSet?: ReadonlySet<number>
+ ): number {
+ let workerNodeKey: number | undefined =
+ workerChoiceStrategy.choose(workerNodeKeysSet)
let retriesCount = 0
while (workerNodeKey == null && retriesCount < this.retries) {
retriesCount++
- workerNodeKey = workerChoiceStrategy.choose()
+ workerNodeKey = workerChoiceStrategy.choose(workerNodeKeysSet)
}
workerChoiceStrategy.retriesCount = retriesCount
if (workerNodeKey == null) {
}
}
+export const checkValidWorkerNodeKeys = (
+ workerNodeKeys: null | number[] | undefined,
+ maxPoolSize?: number
+): void => {
+ if (workerNodeKeys != null && !Array.isArray(workerNodeKeys)) {
+ throw new TypeError('Invalid worker node keys: must be an array')
+ }
+ if (workerNodeKeys?.length === 0) {
+ throw new RangeError('Invalid worker node keys: must not be an empty array')
+ }
+ if (workerNodeKeys != null) {
+ for (const workerNodeKey of workerNodeKeys) {
+ if (!Number.isSafeInteger(workerNodeKey) || workerNodeKey < 0) {
+ throw new TypeError(
+ `Invalid worker node key '${workerNodeKey.toString()}': must be a non-negative safe integer`
+ )
+ }
+ }
+ }
+ if (
+ workerNodeKeys != null &&
+ new Set(workerNodeKeys).size !== workerNodeKeys.length
+ ) {
+ throw new TypeError('Invalid worker node keys: must not contain duplicates')
+ }
+ if (maxPoolSize != null && workerNodeKeys != null) {
+ if (workerNodeKeys.length > maxPoolSize) {
+ throw new RangeError(
+ 'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+ )
+ }
+ const invalidWorkerNodeKeys = workerNodeKeys.filter(
+ workerNodeKey => workerNodeKey >= maxPoolSize
+ )
+ if (invalidWorkerNodeKeys.length > 0) {
+ throw new RangeError(
+ `Cannot add a task function with invalid worker node keys: ${invalidWorkerNodeKeys.toString()}. Valid keys are: 0..${(maxPoolSize - 1).toString()}`
+ )
+ }
+ }
+}
+
export const checkValidTasksQueueOptions = (
tasksQueueOptions: TasksQueueOptions | undefined
): void => {
* Task function worker choice strategy.
*/
readonly strategy?: WorkerChoiceStrategy
+ /**
+ * Task function worker node keys affinity.
+ * Restricts task execution to specified worker nodes by their indices.
+ * Must contain valid indices within [0, pool max size - 1].
+ * If undefined, task can execute on any worker node.
+ */
+ readonly workerNodeKeys?: number[]
}
/**
...(taskFunctionObject?.strategy != null && {
strategy: taskFunctionObject.strategy,
}),
+ ...(taskFunctionObject?.workerNodeKeys != null && {
+ workerNodeKeys: taskFunctionObject.workerNodeKeys,
+ }),
}
}
...(taskFunctionProperties.strategy != null && {
strategy: taskFunctionProperties.strategy,
}),
+ ...(taskFunctionProperties.workerNodeKeys != null && {
+ workerNodeKeys: taskFunctionProperties.workerNodeKeys,
+ }),
})
break
case 'default':
* Task function.
*/
taskFunction: TaskFunction<Data, Response>
+ /**
+ * Task function worker node keys affinity.
+ * Restricts task execution to specified worker nodes by their indices.
+ * Must contain valid indices within [0, pool max size - 1].
+ * If undefined, task can execute on any worker node.
+ * @remarks `null` is not accepted here. Use `null` only via
+ * {@link TaskFunctionProperties.workerNodeKeys} to clear affinity at runtime.
+ */
+ workerNodeKeys?: number[]
}
/**
import {
checkValidPriority,
checkValidWorkerChoiceStrategy,
+ checkValidWorkerNodeKeys,
} from '../pools/utils.js'
import { isPlainObject } from '../utils.js'
import { KillBehaviors, type WorkerOptions } from './worker-options.js'
}
checkValidPriority(fnObj.priority)
checkValidWorkerChoiceStrategy(fnObj.strategy)
+ checkValidWorkerNodeKeys(fnObj.workerNodeKeys)
}
export const checkTaskFunctionName = (name: string): void => {
await dynamicThreadPool.destroy()
})
+ it('Verify that addTaskFunction() with workerNodeKeys is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ const poolWorkerNodeKeys = [...dynamicThreadPool.workerNodes.keys()]
+
+ // Test with valid workerNodeKeys
+ const echoTaskFunction = data => {
+ return data
+ }
+ await expect(
+ dynamicThreadPool.addTaskFunction('affinityEcho', {
+ taskFunction: echoTaskFunction,
+ workerNodeKeys: [poolWorkerNodeKeys[0]],
+ })
+ ).resolves.toBe(true)
+ expect(dynamicThreadPool.taskFunctions.get('affinityEcho')).toStrictEqual({
+ taskFunction: echoTaskFunction,
+ workerNodeKeys: [poolWorkerNodeKeys[0]],
+ })
+
+ // Test with invalid workerNodeKeys (out of range)
+ await expect(
+ dynamicThreadPool.addTaskFunction('invalidKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: [999],
+ })
+ ).rejects.toThrow(
+ new RangeError(
+ 'Cannot add a task function with invalid worker node keys: 999. Valid keys are: 0..1'
+ )
+ )
+
+ // Test with empty array workerNodeKeys
+ await expect(
+ dynamicThreadPool.addTaskFunction('emptyKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: [],
+ })
+ ).rejects.toThrow(
+ new RangeError('Invalid worker node keys: must not be an empty array')
+ )
+
+ // Test exceeding max workers
+ const tooManyKeys = Array.from({ length: numberOfWorkers + 1 }, (_, i) => i)
+ await expect(
+ dynamicThreadPool.addTaskFunction('tooManyKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: tooManyKeys,
+ })
+ ).rejects.toThrow(
+ new RangeError(
+ 'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+ )
+ )
+
+ // Test with duplicate workerNodeKeys
+ await expect(
+ dynamicThreadPool.addTaskFunction('duplicateKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: [poolWorkerNodeKeys[0], poolWorkerNodeKeys[0]],
+ })
+ ).rejects.toThrow(
+ new TypeError('Invalid worker node keys: must not contain duplicates')
+ )
+
+ // Test with non-integer values
+ await expect(
+ dynamicThreadPool.addTaskFunction('nonIntegerKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: [1.5],
+ })
+ ).rejects.toThrow(
+ new TypeError(
+ "Invalid worker node key '1.5': must be a non-negative safe integer"
+ )
+ )
+
+ // Test with negative values
+ await expect(
+ dynamicThreadPool.addTaskFunction('negativeKeys', {
+ taskFunction: () => {},
+ workerNodeKeys: [-1],
+ })
+ ).rejects.toThrow(
+ new TypeError(
+ "Invalid worker node key '-1': must be a non-negative safe integer"
+ )
+ )
+
+ await dynamicThreadPool.destroy()
+ })
+
+ it('Verify that execute() respects workerNodeKeys affinity', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ const poolWorkerNodeKeys = [...dynamicThreadPool.workerNodes.keys()]
+
+ // Add task function with affinity to first worker only
+ const affinityTaskFunction = data => {
+ return data
+ }
+ await dynamicThreadPool.addTaskFunction('affinityTask', {
+ taskFunction: affinityTaskFunction,
+ workerNodeKeys: [poolWorkerNodeKeys[0]],
+ })
+
+ // Reset task counts to track new executions
+ for (const workerNode of dynamicThreadPool.workerNodes) {
+ workerNode.usage.tasks.executed = 0
+ }
+
+ // Execute multiple tasks with affinity
+ const numTasks = 5
+ const tasks = []
+ for (let i = 0; i < numTasks; i++) {
+ tasks.push(dynamicThreadPool.execute({ test: i }, 'affinityTask'))
+ }
+ await Promise.all(tasks)
+
+ // Verify that only the affinity worker received the tasks
+ const affinityWorkerNode =
+ dynamicThreadPool.workerNodes[poolWorkerNodeKeys[0]]
+ expect(affinityWorkerNode.usage.tasks.executed).toBe(numTasks)
+
+ // Other workers should have 0 tasks from affinityTask
+ for (let i = 0; i < dynamicThreadPool.workerNodes.length; i++) {
+ if (i !== poolWorkerNodeKeys[0]) {
+ expect(dynamicThreadPool.workerNodes[i].usage.tasks.executed).toBe(0)
+ }
+ }
+
+ await dynamicThreadPool.destroy()
+ })
+
+ it('Verify that execute() creates dynamic workers for workerNodeKeys affinity', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ 1,
+ 4,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ expect(dynamicThreadPool.workerNodes.length).toBe(1)
+
+ await dynamicThreadPool.addTaskFunction('affinityBeyondMin', {
+ taskFunction: data => data,
+ workerNodeKeys: [2, 3],
+ })
+
+ for (const workerNode of dynamicThreadPool.workerNodes) {
+ workerNode.usage.tasks.executed = 0
+ }
+
+ const tasks = []
+ for (let i = 0; i < 4; i++) {
+ tasks.push(dynamicThreadPool.execute({ test: i }, 'affinityBeyondMin'))
+ }
+ await Promise.all(tasks)
+
+ expect(dynamicThreadPool.workerNodes.length).toBeGreaterThanOrEqual(4)
+ const executedOnAffinity =
+ dynamicThreadPool.workerNodes[2].usage.tasks.executed +
+ dynamicThreadPool.workerNodes[3].usage.tasks.executed
+ expect(executedOnAffinity).toBe(4)
+
+ await dynamicThreadPool.destroy()
+ })
+
it('Verify that removeTaskFunction() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci' },
+ { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
await dynamicThreadPool.destroy()
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci' },
+ { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
await fixedClusterPool.destroy()
)
)
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci' },
+ { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('factorial')
).resolves.toBe(true)
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci' },
+ { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('fibonacci')
).resolves.toBe(true)
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'fibonacci' },
- { name: 'factorial' },
+ { name: DEFAULT_TASK_NAME, priority: 2, workerNodeKeys: [0, 1] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
{ name: 'jsonIntegerSerialization' },
])
await dynamicThreadPool.destroy()
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
+ if (workerNode.info.taskFunctionsProperties == null) {
+ continue
+ }
expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci' },
+ { name: DEFAULT_TASK_NAME, priority: 1, workerNodeKeys: [0] },
+ { name: 'factorial', priority: 1, workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: 2, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
- expect(workerNode.tasksQueue.enablePriority).toBe(false)
+ expect(workerNode.tasksQueue.enablePriority).toBe(true)
for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
+ if (workerNode.info.taskFunctionsProperties == null) {
+ continue
+ }
expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
- { name: DEFAULT_TASK_NAME },
- { name: 'factorial' },
- { name: 'fibonacci', priority: -5 },
+ { name: DEFAULT_TASK_NAME, workerNodeKeys: [0] },
+ { name: 'factorial', workerNodeKeys: [0] },
+ { name: 'fibonacci', priority: -5, workerNodeKeys: [0, 1] },
{ name: 'jsonIntegerSerialization' },
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
import { randomInt } from 'node:crypto'
import { FixedThreadPool } from '../../../lib/index.cjs'
+import { FairShareWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy.cjs'
import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.cjs'
+import { LeastBusyWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-busy-worker-choice-strategy.cjs'
+import { LeastEluWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-elu-worker-choice-strategy.cjs'
+import { LeastUsedWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-used-worker-choice-strategy.cjs'
+import { RoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/round-robin-worker-choice-strategy.cjs'
import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.cjs'
-describe('Weighted round robin strategy worker choice strategy test suite', () => {
- // const min = 1
+describe('Weighted round robin worker choice strategy test suite', () => {
const max = 3
let pool
expect(strategy.workerNodeId).toBe(0)
expect(strategy.workerNodeVirtualTaskExecutionTime).toBe(0)
})
+
+ it('Verify that RoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that RoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that RoundRobin choose() respects workerNodeKeys affinity', () => {
+ const strategy = new RoundRobinWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+ })
+
+ it('Verify that LeastUsed choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that LeastUsed choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that LeastUsed choose() respects workerNodeKeys affinity', () => {
+ const strategy = new LeastUsedWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+ })
+
+ it('Verify that LeastBusy choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that LeastBusy choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that LeastBusy choose() respects workerNodeKeys affinity', () => {
+ const strategy = new LeastBusyWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+ })
+
+ it('Verify that LeastElu choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new LeastEluWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that LeastElu choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new LeastEluWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that LeastElu choose() respects workerNodeKeys affinity', () => {
+ const strategy = new LeastEluWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+ })
+
+ it('Verify that FairShare choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new FairShareWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that FairShare choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new FairShareWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that FairShare choose() respects workerNodeKeys affinity', () => {
+ const strategy = new FairShareWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ expect(workerNodeKeysSet.has(strategy.choose(workerNodeKeysSet))).toBe(true)
+ })
+
+ it('Verify that WeightedRoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that WeightedRoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that WeightedRoundRobin choose() respects workerNodeKeys affinity', () => {
+ const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ const result = strategy.choose(workerNodeKeysSet)
+ expect(result === undefined || workerNodeKeysSet.has(result)).toBe(true)
+ })
+
+ it('Verify that InterleavedWeightedRoundRobin choose() with empty workerNodeKeysSet returns undefined', () => {
+ const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set())).toBe(undefined)
+ })
+
+ it('Verify that InterleavedWeightedRoundRobin choose() with single workerNodeKey returns that key if ready', () => {
+ const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+ expect(strategy.choose(new Set([0]))).toBe(0)
+ })
+
+ it('Verify that InterleavedWeightedRoundRobin choose() respects workerNodeKeys affinity', () => {
+ const strategy = new InterleavedWeightedRoundRobinWorkerChoiceStrategy(pool)
+ const workerNodeKeysSet = new Set([1, 2])
+ const result = strategy.choose(workerNodeKeysSet)
+ expect(result === undefined || workerNodeKeysSet.has(result)).toBe(true)
+ })
})
import { expect } from '@std/expect'
-import { restore, stub } from 'sinon'
+import { createStubInstance, restore, stub } from 'sinon'
import {
DynamicThreadPool,
.median
).toBe(true)
})
+
+ it('Verify that execute() passes workerNodeKeysSet to strategy choose()', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ fixedPool
+ )
+ const workerChoiceStrategyStub = createStubInstance(
+ RoundRobinWorkerChoiceStrategy,
+ {
+ choose: stub().returns(1),
+ }
+ )
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+ workerChoiceStrategyStub
+ )
+ const workerNodeKeys = [1, 2]
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+ undefined,
+ new Set(workerNodeKeys)
+ )
+ expect(
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).choose.calledOnce
+ ).toBe(true)
+ // Verify it was called with a Set containing the same elements
+ const callArg = workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).choose.firstCall.args[0]
+ expect(callArg).toBeInstanceOf(Set)
+ expect([...callArg]).toStrictEqual(workerNodeKeys)
+ expect(chosenWorkerKey).toBe(1)
+ })
+
+ it('Verify that execute() with workerNodeKeys affinity filters worker selection', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ fixedPool
+ )
+ // Stub returns the first valid key from workerNodeKeysSet
+ const workerChoiceStrategyStub = createStubInstance(
+ RoundRobinWorkerChoiceStrategy,
+ {
+ choose: stub().callsFake(workerNodeKeysSet => {
+ if (workerNodeKeysSet != null && workerNodeKeysSet.size > 0) {
+ return [...workerNodeKeysSet][0]
+ }
+ return 0
+ }),
+ }
+ )
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+ workerChoiceStrategyStub
+ )
+ // Test with specific worker node affinity
+ const workerNodeKeys = [2]
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+ undefined,
+ new Set(workerNodeKeys)
+ )
+ expect(chosenWorkerKey).toBe(2)
+ // Verify it was called with a Set containing the same elements
+ const callArg = workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).choose.firstCall.args[0]
+ expect(callArg).toBeInstanceOf(Set)
+ expect([...callArg]).toStrictEqual(workerNodeKeys)
+ })
+
+ it('Verify that execute() retries with workerNodes until valid worker found', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ fixedPool
+ )
+ const workerChoiceStrategyStub = createStubInstance(
+ RoundRobinWorkerChoiceStrategy,
+ {
+ choose: stub()
+ .onCall(0)
+ .returns(undefined)
+ .onCall(1)
+ .returns(undefined)
+ .onCall(2)
+ .returns(1),
+ }
+ )
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
+ workerChoiceStrategyStub
+ )
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute(
+ undefined,
+ new Set([0, 1, 2])
+ )
+ expect(
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).choose.callCount
+ ).toBe(3)
+ expect(chosenWorkerKey).toBe(1)
+ })
})
import { CircularBuffer } from '../../lib/circular-buffer.cjs'
import { WorkerTypes } from '../../lib/index.cjs'
import {
+ checkValidWorkerNodeKeys,
createWorker,
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
getDefaultTasksQueueOptions,
type: WorkerTypes.cluster,
})
})
+
+ it('Verify checkValidWorkerNodeKeys() behavior', () => {
+ // Should not throw for undefined
+ expect(() => checkValidWorkerNodeKeys(undefined)).not.toThrow()
+ // Should not throw for null
+ expect(() => checkValidWorkerNodeKeys(null)).not.toThrow()
+ // Should not throw for valid array with elements
+ expect(() => checkValidWorkerNodeKeys([0, 1, 2])).not.toThrow()
+ // Should throw TypeError for non-array
+ expect(() => checkValidWorkerNodeKeys('not an array')).toThrow(
+ new TypeError('Invalid worker node keys: must be an array')
+ )
+ expect(() => checkValidWorkerNodeKeys(123)).toThrow(
+ new TypeError('Invalid worker node keys: must be an array')
+ )
+ expect(() => checkValidWorkerNodeKeys({})).toThrow(
+ new TypeError('Invalid worker node keys: must be an array')
+ )
+ // Should throw RangeError for empty array
+ expect(() => checkValidWorkerNodeKeys([])).toThrow(
+ new RangeError('Invalid worker node keys: must not be an empty array')
+ )
+ // Should throw TypeError for non-integer values
+ expect(() => checkValidWorkerNodeKeys([1.5])).toThrow(
+ new TypeError(
+ "Invalid worker node key '1.5': must be a non-negative safe integer"
+ )
+ )
+ expect(() => checkValidWorkerNodeKeys([0, 1.5, 2])).toThrow(
+ new TypeError(
+ "Invalid worker node key '1.5': must be a non-negative safe integer"
+ )
+ )
+ // Should throw TypeError for negative values
+ expect(() => checkValidWorkerNodeKeys([-1])).toThrow(
+ new TypeError(
+ "Invalid worker node key '-1': must be a non-negative safe integer"
+ )
+ )
+ expect(() => checkValidWorkerNodeKeys([0, -1, 2])).toThrow(
+ new TypeError(
+ "Invalid worker node key '-1': must be a non-negative safe integer"
+ )
+ )
+ // Should throw TypeError for NaN
+ expect(() => checkValidWorkerNodeKeys([NaN])).toThrow(
+ new TypeError(
+ "Invalid worker node key 'NaN': must be a non-negative safe integer"
+ )
+ )
+ // Should throw TypeError for Infinity
+ expect(() => checkValidWorkerNodeKeys([Infinity])).toThrow(
+ new TypeError(
+ "Invalid worker node key 'Infinity': must be a non-negative safe integer"
+ )
+ )
+ expect(() => checkValidWorkerNodeKeys([-Infinity])).toThrow(
+ new TypeError(
+ "Invalid worker node key '-Infinity': must be a non-negative safe integer"
+ )
+ )
+ // Should throw TypeError for duplicate keys
+ expect(() => checkValidWorkerNodeKeys([0, 0, 1])).toThrow(
+ new TypeError('Invalid worker node keys: must not contain duplicates')
+ )
+ expect(() => checkValidWorkerNodeKeys([1, 2, 1])).toThrow(
+ new TypeError('Invalid worker node keys: must not contain duplicates')
+ )
+ // Should not throw with maxPoolSize when keys are in range
+ expect(() => checkValidWorkerNodeKeys([0, 1, 2], 4)).not.toThrow()
+ // Should throw RangeError when keys exceed maxPoolSize count
+ expect(() => checkValidWorkerNodeKeys([0, 1, 2, 3, 4], 4)).toThrow(
+ new RangeError(
+ 'Cannot add a task function with more worker node keys than the maximum number of workers in the pool'
+ )
+ )
+ // Should throw RangeError when a key is out of range
+ expect(() => checkValidWorkerNodeKeys([0, 4], 4)).toThrow(
+ new RangeError(
+ 'Cannot add a task function with invalid worker node keys: 4. Valid keys are: 0..3'
+ )
+ )
+ expect(() => checkValidWorkerNodeKeys([999], 4)).toThrow(
+ new RangeError(
+ 'Cannot add a task function with invalid worker node keys: 999. Valid keys are: 0..3'
+ )
+ )
+ })
})
module.exports = new ClusterWorker(
{
- factorial: data => factorial(data.n),
- fibonacci: data => fibonacci(data.n),
+ factorial: {
+ priority: 1,
+ taskFunction: data => factorial(data.n),
+ workerNodeKeys: [0],
+ },
+ fibonacci: {
+ priority: 2,
+ taskFunction: data => fibonacci(data.n),
+ workerNodeKeys: [0, 1],
+ },
jsonIntegerSerialization: data => jsonIntegerSerialization(data.n),
},
{
export default new ThreadWorker(
{
- factorial: data => factorial(data.n),
- fibonacci: data => fibonacci(data.n),
+ factorial: {
+ priority: 1,
+ taskFunction: data => factorial(data.n),
+ workerNodeKeys: [0],
+ },
+ fibonacci: {
+ priority: 2,
+ taskFunction: data => fibonacci(data.n),
+ workerNodeKeys: [0, 1],
+ },
jsonIntegerSerialization: data => jsonIntegerSerialization(data.n),
},
{
export default new ThreadWorker(
{
- factorial: { taskFunction: data => factorial(data.n) },
- fibonacci: { priority: -5, taskFunction: data => fibonacci(data.n) },
+ factorial: {
+ taskFunction: data => factorial(data.n),
+ workerNodeKeys: [0],
+ },
+ fibonacci: {
+ priority: -5,
+ taskFunction: data => fibonacci(data.n),
+ workerNodeKeys: [0, 1],
+ },
jsonIntegerSerialization: {
taskFunction: data => jsonIntegerSerialization(data.n),
},