import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
+ checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
getDefaultTasksQueueOptions,
if (typeof fn.taskFunction !== 'function') {
throw new TypeError('taskFunction property must be a function')
}
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
}
}
+ /**
+ * Gets worker node task function priority, if any.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ if (name != null) {
+ return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+ }
+
/**
* Gets the worker choice strategies registered in this pool.
*
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode(
+ const taskFunctionStrategy =
this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
- )
+ const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: taskFunctionStrategy,
transferList,
timestamp,
taskId: randomUUID()
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
- ).size
+ ).size,
+ tasksQueueBucketSize:
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
}
)
// Flag the worker node as ready at pool startup.
return tasksQueueSize
}
- private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask()
+ private dequeueTask (
+ workerNodeKey: number,
+ bucket?: number
+ ): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask(bucket)
}
private tasksQueueSize (workerNodeKey: number): number {