'Cannot instantiate a pool with a negative number of workers'
)
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
- throw new Error('Cannot instantiate a fixed pool with no worker')
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic && min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ } else if (this.type === PoolTypes.dynamic && min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
}
}
version,
type: this.type,
worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
maxSize: this.maxSize,
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
}
}
+ private get starting (): boolean {
+ return (
+ !this.full ||
+ (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ )
+ }
+
+ private get ready (): boolean {
+ return (
+ this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ )
+ }
+
/**
* Gets the approximate pool utilization.
*
?.worker
}
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
}
/**
if (this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(worker)
}
- if (this.opts.restartWorkerOnError === true) {
+ if (this.opts.restartWorkerOnError === true && !this.starting) {
if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
this.createAndSetupDynamicWorker()
} else {
this.pushWorkerNode(worker)
- this.setWorkerStatistics(worker)
-
this.afterWorkerSetup(worker)
return worker
*/
protected createAndSetupDynamicWorker (): Worker {
const worker = this.createAndSetupWorker()
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
this.registerWorkerMessageListener(worker, message => {
const workerNodeKey = this.getWorkerNodeKey(worker)
if (
void (this.destroyWorker(worker) as Promise<void>)
}
})
- this.sendToWorker(worker, { dynamic: true })
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.workerId != null && message.started != null) {
- // Worker started message received
- this.handleWorkerStartedMessage(message)
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.workerId != null) {
+ // Worker ready message received
+ this.handleWorkerReadyMessage(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
}
}
- private handleWorkerStartedMessage (message: MessageValue<Response>): void {
- // Worker started message received
- const worker = this.getWorkerById(message.workerId as number)
- if (worker != null) {
- this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
- message.started as boolean
- } else {
- throw new Error(
- `Worker started message received from unknown worker '${
- message.workerId as number
- }'`
- )
+ private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
}
}
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
+ },
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
}
}