import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
Task,
- TaskFunctionProperties
+ TaskFunctionProperties,
} from '../utility-types.js'
import {
average,
median,
min,
round,
- sleep
+ sleep,
} from '../utils.js'
import type {
TaskFunction,
- TaskFunctionObject
+ TaskFunctionObject,
} from '../worker/task-functions.js'
import { KillBehaviors } from '../worker/worker-options.js'
import {
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions
+ type TasksQueueOptions,
} from './pool.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
+ type WorkerChoiceStrategyOptions,
} from './selection-strategies/selection-strategies-types.js'
import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
updateRunTimeWorkerUsage,
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
- waitWorkerNodeEvents
+ waitWorkerNodeEvents,
} from './utils.js'
import { version } from './version.js'
import type {
IWorkerNode,
WorkerInfo,
WorkerNodeEventDetail,
- WorkerType
+ WorkerType,
} from './worker.js'
import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
- *
* @typeParam Worker - Type of worker which manages this pool.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
+ public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
*/
protected promiseResponseMap: Map<
`${string}-${string}-${string}-${string}-${string}`,
- PromiseResponseWrapper<Response>
+ PromiseResponseWrapper<Response>
> = new Map<
`${string}-${string}-${string}-${string}-${string}`,
PromiseResponseWrapper<Response>
- >()
+ >()
/**
* Worker choice strategies context referencing worker choice algorithms implementation.
*/
protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
- Worker,
- Data,
- Response
+ Worker,
+ Data,
+ Response
>
/**
* - `value`: The task function object.
*/
private readonly taskFunctions: Map<
- string,
- TaskFunctionObject<Data, Response>
+ string,
+ TaskFunctionObject<Data, Response>
>
/**
/**
* Constructs a new poolifier pool.
- *
* @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
this.initEventEmitter()
}
this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
- Worker,
- Data,
- Response
+ Worker,
+ Data,
+ Response
>(
this,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
- name: `poolifier:${this.type}-${this.worker}-pool`
+ name: `poolifier:${this.type}-${this.worker}-pool`,
})
}
.runTime.aggregate === true &&
this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
- utilization: round(this.utilization)
+ utilization: round(this.utilization),
}),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
workerNode.info.stealing ? accumulator + 1 : accumulator,
0
- )
+ ),
}),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.queued,
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
- backPressure: this.hasBackPressure()
+ backPressure: this.hasBackPressure(),
}),
...(this.opts.enableTasksQueue === true && {
stolenTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.stolen,
0
- )
+ ),
}),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
+ ),
+ }),
+ },
}),
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.waitTime.aggregate === true && {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
+ ),
+ }),
+ },
}),
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.elu.aggregate === true && {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.elu.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
- )
- })
+ ),
+ }),
},
active: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.elu.median && {
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
- )
- })
+ ),
+ }),
},
utilization: {
average: round(
workerNode => workerNode.usage.elu.utilization ?? 0
)
)
- )
- }
- }
- })
+ ),
+ },
+ },
+ }),
}
}
/**
* The approximate pool utilization.
- *
* @returns The pool utilization.
*/
private get utilization (): number {
/**
* Checks if the worker id sent in the received message from a worker is valid.
- *
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
throw new Error('Worker message received without worker id')
} else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
- `Worker message received from unknown worker '${message.workerId}'`
+ `Worker message received from unknown worker '${message.workerId.toString()}'`
)
}
}
/**
* Gets the worker node key given its worker id.
- *
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
...getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
),
- ...tasksQueueOptions
+ ...tasksQueueOptions,
}
}
/**
* Whether worker nodes are executing concurrently their tasks quota or not.
- *
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
} else {
reject(
new Error(
- `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ message.workerError?.message
+ }'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${errorResponse?.workerId} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
errorResponse?.workerError?.message
}'`
)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
- taskFunction: fn.taskFunction.toString()
+ taskFunction: fn.taskFunction.toString(),
})
this.taskFunctions.set(name, fn)
this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
taskFunctionProperties: buildTaskFunctionProperties(
name,
this.taskFunctions.get(name)
- )
+ ),
})
for (const workerNode of this.workerNodes) {
workerNode.deleteTaskFunctionWorkerUsage(name)
/**
* Gets task function worker choice strategy, if any.
- *
* @param name - The task function name.
* @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
*/
/**
* Gets worker node task function worker choice strategy, if any.
- *
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
* @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
/**
* Gets worker node task function priority, if any.
- *
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
* @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
/**
* Gets the worker choice strategies registered in this pool.
- *
* @returns The worker choice strategies.
*/
private readonly getWorkerChoiceStrategies =
)
.filter(
(strategy: WorkerChoiceStrategy | undefined) => strategy != null
- ) as WorkerChoiceStrategy[])
+ ) as WorkerChoiceStrategy[]),
])
}
taskFunctionProperties: buildTaskFunctionProperties(
name,
this.taskFunctions.get(name)
- )
+ ),
})
}
const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
),
transferList,
timestamp,
- taskId: randomUUID()
+ taskId: randomUUID(),
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.set(task.taskId!, {
...(this.emitter != null && {
asyncResource: new AsyncResource('poolifier:task', {
triggerAsyncId: this.emitter.asyncId,
- requireManualDestroy: true
- })
- })
+ requireManualDestroy: true,
+ }),
+ }),
})
if (
this.opts.enableTasksQueue === false ||
})
}
+
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ return Promise.all(
+ [...data].map(data => this.execute(data, name, transferList))
+ )
+ }
+
/**
* Starts the minimum number of workers.
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
*/
private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${message.workerId}`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Kill message handling failed on worker ${message.workerId?.toString()}`
)
)
}
/**
* Terminates the worker node given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
* Can be overridden.
- *
- * @virtual
*/
protected setupHook (): void {
/* Intentionally empty */
/**
* Returns whether the worker is the main worker or not.
- *
* @returns `true` if the worker is the main worker, `false` otherwise.
*/
protected abstract isMain (): boolean
/**
* Hook executed before the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
/**
* Hook executed after the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param message - The received message.
*/
/**
* Whether the worker node shall update its task function worker usage or not.
- *
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
*/
/**
* Chooses a worker node for the next task.
- *
* @param name - The task function name.
- * @returns The chosen worker node key
+ * @returns The chosen worker node key.
*/
private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
/**
* Conditions for dynamic worker creation.
- *
* @returns Whether to create a dynamic worker or not.
*/
protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param message - The message.
* @param transferList - The optional array of transferable objects.
/**
* Initializes the worker node usage with sensible default values gathered during runtime.
- *
* @param workerNode - The worker node.
*/
private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
) {
workerNode.usage.runTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
) {
workerNode.usage.waitTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
) {
workerNode.usage.elu.active.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
/**
* Creates a new, completely set up worker node.
- *
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
/**
* Creates a new, completely set up dynamic worker node.
- *
* @returns New, completely set up dynamic worker node key.
*/
protected createAndSetupDynamicWorkerNode (): number {
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
}
})
this.sendToWorker(workerNodeKey, {
- checkActive: true
+ checkActive: true,
})
if (this.taskFunctions.size > 0) {
for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
taskFunctionName,
taskFunctionObject
),
- taskFunction: taskFunctionObject.taskFunction.toString()
+ taskFunction: taskFunctionObject.taskFunction.toString(),
}).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
/**
* Registers a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Registers once a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Deregisters a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Method hooked up after a worker node has been newly created.
* Can be overridden.
- *
* @param workerNodeKey - The newly created worker node key.
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
/**
* Sends the startup message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
* Sends the statistics message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
.runTime.aggregate ?? false,
elu:
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
- .elu.aggregate ?? false
- }
+ .elu.aggregate ?? false,
+ },
})
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo == null) {
throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
if (
const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo == null) {
throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
workerInfo.stealing = true
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
+ * @param message - The message received from the worker.
*/
protected readonly workerMessageListener = (
message: MessageValue<Response>
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
- throw new Error(`Worker ${workerId} failed to initialize`)
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
}
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
this.checkAndEmitReadyEvent()
}
) {
workerNode.emit('idle', {
workerId,
- workerNodeKey
+ workerNodeKey,
})
}
}
/**
* Gets the worker information given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
- *
* @returns The created worker node.
*/
private createWorkerNode (): IWorkerNode<Worker, Data> {
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
- tasksQueueBucketSize:
- (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority(),
}
)
// Flag the worker node as ready at pool startup.
/**
* Adds the given worker node in the pool worker nodes.
- *
* @param workerNode - The worker node.
* @returns The added worker node key.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
/**
* Removes the worker node from the pool worker nodes.
- *
* @param workerNode - The worker node.
*/
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
/**
* Executes the given task on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/