import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
+ checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
getDefaultTasksQueueOptions,
/**
* The start timestamp of the pool.
*/
- private readonly startTimestamp
+ private startTimestamp?: number
/**
* Constructs a new poolifier pool.
if (this.opts.startWorkers === true) {
this.start()
}
-
- this.startTimestamp = performance.now()
}
private checkPoolType (): void {
* @returns The pool utilization.
*/
private get utilization (): number {
+ if (this.startTimestamp == null) {
+ return 0
+ }
const poolTimeCapacity =
(performance.now() - this.startTimestamp) *
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
+ let requireSync = false
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ requireSync = !this.setWorkerChoiceStrategyOptions(
+ workerChoiceStrategyOptions
+ )
}
if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
this.opts.workerChoiceStrategy = workerChoiceStrategy
this.opts.workerChoiceStrategy,
this.opts.workerChoiceStrategyOptions
)
+ requireSync = true
+ }
+ if (requireSync) {
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
this.getWorkerWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
}
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
- ): void {
+ ): boolean {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategiesContext?.setOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return true
}
- this.workerChoiceStrategiesContext?.setOptions(
- this.opts.workerChoiceStrategyOptions
- )
+ return false
}
/** @inheritDoc */
}
private setTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'idle',
this.handleWorkerNodeIdleEvent
}
private setTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
this.handleWorkerNodeBackPressureEvent
}
private unsetTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
this.handleWorkerNodeBackPressureEvent
}
}
}
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(
workerNodeKey,
taskFunctionOperationsListener
if (typeof fn.taskFunction !== 'function') {
throw new TypeError('taskFunction property must be a function')
}
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
}
}
+ /**
+ * Gets worker node task function priority, if any.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ if (name != null) {
+ return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+ }
+
/**
* Gets the worker choice strategies registered in this pool.
*
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode(
+ const taskFunctionStrategy =
this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
- )
+ const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: taskFunctionStrategy,
transferList,
timestamp,
taskId: randomUUID()
}
this.starting = true
this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
this.starting = false
this.started = true
}
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
this.readyEventEmitted = false
+ delete this.startTimestamp
this.destroying = false
this.started = false
}
}
/**
- * Chooses a worker node for the next task.
+ * Chooses a worker node for the next task given the worker choice strategy.
*
* @param workerChoiceStrategy - The worker choice strategy.
* @returns The chosen worker node key
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
- ).size
+ ).size,
+ tasksQueueBucketSize:
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
}
)
// Flag the worker node as ready at pool startup.
return tasksQueueSize
}
- private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask()
+ private dequeueTask (
+ workerNodeKey: number,
+ bucket?: number
+ ): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask(bucket)
}
private tasksQueueSize (workerNodeKey: number): number {
}
private flushTasksQueues (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.flushTasksQueue(workerNodeKey)
}
}