this.opts.workerChoiceStrategyOptions =
opts.workerChoiceStrategyOptions ??
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.checkValidWorkerChoiceStrategyOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
'Invalid worker choice strategy options: must be a plain object'
)
}
+ if (
+ workerChoiceStrategyOptions.weights != null &&
+ Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+ ) {
+ throw new Error(
+ 'Invalid worker choice strategy options: must have a weight for each worker node'
+ )
+ }
}
private checkValidTasksQueueOptions (
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
- const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
name,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
this.promiseResponseMap.set(submittedTask.id as string, {
resolve,
reject,
- worker: workerNode.worker
+ worker: this.workerNodes[workerNodeKey].worker
})
})
if (
workerTasksUsage.avgRunTime =
workerTasksUsage.runTime / workerTasksUsage.run
}
- if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
- workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
+ message.runTime != null
+ ) {
+ workerTasksUsage.runTimeHistory.push(message.runTime)
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
/**
* Chooses a worker node for the next task.
*
- * The default uses a round robin algorithm to distribute the load.
+ * The default worker choice strategy uses a round robin algorithm to distribute the load.
*
- * @returns [worker node key, worker node].
+ * @returns The worker node key
*/
- protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
+ protected chooseWorkerNode (): number {
let workerNodeKey: number
if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
} else {
workerNodeKey = this.workerChoiceStrategyContext.execute()
}
- return [workerNodeKey, this.workerNodes[workerNodeKey]]
+ return workerNodeKey
}
/**