protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.tasksUsage?.running === 0
+ return workerNode.tasksUsage.running === 0
}) === -1
)
}
worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage = this.getWorkerTasksUsage(worker)
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
--workerTasksUsage.running
++workerTasksUsage.run
if (message.error != null) {
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
- this.workerChoiceStrategyContext.update()
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
/**
if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
this.registerWorkerMessageListener(workerCreated, message => {
+ const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.getWorkerTasksUsage(workerCreated)?.running === 0)
+ this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueueByWorker(workerCreated)
+ this.flushTasksQueue(currentWorkerNodeKey)
void (this.destroyWorker(workerCreated) as Promise<void>)
}
})
workerNode.tasksUsage = tasksUsage
}
- /**
- * Gets the given worker its tasks usage in the pool.
- *
- * @param worker - The worker.
- * @throws Error if the worker is not found in the pool worker nodes.
- * @returns The worker tasks usage.
- */
- private getWorkerTasksUsage (worker: Worker): TasksUsage {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- if (workerNodeKey !== -1) {
- return this.workerNodes[workerNodeKey].tasksUsage
- }
- throw new Error('Worker could not be found in the pool worker nodes')
- }
-
/**
* Pushes the given worker in the pool worker nodes.
*
}
}
- private flushTasksQueueByWorker (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- this.flushTasksQueue(workerNodeKey)
- }
-
private flushTasksQueues (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.flushTasksQueue(workerNodeKey)
{ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
)
// TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
)
// TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
)
// TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
)
// TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBe(0)
{ workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
{ workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
- const promises = []
const maxMultiplier = 2
for (let i = 0; i < max * maxMultiplier; i++) {
- promises.push(pool.execute())
+ await pool.execute()
}
- await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
expect(workerNode.tasksUsage.avgRunTime).toBe(0)