import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
Task,
- TaskFunctionProperties
+ TaskFunctionProperties,
} from '../utility-types.js'
import {
average,
median,
min,
round,
- sleep
+ sleep,
} from '../utils.js'
import type {
TaskFunction,
- TaskFunctionObject
+ TaskFunctionObject,
} from '../worker/task-functions.js'
import { KillBehaviors } from '../worker/worker-options.js'
import {
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions
+ type TasksQueueOptions,
} from './pool.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
+ type WorkerChoiceStrategyOptions,
} from './selection-strategies/selection-strategies-types.js'
import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
updateRunTimeWorkerUsage,
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
- waitWorkerNodeEvents
+ waitWorkerNodeEvents,
} from './utils.js'
import { version } from './version.js'
import type {
IWorkerNode,
WorkerInfo,
WorkerNodeEventDetail,
- WorkerType
+ WorkerType,
} from './worker.js'
import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
- *
* @typeParam Worker - Type of worker which manages this pool.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
+ public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
- protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
- new Map<string, PromiseResponseWrapper<Response>>()
+ protected promiseResponseMap: Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ > = new Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ >()
/**
* Worker choice strategies context referencing worker choice algorithms implementation.
*/
protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
- Worker,
- Data,
- Response
+ Worker,
+ Data,
+ Response
>
/**
* - `value`: The task function object.
*/
private readonly taskFunctions: Map<
- string,
- TaskFunctionObject<Data, Response>
+ string,
+ TaskFunctionObject<Data, Response>
>
/**
/**
* The start timestamp of the pool.
*/
- private readonly startTimestamp
+ private startTimestamp?: number
/**
* Constructs a new poolifier pool.
- *
* @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
- Worker,
- Data,
- Response
+ Worker,
+ Data,
+ Response
>(
this,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (this.opts.startWorkers === true) {
this.start()
}
-
- this.startTimestamp = performance.now()
}
private checkPoolType (): void {
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
- name: `poolifier:${this.type}-${this.worker}-pool`
+ name: `poolifier:${this.type}-${this.worker}-pool`,
})
}
.runTime.aggregate === true &&
this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
- utilization: round(this.utilization)
+ utilization: round(this.utilization),
}),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
workerNode.info.stealing ? accumulator + 1 : accumulator,
0
- )
+ ),
}),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator, _workerNode, workerNodeKey) =>
+ (accumulator, _, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
),
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.queued,
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
- backPressure: this.hasBackPressure()
+ backPressure: this.hasBackPressure(),
}),
...(this.opts.enableTasksQueue === true && {
stolenTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.stolen,
0
- )
+ ),
}),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
+ ),
+ }),
+ },
}),
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.waitTime.aggregate === true && {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
- })
+ ),
+ }),
+ },
+ }),
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate === true && {
+ elu: {
+ idle: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ active: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ utilization: {
+ average: round(
+ average(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ },
+ },
+ }),
}
}
/**
* The approximate pool utilization.
- *
* @returns The pool utilization.
*/
private get utilization (): number {
+ if (this.startTimestamp == null) {
+ return 0
+ }
const poolTimeCapacity =
(performance.now() - this.startTimestamp) *
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
/**
* Checks if the worker id sent in the received message from a worker is valid.
- *
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
throw new Error('Worker message received without worker id')
} else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
- `Worker message received from unknown worker '${message.workerId}'`
+ `Worker message received from unknown worker '${message.workerId.toString()}'`
)
}
}
/**
* Gets the worker node key given its worker id.
- *
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
}
if (requireSync) {
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies(),
+ this.getWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
for (const workerNodeKey of this.workerNodes.keys()) {
this.opts.workerChoiceStrategyOptions
)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies(),
+ this.getWorkerChoiceStrategies(),
this.opts.workerChoiceStrategyOptions
)
for (const workerNodeKey of this.workerNodes.keys()) {
...getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
),
- ...tasksQueueOptions
+ ...tasksQueueOptions,
}
}
/**
* Whether worker nodes are executing concurrently their tasks quota or not.
- *
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
} else {
reject(
new Error(
- `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ message.workerError?.message
+ }'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${errorResponse?.workerId} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
errorResponse?.workerError?.message
}'`
)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
- taskFunction: fn.taskFunction.toString()
+ taskFunction: fn.taskFunction.toString(),
})
this.taskFunctions.set(name, fn)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies()
+ this.getWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
taskFunctionProperties: buildTaskFunctionProperties(
name,
this.taskFunctions.get(name)
- )
+ ),
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
- this.getWorkerWorkerChoiceStrategies()
+ this.getWorkerChoiceStrategies()
)
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
}
/**
- * Gets task function strategy, if any.
- *
+ * Gets task function worker choice strategy, if any.
* @param name - The task function name.
* @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
*/
- private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
name?: string
): WorkerChoiceStrategy | undefined => {
- if (name != null) {
- return this.listTaskFunctionsProperties().find(
- (taskFunctionProperties: TaskFunctionProperties) =>
- taskFunctionProperties.name === name
- )?.strategy
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
}
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
}
/**
* Gets worker node task function priority, if any.
- *
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
- * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
*/
private readonly getWorkerNodeTaskFunctionPriority = (
workerNodeKey: number,
name?: string
): number | undefined => {
- if (name != null) {
- return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
- (taskFunctionProperties: TaskFunctionProperties) =>
- taskFunctionProperties.name === name
- )?.priority
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
}
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
}
/**
* Gets the worker choice strategies registered in this pool.
- *
* @returns The worker choice strategies.
*/
- private readonly getWorkerWorkerChoiceStrategies =
+ private readonly getWorkerChoiceStrategies =
(): Set<WorkerChoiceStrategy> => {
return new Set([
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
)
.filter(
(strategy: WorkerChoiceStrategy | undefined) => strategy != null
- ) as WorkerChoiceStrategy[])
+ ) as WorkerChoiceStrategy[]),
])
}
taskFunctionProperties: buildTaskFunctionProperties(
name,
this.taskFunctions.get(name)
- )
+ ),
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
return
}
const timestamp = performance.now()
- const taskFunctionStrategy =
- this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
- const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
+ const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
- strategy: taskFunctionStrategy,
+ strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+ workerNodeKey,
+ name
+ ),
transferList,
timestamp,
- taskId: randomUUID()
+ taskId: randomUUID(),
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.set(task.taskId!, {
...(this.emitter != null && {
asyncResource: new AsyncResource('poolifier:task', {
triggerAsyncId: this.emitter.asyncId,
- requireManualDestroy: true
- })
- })
+ requireManualDestroy: true,
+ }),
+ }),
})
if (
this.opts.enableTasksQueue === false ||
})
}
+
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ return Promise.all(
+ [...data].map(data => this.execute(data, name, transferList))
+ )
+ }
+
/**
* Starts the minimum number of workers.
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
*/
- private startMinimumNumberOfWorkers (): void {
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
while (
this.workerNodes.reduce(
0
) < this.minimumNumberOfWorkers
) {
- this.createAndSetupWorkerNode()
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
}
this.startingMinimumNumberOfWorkers = false
}
}
this.starting = true
this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
this.starting = false
this.started = true
}
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+ this.workerNodes.map(async (_, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
this.readyEventEmitted = false
+ delete this.startTimestamp
this.destroying = false
this.started = false
}
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${message.workerId}`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Kill message handling failed on worker ${message.workerId?.toString()}`
)
)
}
/**
* Terminates the worker node given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
* Can be overridden.
- *
- * @virtual
*/
protected setupHook (): void {
/* Intentionally empty */
/**
* Returns whether the worker is the main worker or not.
- *
* @returns `true` if the worker is the main worker, `false` otherwise.
*/
protected abstract isMain (): boolean
/**
* Hook executed before the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
/**
* Hook executed after the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param message - The received message.
*/
workerNodeKey: number,
message: MessageValue<Response>
): void {
- let needWorkerChoiceStrategyUpdate = false
+ let needWorkerChoiceStrategiesUpdate = false
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
workerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
taskFunctionWorkerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
- if (needWorkerChoiceStrategyUpdate) {
+ if (needWorkerChoiceStrategiesUpdate) {
this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
/**
* Whether the worker node shall update its task function worker usage or not.
- *
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
*/
}
/**
- * Chooses a worker node for the next task given the worker choice strategy.
- *
- * @param workerChoiceStrategy - The worker choice strategy.
- * @returns The chosen worker node key
+ * Chooses a worker node for the next task.
+ * @param name - The task function name.
+ * @returns The chosen worker node key.
*/
- private chooseWorkerNode (
- workerChoiceStrategy?: WorkerChoiceStrategy
- ): number {
+ private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
+ return this.workerChoiceStrategiesContext!.execute(
+ this.getTaskFunctionWorkerChoiceStrategy(name)
+ )
}
/**
* Conditions for dynamic worker creation.
- *
* @returns Whether to create a dynamic worker or not.
*/
protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param message - The message.
* @param transferList - The optional array of transferable objects.
transferList?: readonly TransferListItem[]
): void
+ /**
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ * @param workerNode - The worker node.
+ */
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ }
+
/**
* Creates a new, completely set up worker node.
- *
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else if (!this.startingMinimumNumberOfWorkers) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
}
if (
!this.startingMinimumNumberOfWorkers &&
!this.destroying
) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
})
const workerNodeKey = this.addWorkerNode(workerNode)
/**
* Creates a new, completely set up dynamic worker node.
- *
* @returns New, completely set up dynamic worker node key.
*/
protected createAndSetupDynamicWorkerNode (): number {
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
}
})
this.sendToWorker(workerNodeKey, {
- checkActive: true
+ checkActive: true,
})
if (this.taskFunctions.size > 0) {
for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
taskFunctionName,
taskFunctionObject
),
- taskFunction: taskFunctionObject.taskFunction.toString()
+ taskFunction: taskFunctionObject.taskFunction.toString(),
}).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
) {
workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
/**
* Registers a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Registers once a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Deregisters a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Method hooked up after a worker node has been newly created.
* Can be overridden.
- *
* @param workerNodeKey - The newly created worker node key.
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
/**
* Sends the startup message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
* Sends the statistics message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
.runTime.aggregate ?? false,
elu:
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .elu.aggregate ?? false
- }
+ .elu.aggregate ?? false,
+ },
})
}
}
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- if (workerNodeKey === -1 || this.cannotStealTask()) {
+ private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+ if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
}
- while (this.tasksQueueSize(workerNodeKey) > 0) {
+ while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.info.ready &&
+ return sourceWorkerNodeKey !== workerNodeKey &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
this.handleTask(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.dequeueTask(workerNodeKey)!
+ this.dequeueTask(sourceWorkerNodeKey)!
)
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.stolen
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
}
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string,
+ previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
- }
-
- private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
const taskFunctionWorkerUsage =
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ if (
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+ (previousTaskName != null &&
+ previousTaskName === taskName &&
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)
+ ) {
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) {
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
}
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
- }
-
- private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ )!.tasks.sequentiallyStolen = 0
}
}
)
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
+ )
+ }
if (
this.cannotStealTask() ||
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
- if (workerInfo != null && previousStolenTask != null) {
+ if (previousStolenTask != null) {
workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
- workerInfo != null &&
previousStolenTask != null &&
- workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
workerInfo.stealing = false
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- taskFunctionProperties.name
- )
- }
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- return
- }
- if (workerInfo == null) {
- throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
)
+ return
}
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
- if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- stolenTask != null
- ) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const taskFunctionTasksWorkerUsage = this.workerNodes[
- workerNodeKey
+ if (stolenTask != null) {
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
- if (
- taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
- (previousStolenTask != null &&
- previousStolenTask.name === stolenTask.name &&
- taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
- ) {
- this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- } else {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- }
+ stolenTask.name!,
+ previousStolenTask?.name
+ )
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueTask(1)!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
): void => {
if (
this.cannotStealTask() ||
+ this.hasBackPressure() ||
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
return
}
- const { workerId } = eventDetail
const sizeOffset = 1
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
+ const { workerId } = eventDetail
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo == null) {
throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueTask(1)!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
+ * @param message - The message received from the worker.
*/
protected readonly workerMessageListener = (
message: MessageValue<Response>
this.handleWorkerReadyResponse(message)
} else if (taskFunctionsProperties != null) {
// Task function properties message received from worker
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
- throw new Error(`Worker ${workerId} failed to initialize`)
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
}
- const workerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
this.checkAndEmitReadyEvent()
}
) {
workerNode.emit('idle', {
workerId,
- workerNodeKey
+ workerNodeKey,
})
}
}
/**
* Gets the worker information given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
- *
* @returns The created worker node.
*/
private createWorkerNode (): IWorkerNode<Worker, Data> {
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
- tasksQueueBucketSize:
- (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority(),
}
)
// Flag the worker node as ready at pool startup.
/**
* Adds the given worker node in the pool worker nodes.
- *
* @param workerNode - The worker node.
* @returns The added worker node key.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
/**
* Removes the worker node from the pool worker nodes.
- *
* @param workerNode - The worker node.
*/
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
/**
* Executes the given task on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
return tasksQueueSize
}
- private dequeueTask (
- workerNodeKey: number,
- bucket?: number
- ): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask(bucket)
+ private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {