max,
median,
min,
+ once,
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
IWorker,
IWorkerNode,
WorkerInfo,
+ WorkerNodeEventDetail,
WorkerType,
WorkerUsage
} from './worker'
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
if (message.workerId == null) {
throw new Error('Worker message received without worker id')
- } else if (
- message.workerId != null &&
- this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
- ) {
+ } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
`Worker message received from unknown worker '${message.workerId}'`
)
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onEmptyQueue
+ this.workerNodes[workerNodeKey].removeEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onBackPressure
+ this.workerNodes[workerNodeKey].removeEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
)
}
}
+ // FIXME: should be registered only once
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
this.sendToWorker(workerNodeKey, { kill: true })
})
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
- workerInfo.ready = false
- this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
+ this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
+ // Flag the worker node as not ready immediately
+ this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'emptyqueue',
+ this.handleEmptyQueueEvent as EventListener
+ )
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].addEventListener(
+ 'backpressure',
+ this.handleBackPressureEvent as EventListener
+ )
}
}
}
}
}
- private taskStealingOnEmptyQueue (workerId: number): void {
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ private readonly handleEmptyQueueEvent = (
+ event: CustomEvent<WorkerNodeEventDetail>
+ ): void => {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+ event.detail.workerId
+ )
const workerNodes = this.workerNodes
.slice()
.sort(
const sourceWorkerNode = workerNodes.find(
workerNode =>
workerNode.info.ready &&
- workerNode.info.id !== workerId &&
+ workerNode.info.id !== event.detail.workerId &&
workerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
}
}
- private tasksStealingOnBackPressure (workerId: number): void {
+ private readonly handleBackPressureEvent = (
+ event: CustomEvent<WorkerNodeEventDetail>
+ ): void => {
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
}
const sourceWorkerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
const workerNodes = this.workerNodes
.slice()
.sort(
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
- workerNode.info.id !== workerId &&
+ workerNode.info.id !== event.detail.workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
workerInfo.ready = message.ready as boolean
workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
+ const emitPoolReadyEventOnce = once(
+ () => this.emitter?.emit(PoolEvents.ready, this.info),
+ this
+ )
+ emitPoolReadyEventOnce()
}
}
* @returns The worker information.
*/
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- return this.workerNodes[workerNodeKey].info
+ return this.workerNodes[workerNodeKey]?.info
}
/**
}
}
+ protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+ this.getWorkerInfo(workerNodeKey).ready = false
+ }
+
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return (