max,
median,
min,
+ once,
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
if (message.workerId == null) {
throw new Error('Worker message received without worker id')
- } else if (
- message.workerId != null &&
- this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
- ) {
+ } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
`Worker message received from unknown worker '${message.workerId}'`
)
)
}
}
+ // FIXME: should be registered only once
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
this.sendToWorker(workerNodeKey, { kill: true })
})
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
- workerInfo.ready = false
this.emitter?.emit(PoolEvents.error, error)
this.workerNodes[workerNodeKey].closeChannel()
if (
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
+ // Flag the worker node as not ready immediately
+ this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
workerInfo.ready = message.ready as boolean
workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
+ const emitPoolReadyEventOnce = once(
+ () => this.emitter?.emit(PoolEvents.ready, this.info),
+ this
+ )
+ emitPoolReadyEventOnce()
}
}
* @returns The worker information.
*/
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- return this.workerNodes[workerNodeKey].info
+ return this.workerNodes[workerNodeKey]?.info
}
/**
}
}
+ protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+ this.getWorkerInfo(workerNodeKey).ready = false
+ }
+
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return (