'cpus',
'cryptographically',
'ctx',
+ 'deduped',
'deprecations',
'deque',
'dequeue',
"cloneable",
"codeql",
"commitlint",
+ "deduped",
"Dependabot",
"deque",
"deregisters",
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
- this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
- this.opts.workerChoiceStrategy,
- workerChoiceStrategyOptions
- )
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (workerChoiceStrategyOptions != null) {
+ this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ }
+ if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
+ this.opts.workerChoiceStrategy = workerChoiceStrategy
+ this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+ this.opts.workerChoiceStrategy,
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
}
}
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
- for (const workerNode of this.workerNodes) {
- if (
- Array.isArray(workerNode.info.taskFunctionsProperties) &&
- workerNode.info.taskFunctionsProperties.some(
- taskFunctionProperties => taskFunctionProperties.name === name
- )
- ) {
- return true
- }
- }
- return false
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
}
/** @inheritDoc */
taskFunction: fn.taskFunction.toString()
})
this.taskFunctions.set(name, fn)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerWorkerChoiceStrategies()
+ )
return opResult
}
})
this.deleteTaskFunctionWorkerUsages(name)
this.taskFunctions.delete(name)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerWorkerChoiceStrategies()
+ )
return opResult
}
}
}
+ /**
+ * Gets the worker choice strategies registered in this pool.
+ *
+ * @returns The worker choice strategies.
+ */
+ private readonly getWorkerWorkerChoiceStrategies =
+ (): Set<WorkerChoiceStrategy> => {
+ return new Set([
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.workerChoiceStrategy!,
+ ...(this.listTaskFunctionsProperties()
+ .map(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.strategy
+ )
+ .filter(
+ (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+ ) as WorkerChoiceStrategy[])
+ ])
+ }
+
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
-const clone = <T>(object: T): T => {
- return structuredClone<T>(object)
-}
-
const estimatedCpuSpeed = (): number => {
const runs = 150000000
const begin = performance.now()
pool: IPool<Worker, Data, Response>,
opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
- opts = clone(opts ?? {})
+ opts = structuredClone(opts ?? {})
opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
return {
...{
}
/**
- * Gets the active worker choice strategies policy in the context.
+ * Gets the active worker choice strategies in the context policy.
*
* @returns The strategies policy.
*/
workerChoiceStrategy: WorkerChoiceStrategy,
opts?: WorkerChoiceStrategyOptions
): void {
- this.defaultWorkerChoiceStrategy = workerChoiceStrategy
- this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+ if (workerChoiceStrategy !== this.defaultWorkerChoiceStrategy) {
+ this.defaultWorkerChoiceStrategy = workerChoiceStrategy
+ this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+ }
}
/**
}
}
+ /**
+ * Synchronizes the active worker choice strategies in the context with the given worker choice strategies.
+ *
+ * @param workerChoiceStrategies - The worker choice strategies to synchronize.
+ * @param opts - The worker choice strategy options.
+ */
+ public syncWorkerChoiceStrategies (
+ workerChoiceStrategies: Set<WorkerChoiceStrategy>,
+ opts?: WorkerChoiceStrategyOptions
+ ): void {
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.keys()) {
+ if (!workerChoiceStrategies.has(workerChoiceStrategy)) {
+ this.removeWorkerChoiceStrategy(workerChoiceStrategy)
+ }
+ }
+ for (const workerChoiceStrategy of workerChoiceStrategies) {
+ if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
+ this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+ }
+ }
+ }
+
+ /**
+ * Adds a worker choice strategy to the context.
+ *
+ * @param workerChoiceStrategy - The worker choice strategy to add.
+ * @param opts - The worker choice strategy options.
+ * @returns The worker choice strategies.
+ */
private addWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy,
pool: IPool<Worker, Data, Response>,
return this.workerChoiceStrategies
}
- // private removeWorkerChoiceStrategy (
- // workerChoiceStrategy: WorkerChoiceStrategy
- // ): boolean {
- // return this.workerChoiceStrategies.delete(workerChoiceStrategy)
- // }
+ /**
+ * Removes a worker choice strategy from the context.
+ *
+ * @param workerChoiceStrategy - The worker choice strategy to remove.
+ * @returns `true` if the worker choice strategy is removed, `false` otherwise.
+ */
+ private removeWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): boolean {
+ return this.workerChoiceStrategies.delete(workerChoiceStrategy)
+ }
}
export const checkValidPriority = (priority: number | undefined): void => {
if (priority != null && !Number.isSafeInteger(priority)) {
- throw new TypeError(`Invalid priority '${priority}'`)
+ throw new TypeError(`Invalid property 'priority': '${priority}'`)
}
if (
priority != null &&
Number.isSafeInteger(priority) &&
(priority < -20 || priority > 19)
) {
- throw new RangeError('Property priority must be between -20 and 19')
+ throw new RangeError("Property 'priority' must be between -20 and 19")
}
}
)
expect(
() => new ThreadWorker({ fn1: { taskFunction: fn1, priority: '' } })
- ).toThrow(new TypeError("Invalid priority ''"))
+ ).toThrow(new TypeError("Invalid property 'priority': ''"))
expect(
() => new ThreadWorker({ fn1: { taskFunction: fn1, priority: -21 } })
- ).toThrow(new TypeError('Property priority must be between -20 and 19'))
+ ).toThrow(new TypeError("Property 'priority' must be between -20 and 19"))
expect(
() => new ThreadWorker({ fn1: { taskFunction: fn1, priority: 20 } })
- ).toThrow(new RangeError('Property priority must be between -20 and 19'))
+ ).toThrow(new RangeError("Property 'priority' must be between -20 and 19"))
expect(
() =>
new ThreadWorker({