import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import { type TransferListItem } from 'node:worker_threads'
+import type { TransferListItem } from 'node:worker_threads'
+import { EventEmitterAsyncResource } from 'node:events'
import type {
MessageValue,
PromiseResponseWrapper,
max,
median,
min,
+ once,
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
- PoolEmitter,
PoolEvents,
type PoolInfo,
type PoolOptions,
PoolTypes,
type TasksQueueOptions
} from './pool'
-import {
- type IWorker,
- type IWorkerNode,
- type WorkerInfo,
- type WorkerType,
- type WorkerUsage
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerNodeEventDetail,
+ WorkerType,
+ WorkerUsage
} from './worker'
import {
type MeasurementStatisticsRequirements,
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
- public readonly emitter?: PoolEmitter
+ public emitter?: EventEmitterAsyncResource
+
+ /**
+ * Dynamic pool maximum size property placeholder.
+ */
+ protected readonly max?: number
/**
* The task execution response promise map:
Response
>
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task functions added at runtime map:
* - `key`: The task function name.
'Cannot start a pool from a worker with the same type as the pool'
)
}
- this.checkNumberOfWorkers(this.numberOfWorkers)
checkFilePath(this.filePath)
+ this.checkNumberOfWorkers(this.numberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.emitter = new PoolEmitter()
+ this.initializeEventEmitter()
}
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
Worker,
}
}
+ private initializeEventEmitter (): void {
+ this.emitter = new EventEmitterAsyncResource({
+ name: `poolifier:${this.type}-${this.worker}-pool`
+ })
+ }
+
/** @inheritDoc */
public get info (): PoolInfo {
return {
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
- private checkMessageWorkerId (message: MessageValue<Response>): void {
+ private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
if (message.workerId == null) {
throw new Error('Worker message received without worker id')
- } else if (
- message.workerId != null &&
- this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
- ) {
+ } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
`Worker message received from unknown worker '${message.workerId}'`
)
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onEmptyQueue
+ this.workerNodes[workerNodeKey].removeEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onBackPressure
+ this.workerNodes[workerNodeKey].removeEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
+ this.emitter?.emitDestroy()
this.started = false
}
)
}
}
+ // FIXME: should be registered only once
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
this.sendToWorker(workerNodeKey, { kill: true })
})
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
- workerInfo.ready = false
- this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
+ this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
protected createAndSetupDynamicWorkerNode (): number {
const workerNodeKey = this.createAndSetupWorkerNode()
this.registerWorkerMessageListener(workerNodeKey, message => {
+ this.checkMessageWorkerId(message)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
+ // Flag the worker node as not ready immediately
+ this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
// Listen to worker messages.
- this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ this.workerMessageListener.bind(this)
+ )
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
}
}
}
- private taskStealingOnEmptyQueue (workerId: number): void {
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ private readonly handleEmptyQueueEvent = (
+ event: CustomEvent<WorkerNodeEventDetail>
+ ): void => {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+ event.detail.workerId
+ )
const workerNodes = this.workerNodes
.slice()
.sort(
const sourceWorkerNode = workerNodes.find(
workerNode =>
workerNode.info.ready &&
- workerNode.info.id !== workerId &&
+ workerNode.info.id !== event.detail.workerId &&
workerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
}
}
- private tasksStealingOnBackPressure (workerId: number): void {
+ private readonly handleBackPressureEvent = (
+ event: CustomEvent<WorkerNodeEventDetail>
+ ): void => {
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
}
const sourceWorkerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
const workerNodes = this.workerNodes
.slice()
.sort(
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
- workerNode.info.id !== workerId &&
+ workerNode.info.id !== event.detail.workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
}
/**
- * This method is the listener registered for each worker message.
- *
- * @returns The listener function to execute when a message is received from a worker.
+ * This method is the message listener registered on each worker.
*/
- protected workerListener (): (message: MessageValue<Response>) => void {
- return message => {
- this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctionNames != null) {
- // Worker ready response received from worker
- this.handleWorkerReadyResponse(message)
- } else if (message.taskId != null) {
- // Task execution response received from worker
- this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctionNames != null) {
- // Task function names message received from worker
- this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctionNames = message.taskFunctionNames
- }
+ protected workerMessageListener (message: MessageValue<Response>): void {
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.taskFunctionNames != null) {
+ // Worker ready response received from worker
+ this.handleWorkerReadyResponse(message)
+ } else if (message.taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
+ } else if (message.taskFunctionNames != null) {
+ // Task function names message received from worker
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ).taskFunctionNames = message.taskFunctionNames
}
}
workerInfo.ready = message.ready as boolean
workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
+ const emitPoolReadyEventOnce = once(
+ () => this.emitter?.emit(PoolEvents.ready, this.info),
+ this
+ )
+ emitPoolReadyEventOnce()
}
}
* @returns The worker information.
*/
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- return this.workerNodes[workerNodeKey].info
+ return this.workerNodes[workerNodeKey]?.info
}
/**
}
}
+ protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+ this.getWorkerInfo(workerNodeKey).ready = false
+ }
+
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return (