this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.unsetTaskStealing()
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_, workerNodeKey) => {
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
this.readyEventEmitted = false
this.destroying = false
this.started = false
// Listen to worker messages.
this.registerWorkerMessageListener(
workerNodeKey,
- this.workerMessageListener.bind(this)
+ this.workerMessageListener
)
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
}
private readonly handleIdleWorkerNodeEvent = (
- event: CustomEvent<WorkerNodeEventDetail>,
+ eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- const { workerNodeKey } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
'WorkerNode event detail workerNodeKey attribute must be defined'
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(event, stolenTask)
+ this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
}
private readonly handleBackPressureEvent = (
- event: CustomEvent<WorkerNodeEventDetail>
+ eventDetail: WorkerNodeEventDetail
): void => {
- const { workerId } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
/**
* This method is the message listener registered on each worker.
*/
- protected workerMessageListener (message: MessageValue<Response>): void {
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
this.checkMessageWorkerId(message)
const { workerId, ready, taskId, taskFunctionNames } = message
if (ready != null && taskFunctionNames != null) {
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
- detail: { workerId: workerId as number, workerNodeKey }
- })
- )
+ this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
}
}
}