import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
import type {
MessageValue,
PromiseResponseWrapper,
} from '../utility-types'
import {
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
exponentialDelay,
WorkerUsage
} from './worker'
import {
- type MeasurementStatisticsRequirements,
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
- updateMeasurementStatistics
+ getDefaultTasksQueueOptions,
+ updateEluWorkerUsage,
+ updateRunTimeWorkerUsage,
+ updateTaskStatisticsWorkerUsage,
+ updateWaitTimeWorkerUsage,
+ waitWorkerNodeEvents
} from './utils'
/**
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
/**
* Constructs a new poolifier pool.
*
- * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage.
*/
public constructor (
- protected readonly numberOfWorkers: number,
+ protected readonly minimumNumberOfWorkers: number,
protected readonly filePath: string,
- protected readonly opts: PoolOptions<Worker>
+ protected readonly opts: PoolOptions<Worker>,
+ protected readonly maximumNumberOfWorkers?: number
) {
if (!this.isMain()) {
throw new Error(
'Cannot start a pool from a worker with the same type as the pool'
)
}
+ this.checkPoolType()
checkFilePath(this.filePath)
- this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkNumberOfWorkers (numberOfWorkers: number): void {
- if (numberOfWorkers == null) {
+ private checkPoolType (): void {
+ if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+ throw new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ }
+ }
+
+ private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
- } else if (!Number.isSafeInteger(numberOfWorkers)) {
+ } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
- } else if (numberOfWorkers < 0) {
+ } else if (minimumNumberOfWorkers < 0) {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
this.checkValidWorkerChoiceStrategyOptions(
opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...opts.workerChoiceStrategyOptions
+ if (opts.workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
}
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
'Invalid worker choice strategy options: must be a plain object'
)
}
- if (
- workerChoiceStrategyOptions?.retries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
- ) {
- throw new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- }
- if (
- workerChoiceStrategyOptions?.retries != null &&
- workerChoiceStrategyOptions.retries < 0
- ) {
- throw new RangeError(
- `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
- )
- }
if (
workerChoiceStrategyOptions?.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ Object.keys(workerChoiceStrategyOptions.weights).length !==
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
- minSize: this.minSize,
- maxSize: this.maxSize,
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ minSize: this.minimumNumberOfWorkers,
+ maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
0
),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+ (accumulator, _workerNode, workerNodeKey) =>
+ this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
),
executedTasks: this.workerNodes.reduce(
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
? accumulator + 1
: accumulator,
0
- ) >= this.minSize
+ ) >= this.minimumNumberOfWorkers
)
}
*/
private get utilization (): number {
const poolTimeCapacity =
- (performance.now() - this.startTimestamp) * this.maxSize
+ (performance.now() - this.startTimestamp) *
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
*/
protected abstract get worker (): WorkerType
- /**
- * The pool minimum size.
- */
- protected get minSize (): number {
- return this.numberOfWorkers
- }
-
- /**
- * The pool maximum size.
- */
- protected get maxSize (): number {
- return this.max ?? this.numberOfWorkers
- }
-
/**
* Checks if the worker id sent in the received message from a worker is valid.
*
}
}
- /**
- * Gets the given worker its worker node key.
- *
- * @param worker - The worker.
- * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
- */
- private getWorkerNodeKeyByWorker (worker: Worker): number {
- return this.workerNodes.findIndex(
- workerNode => workerNode.worker === worker
- )
- }
-
/**
* Gets the worker node key given its worker id.
*
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...workerChoiceStrategyOptions
+ if (workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
+ this,
this.opts.workerChoiceStrategyOptions
)
}
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.unsetTaskStealing()
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
+ ...getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ),
...tasksQueueOptions
}
}
* The pool filling boolean status.
*/
protected get full (): boolean {
- return this.workerNodes.length >= this.maxSize
+ return (
+ this.workerNodes.length >=
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ )
}
/**
)
}
+ private isWorkerNodeBusy (workerNodeKey: number): boolean {
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ )
+ }
+ return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ }
+
private async sendTaskFunctionOperationToWorker (
workerNodeKey: number,
message: MessageValue<Data>
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
- workerNodeKey
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
})
if (
this.opts.enableTasksQueue === false ||
(accumulator, workerNode) =>
!workerNode.info.dynamic ? accumulator + 1 : accumulator,
0
- ) < this.numberOfWorkers
+ ) < this.minimumNumberOfWorkers
) {
this.createAndSetupWorkerNode()
}
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_, workerNodeKey) => {
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.started = false
}
- protected async sendKillMessageToWorker (
- workerNodeKey: number
- ): Promise<void> {
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
+ if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+ reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+ return
+ }
const killMessageListener = (message: MessageValue<Response>): void => {
this.checkMessageWorkerId(message)
if (message.kill === 'success') {
*
* @param workerNodeKey - The worker node key.
*/
- protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+ protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
+ const flushedTasks = this.flushTasksQueue(workerNodeKey)
+ const workerNode = this.workerNodes[workerNodeKey]
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
+ )
+ await this.sendKillMessageToWorker(workerNodeKey)
+ await workerNode.terminate()
+ }
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(workerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ task
+ )
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNodeKey
].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
++taskFunctionWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ task
+ )
}
}
workerNodeKey: number,
message: MessageValue<Response>
): void {
+ let needWorkerChoiceStrategyUpdate = false
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
- this.updateTaskStatisticsWorkerUsage(workerUsage, message)
- this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateEluWorkerUsage(workerUsage, message)
+ updateTaskStatisticsWorkerUsage(workerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
].getTaskFunctionWorkerUsage(
message.taskPerformance?.name as string
) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+ updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
+ }
+ if (needWorkerChoiceStrategyUpdate) {
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
)
}
- private updateTaskStatisticsWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- const workerTaskStatistics = workerUsage.tasks
- if (
- workerTaskStatistics.executing != null &&
- workerTaskStatistics.executing > 0
- ) {
- --workerTaskStatistics.executing
- }
- if (message.workerError == null) {
- ++workerTaskStatistics.executed
- } else {
- ++workerTaskStatistics.failed
- }
- }
-
- private updateRunTimeWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- updateMeasurementStatistics(
- workerUsage.runTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
- message.taskPerformance?.runTime ?? 0
- )
- }
-
- private updateWaitTimeWorkerUsage (
- workerUsage: WorkerUsage,
- task: Task<Data>
- ): void {
- const timestamp = performance.now()
- const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
- updateMeasurementStatistics(
- workerUsage.waitTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
- taskWaitTime
- )
- }
-
- private updateEluWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- updateMeasurementStatistics(
- workerUsage.elu.active,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.active ?? 0
- )
- updateMeasurementStatistics(
- workerUsage.elu.idle,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.idle ?? 0
- )
- if (eluTaskStatisticsRequirements.aggregate) {
- if (message.taskPerformance?.elu != null) {
- if (workerUsage.elu.utilization != null) {
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else {
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- }
- }
- }
-
/**
* Chooses a worker node for the next task.
*
transferList?: TransferListItem[]
): void
- /**
- * Creates a new worker.
- *
- * @returns Newly created worker.
- */
- protected abstract createWorker (): Worker
-
/**
* Creates a new, completely set up worker node.
*
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
- const worker = this.createWorker()
-
- worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
- worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
- worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
- worker.on('error', error => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- const workerInfo = this.getWorkerInfo(workerNodeKey)
+ const workerNode = this.createWorkerNode()
+ workerNode.registerWorkerEventHandler(
+ 'online',
+ this.opts.onlineHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'message',
+ this.opts.messageHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'error',
+ this.opts.errorHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
- this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
- if (workerInfo.dynamic) {
+ if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else {
this.createAndSetupWorkerNode()
}
}
if (this.started && this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(workerNodeKey)
+ this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
+ workerNode?.terminate().catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
})
- worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
- worker.once('exit', () => {
- this.removeWorkerNode(worker)
+ workerNode.registerWorkerEventHandler(
+ 'exit',
+ this.opts.exitHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
+ this.removeWorkerNode(workerNode)
})
-
- const workerNodeKey = this.addWorkerNode(worker)
-
+ const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
-
return workerNodeKey
}
// Listen to worker messages.
this.registerWorkerMessageListener(
workerNodeKey,
- this.workerMessageListener.bind(this)
+ this.workerMessageListener
)
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
})
}
+ private handleTask (workerNodeKey: number, task: Task<Data>): void {
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ }
+
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (workerNodeKey === -1) {
+ return
+ }
+ if (this.workerNodes.length <= 1) {
+ return
+ }
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
},
0
)
- const task = this.dequeueTask(workerNodeKey) as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
+ this.handleTask(
+ destinationWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
}
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
/**
* This method is the message listener registered on each worker.
*/
- protected workerMessageListener (message: MessageValue<Response>): void {
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
this.checkMessageWorkerId(message)
const { workerId, ready, taskId, taskFunctionNames } = message
if (ready != null && taskFunctionNames != null) {
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- const { resolve, reject, workerNodeKey } = promiseResponse
+ const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+ const workerNode = this.workerNodes[workerNodeKey]
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- reject(workerError.message)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
} else {
- resolve(data as Response)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
}
+ asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
- if (this.opts.enableTasksQueue === true) {
- const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ workerNode?.emit('taskFinished', taskId)
+ if (this.opts.enableTasksQueue === true && !this.destroying) {
+ const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerNode.emit('idleWorkerNode', {
workerId: workerId as number,
workerNodeKey
})
}
/**
- * Adds the given worker in the pool worker nodes.
+ * Creates a worker node.
*
- * @param worker - The worker.
- * @returns The added worker node key.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ * @returns The created worker node.
*/
- private addWorkerNode (worker: Worker): number {
+ private createWorkerNode (): IWorkerNode<Worker, Data> {
const workerNode = new WorkerNode<Worker, Data>(
- worker,
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.worker,
+ this.filePath,
+ {
+ env: this.opts.env,
+ workerOptions: this.opts.workerOptions,
+ tasksQueueBackPressureSize:
+ this.opts.tasksQueueOptions?.size ??
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).size
+ }
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
+ return workerNode
+ }
+
+ /**
+ * Adds the given worker node in the pool worker nodes.
+ *
+ * @param workerNode - The worker node.
+ * @returns The added worker node key.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ */
+ private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
this.workerNodes.push(workerNode)
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey === -1) {
throw new Error('Worker added not found in worker nodes')
}
}
/**
- * Removes the given worker from the pool worker nodes.
+ * Removes the worker node from the pool worker nodes.
*
- * @param worker - The worker.
+ * @param workerNode - The worker node.
*/
- private removeWorkerNode (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext.remove(workerNodeKey)
return this.workerNodes[workerNodeKey].tasksQueueSize()
}
- protected flushTasksQueue (workerNodeKey: number): void {
+ protected flushTasksQueue (workerNodeKey: number): number {
+ let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
this.executeTask(
workerNodeKey,
this.dequeueTask(workerNodeKey) as Task<Data>
)
+ ++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()
+ return flushedTasks
}
private flushTasksQueues (): void {