import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
import type {
MessageValue,
PromiseResponseWrapper,
import type {
IWorker,
IWorkerNode,
+ TaskStatistics,
WorkerInfo,
WorkerNodeEventDetail,
WorkerType,
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.unsetTaskStealing()
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
- workerNodeKey
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
})
if (
this.opts.enableTasksQueue === false ||
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_, workerNodeKey) => {
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
this.readyEventEmitted = false
this.destroying = false
this.started = false
// Listen to worker messages.
this.registerWorkerMessageListener(
workerNodeKey,
- this.workerMessageListener.bind(this)
+ this.workerMessageListener
)
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number,
- taskName: string
+ workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number,
- taskName: string
+ workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
}
private readonly handleIdleWorkerNodeEvent = (
- event: CustomEvent<WorkerNodeEventDetail>,
+ eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- const { workerNodeKey } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
'WorkerNode event detail workerNodeKey attribute must be defined'
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- previousStolenTask.name as string
- )
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )
+ }
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ stolenTask != null
+ ) {
+ const taskFunctionTasksWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskFunctionWorkerUsage(stolenTask.name as string)
+ ?.tasks as TaskStatistics
+ if (
+ taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+ (previousStolenTask != null &&
+ previousStolenTask.name === stolenTask.name &&
+ taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+ ) {
+ this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ } else {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ }
+ }
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(event, stolenTask)
+ this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
} else {
this.enqueueTask(workerNodeKey, task)
}
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
}
private readonly handleBackPressureEvent = (
- event: CustomEvent<WorkerNodeEventDetail>
+ eventDetail: WorkerNodeEventDetail
): void => {
- const { workerId } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
/**
* This method is the message listener registered on each worker.
*/
- protected workerMessageListener (message: MessageValue<Response>): void {
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctionNames != null) {
+ const { workerId, ready, taskId, taskFunctionNames } = message
+ if (ready != null && taskFunctionNames != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
- } else if (message.taskId != null) {
+ } else if (taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctionNames != null) {
+ } else if (taskFunctionNames != null) {
// Task function names message received from worker
this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctionNames = message.taskFunctionNames
+ this.getWorkerNodeKeyByWorkerId(workerId)
+ ).taskFunctionNames = taskFunctionNames
}
}
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- const { resolve, reject, workerNodeKey } = promiseResponse
+ const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- reject(workerError.message)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
} else {
- resolve(data as Response)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
}
+ asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
- detail: { workerId: workerId as number, workerNodeKey }
- })
- )
+ this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
}
}
}