}
private get starting (): boolean {
- return (
- this.workerNodes.length < this.minSize ||
- (this.workerNodes.length >= this.minSize &&
- this.workerNodes.some(workerNode => !workerNode.info.ready))
- )
+ return this.workerNodes.length < this.minSize
}
private get ready (): boolean {
return (
this.workerNodes.length >= this.minSize &&
- this.workerNodes.every(workerNode => workerNode.info.ready)
+ this.workerNodes.every(
+ (workerNode, workerNodeKey) =>
+ workerNodeKey < this.minSize && workerNode.info.ready
+ )
)
}
}
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.ready = true
workerInfo.dynamic = true
this.sendToWorker(worker, {
checkAlive: true,
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
// Send startup message to worker.
+ this.sendWorkerStartupMessage(worker)
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
+ }
+
+ private sendWorkerStartupMessage (worker: Worker): void {
this.sendToWorker(worker, {
ready: false,
workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
- // Setup worker task statistics computation.
- this.setWorkerStatistics(worker)
}
private redistributeQueuedTasks (workerNodeKey: number): void {
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.workerId != null) {
+ if (message.ready != null) {
// Worker ready message received
this.handleWorkerReadyMessage(message)
} else if (message.id != null) {
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
+ const workerNode = new WorkerNode(worker, this.worker)
+ if (this.starting) {
+ workerNode.info.ready = true
+ }
return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
}
)
- if (!pool.info.ready) {
- await waitPoolEvents(pool, PoolEvents.ready, 1)
- }
+ // FIXME: shall not be needed
+ await waitPoolEvents(pool, PoolEvents.ready, 1)
// TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
const promises = new Set()
const maxMultiplier = 2
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
}
)
- if (!pool.info.ready) {
- await waitPoolEvents(pool, PoolEvents.ready, 1)
- }
// TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
const promises = new Set()
const maxMultiplier = 2
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: expect.any(Number),
+ executed: maxMultiplier,
executing: 0,
queued: 0,
maxQueued: 0,
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(1)
+ ).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy