summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
9b2bdf9)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
this.opts.workerChoiceStrategy = workerChoiceStrategy
for (const workerNode of this.workerNodes) {
this.setWorkerNodeTasksUsage(workerNode, {
this.opts.workerChoiceStrategy = workerChoiceStrategy
for (const workerNode of this.workerNodes) {
this.setWorkerNodeTasksUsage(workerNode, {
running: 0,
runTime: 0,
runTimeHistory: new CircularArray(),
running: 0,
runTime: 0,
runTimeHistory: new CircularArray(),
const workerTasksUsage =
this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
--workerTasksUsage.running
const workerTasksUsage =
this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
--workerTasksUsage.running
if (message.error != null) {
++workerTasksUsage.error
}
if (message.error != null) {
++workerTasksUsage.error
}
workerTasksUsage.runTime += message.runTime ?? 0
if (
this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
workerTasksUsage.runTime += message.runTime ?? 0
if (
this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
- workerTasksUsage.run !== 0
+ workerTasksUsage.ran !== 0
) {
workerTasksUsage.avgRunTime =
) {
workerTasksUsage.avgRunTime =
- workerTasksUsage.runTime / workerTasksUsage.run
+ workerTasksUsage.runTime / workerTasksUsage.ran
}
if (
this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
}
if (
this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
workerTasksUsage.waitTime += message.waitTime ?? 0
if (
this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
workerTasksUsage.waitTime += message.waitTime ?? 0
if (
this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
- workerTasksUsage.run !== 0
+ workerTasksUsage.ran !== 0
) {
workerTasksUsage.avgWaitTime =
) {
workerTasksUsage.avgWaitTime =
- workerTasksUsage.waitTime / workerTasksUsage.run
+ workerTasksUsage.waitTime / workerTasksUsage.ran
}
if (
this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
}
if (
this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
return this.workerNodes.push({
worker,
tasksUsage: {
return this.workerNodes.push({
worker,
tasksUsage: {
running: 0,
runTime: 0,
runTimeHistory: new CircularArray(),
running: 0,
runTime: 0,
runTimeHistory: new CircularArray(),
let leastUsedWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const tasksUsage = workerNode.tasksUsage
let leastUsedWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const tasksUsage = workerNode.tasksUsage
- const workerTasks = tasksUsage.run + tasksUsage.running
+ const workerTasks = tasksUsage.ran + tasksUsage.running
if (workerTasks === 0) {
return workerNodeKey
} else if (workerTasks < minNumberOfTasks) {
if (workerTasks === 0) {
return workerNodeKey
} else if (workerTasks < minNumberOfTasks) {
/**
* Number of tasks executed.
*/
/**
* Number of tasks executed.
*/
/**
* Number of tasks running.
*/
/**
* Number of tasks running.
*/
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
}
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
}
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: maxMultiplier,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: maxMultiplier,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
- expect(workerNode.tasksUsage.run).toBe(0)
+ expect(workerNode.tasksUsage.ran).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
expect(workerNode.tasksQueue.size).toBe(0)
}
})
expect(workerNode.tasksQueue.size).toBe(0)
}
})
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThanOrEqual(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThanOrEqual(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0)
}
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0)
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
- run: expect.any(Number),
+ ran: expect.any(Number),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: expect.any(Number),
runTimeHistory: expect.any(CircularArray),
medWaitTime: 0,
error: 0
})
medWaitTime: 0,
error: 0
})
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(max * maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
}
expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksUsage).toStrictEqual({
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
running: 0,
runTime: 0,
runTimeHistory: expect.any(CircularArray),
expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
- expect(workerNode.tasksUsage.run).toBe(0)
+ expect(workerNode.tasksUsage.ran).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
expect(queuePool.info.runningTasks).toBe(numberOfThreads)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
expect(queuePool.info.runningTasks).toBe(numberOfThreads)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
- expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
+ expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
expect(workerNode.tasksQueue.size).toBe(0)
}
})
expect(workerNode.tasksQueue.size).toBe(0)
}
})