this.setupHook()
- while (this.workerNodes.length < this.numberOfWorkers) {
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
this.createAndSetupWorker()
}
throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
- } else if (min === 0 && max === 0) {
+ } else if (max === 0) {
throw new RangeError(
- 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ 'Cannot instantiate a dynamic pool with a pool size equal to zero'
)
} else if (min === max) {
throw new RangeError(
private get starting (): boolean {
return (
- this.workerNodes.length < this.minSize ||
- (this.workerNodes.length >= this.minSize &&
- this.workerNodes.some(workerNode => !workerNode.info.ready))
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minSize
)
}
private get ready (): boolean {
return (
- this.workerNodes.length >= this.minSize &&
- this.workerNodes.every(workerNode => workerNode.info.ready)
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic && workerNode.info.ready
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ) >= this.minSize
)
}
this.removeWorkerNode(worker)
})
- this.pushWorkerNode(worker)
+ this.addWorkerNode(worker)
this.afterWorkerSetup(worker)
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
workerInfo.dynamic = true
+ if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+ workerInfo.ready = true
+ }
this.sendToWorker(worker, {
- checkAlive: true,
+ checkActive: true,
workerId: workerInfo.id as number
})
return worker
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
// Send startup message to worker.
+ this.sendWorkerStartupMessage(worker)
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
+ }
+
+ private sendWorkerStartupMessage (worker: Worker): void {
this.sendToWorker(worker, {
ready: false,
workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
- // Setup worker task statistics computation.
- this.setWorkerStatistics(worker)
}
private redistributeQueuedTasks (workerNodeKey: number): void {
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.workerId != null) {
- // Worker ready message received
- this.handleWorkerReadyMessage(message)
+ if (message.ready != null) {
+ // Worker ready response received
+ this.handleWorkerReadyResponse(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
}
}
- private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const worker = this.getWorkerById(message.workerId)
this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
message.ready as boolean
}
/**
- * Pushes the given worker in the pool worker nodes.
+ * Adds the given worker in the pool worker nodes.
*
* @param worker - The worker.
* @returns The worker nodes length.
*/
- private pushWorkerNode (worker: Worker): number {
- return this.workerNodes.push(new WorkerNode(worker, this.worker))
+ private addWorkerNode (worker: Worker): number {
+ const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ // Flag the worker node as ready at pool startup.
+ if (this.starting) {
+ workerNode.info.ready = true
+ }
+ return this.workerNodes.push(workerNode)
}
/**
}
}
+ /**
+ * Executes the given task on the given worker.
+ *
+ * @param worker - The worker.
+ * @param task - The task to execute.
+ */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)