## [Unreleased]
+### Changed
+
+- Refactor pool worker node usage internals.
+
+### Fixed
+
+- Fix wait time accounting.
+- Ensure worker choice strategy `LEAST_BUSY` also accounts for tasks wait time.
+- Ensure worker choice strategy `LEAST_USED` also accounts for queued tasks.
+
## [2.5.4] - 2023-06-07
### Added
### Fixed
- Ensure one task at a time is executed per worker with tasks queueing enabled.
-- Properly count worker running tasks with tasks queueing enabled.
+- Properly count worker executing tasks with tasks queueing enabled.
## [2.4.5] - 2023-04-09
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
  - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to workers in a round robin fashion
- - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of running and ran tasks
- - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution time
+ - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
+ - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time
  - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to workers using a weighted round robin scheduling algorithm based on tasks execution time
  - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to workers using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
  - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to workers using a fair share tasks scheduling algorithm based on tasks execution time
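
For instance, the `LEAST_USED` and `LEAST_BUSY` strategies above are selected through the pool options. A minimal sketch (the pool size, worker file path and task data are illustrative):

```ts
import { FixedThreadPool, WorkerChoiceStrategies } from 'poolifier'

// Route tasks with the LEAST_BUSY strategy, which now also factors in the
// tasks wait time in addition to the tasks execution time.
const pool = new FixedThreadPool(4, './worker.js', {
  workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY
})

// Submit a task; the pool picks the worker with the lowest aggregated
// execution + wait time.
pool
  .execute({ n: 30 })
  .then(result => console.info(result))
  .catch(console.error)
```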
Default: `60000`
- `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case a task is active on it.
- **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted.
- **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.
+ **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.
+ **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.
This option only applies to newly created workers.
Default: `KillBehaviors.SOFT`
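
As an illustration, the kill behavior is set per worker at creation time. A minimal worker file sketch (the task function is illustrative):

```ts
import { ThreadWorker, KillBehaviors } from 'poolifier'

// Illustrative CPU-bound task function.
function factorial (n: number): number {
  return n <= 1 ? 1 : n * factorial(n - 1)
}

// With KillBehaviors.HARD the worker may be deleted once it has been inactive
// for more than `maxInactiveTime`, even if a task is still executing on it.
export default new ThreadWorker(factorial, {
  maxInactiveTime: 60000,
  killBehavior: KillBehaviors.HARD
})
```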
* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
if (message.taskError != null) {
++workerTaskStatistics.failed
}
-
this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateWaitTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
}
private updateWaitTimeWorkerUsage (
workerUsage: WorkerUsage,
- message: MessageValue<Response>
+ task: Task<Data>
): void {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
) {
- workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+ workerUsage.waitTime.aggregation += taskWaitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.avgWaitTime &&
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.medWaitTime &&
- message.taskPerformance?.waitTime != null
+ taskWaitTime != null
) {
- workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
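
For context, the task wait time is now derived on the pool side from the task submission timestamp instead of being reported back by the worker. A minimal sketch of that idea, using a simplified task shape and hypothetical helper names (not poolifier's internal types):

```ts
import { performance } from 'node:perf_hooks'

interface TimestampedTask {
  data: unknown
  // Stamped when the task is submitted or queued.
  timestamp?: number
}

// Hypothetical helper: stamp the task at submission time.
function stampTask (task: TimestampedTask): void {
  task.timestamp = performance.now()
}

// Hypothetical helper: measure the elapsed wait time when execution starts.
function measureWaitTime (task: TimestampedTask): number {
  const now = performance.now()
  // An unstamped task yields a wait time of 0.
  return now - (task.timestamp ?? now)
}
```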
runTime:
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime,
- waitTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu
}
private getWorkerUsage (worker: Worker): WorkerUsage {
return {
- tasks: this.getTaskStatistics(this, worker),
+ tasks: this.getTaskStatistics(worker),
runTime: {
aggregation: 0,
average: 0,
}
}
- private getTaskStatistics (
- self: AbstractPool<Worker, Data, Response>,
- worker: Worker
- ): TaskStatistics {
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
return {
executed: 0,
executing: 0,
get queued (): number {
- return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+ return queueSize ?? 0
},
failed: 0
}
/**
 * Finds the first free worker node key based on the number of tasks the worker is executing.
*
- * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
+ * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned.
*
* If no free worker is found, `-1` is returned.
*
/**
 * Finds the last free worker node key based on the number of tasks the worker is executing.
*
- * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
+ * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned.
*
* If no free worker is found, `-1` is returned.
*
runTime: true,
avgRunTime: false,
medRunTime: false,
- waitTime: false,
+ waitTime: true,
avgWaitTime: false,
medWaitTime: false,
elu: false
/** @inheritDoc */
public choose (): number {
- let minRunTime = Infinity
+ let minTime = Infinity
let leastBusyWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
- const workerRunTime = workerNode.workerUsage.runTime.aggregation
- if (workerRunTime === 0) {
+ const workerTime =
+ workerNode.workerUsage.runTime.aggregation +
+ workerNode.workerUsage.waitTime.aggregation
+ if (workerTime === 0) {
return workerNodeKey
- } else if (workerRunTime < minRunTime) {
- minRunTime = workerRunTime
+ } else if (workerTime < minTime) {
+ minTime = workerTime
leastBusyWorkerNodeKey = workerNodeKey
}
}
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerTaskStatistics = workerNode.workerUsage.tasks
const workerTasks =
- workerTaskStatistics.executed + workerTaskStatistics.executing
+ workerTaskStatistics.executed +
+ workerTaskStatistics.executing +
+ workerTaskStatistics.queued
if (workerTasks === 0) {
return workerNodeKey
} else if (workerTasks < minNumberOfTasks) {
* Task runtime.
*/
runTime?: number
- /**
- * Task wait time.
- */
- waitTime?: number
/**
* Task event loop utilization.
*/
*/
export interface WorkerStatistics {
runTime: boolean
- waitTime: boolean
elu: boolean
}
message: MessageValue<Data>
): void {
try {
- let taskPerformance = this.beginTaskPerformance(message)
+ let taskPerformance = this.beginTaskPerformance()
const res = fn(message.data)
taskPerformance = this.endTaskPerformance(taskPerformance)
this.sendToMainWorker({
fn: WorkerAsyncFunction<Data, Response>,
message: MessageValue<Data>
): void {
- let taskPerformance = this.beginTaskPerformance(message)
+ let taskPerformance = this.beginTaskPerformance()
fn(message.data)
.then(res => {
taskPerformance = this.endTaskPerformance(taskPerformance)
return fn
}
- private beginTaskPerformance (message: MessageValue<Data>): TaskPerformance {
- const timestamp = performance.now()
+ private beginTaskPerformance (): TaskPerformance {
return {
- timestamp,
- ...(this.statistics.waitTime && {
- waitTime: timestamp - (message.timestamp ?? timestamp)
- }),
+ timestamp: performance.now(),
...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
}
}
*/
export const KillBehaviors = Object.freeze({
/**
- * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **wont** be deleted.
+ * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.
*/
SOFT: 'SOFT',
/**
- * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.
+ * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.
*/
HARD: 'HARD'
} as const)
/**
 * `killBehavior` dictates if your async unit (worker/process) will be deleted in case a task is active on it.
*
- * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted.
- * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.
+ * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.
+ * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.
*
 * This option only applies to newly created workers.
*
await pool1.destroy()
})
- it('Verify scale processes up and down is working when long running task is used:hard', async () => {
+ it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
const longRunningPool = new DynamicClusterPool(
min,
max,
'./tests/worker-files/cluster/longRunningWorkerHardBehavior.js',
{
errorHandler: e => console.error(e),
- onlineHandler: () => console.log('long running worker is online'),
- exitHandler: () => console.log('long running worker exited')
+ onlineHandler: () => console.log('long executing worker is online'),
+ exitHandler: () => console.log('long executing worker exited')
}
)
expect(longRunningPool.workerNodes.length).toBe(min)
await longRunningPool.destroy()
})
- it('Verify scale processes up and down is working when long running task is used:soft', async () => {
+ it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
const longRunningPool = new DynamicClusterPool(
min,
max,
'./tests/worker-files/cluster/longRunningWorkerSoftBehavior.js',
{
errorHandler: e => console.error(e),
- onlineHandler: () => console.log('long running worker is online'),
- exitHandler: () => console.log('long running worker exited')
+ onlineHandler: () => console.log('long executing worker is online'),
+ exitHandler: () => console.log('long executing worker exited')
}
)
expect(longRunningPool.workerNodes.length).toBe(min)
}
expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.sleep(1500)
- // Here we expect the workerNodes to be at the max size since the task is still running
+ // Here we expect the workerNodes to be at the max size since the task is still executing
expect(longRunningPool.workerNodes.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()
runTime: true,
avgRunTime: false,
medRunTime: false,
- waitTime: false,
+ waitTime: true,
avgWaitTime: false,
medWaitTime: false,
elu: false
runTime: true,
avgRunTime: false,
medRunTime: false,
- waitTime: false,
+ waitTime: true,
avgWaitTime: false,
medWaitTime: false,
elu: false
history: expect.any(CircularArray)
},
waitTime: {
- aggregation: 0,
+ aggregation: expect.any(Number),
average: 0,
median: 0,
history: expect.any(CircularArray)
expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual(
0
)
+ expect(
+ workerNode.workerUsage.waitTime.aggregation
+ ).toBeGreaterThanOrEqual(0)
}
// We need to clean up the resources after our test
await pool.destroy()
history: expect.any(CircularArray)
},
waitTime: {
- aggregation: 0,
+ aggregation: expect.any(Number),
average: 0,
median: 0,
history: expect.any(CircularArray)
max * maxMultiplier
)
expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.waitTime.aggregation).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
await pool.destroy()
await pool1.destroy()
})
- it('Verify scale thread up and down is working when long running task is used:hard', async () => {
+ it('Verify scale thread up and down is working when long executing task is used:hard', async () => {
const longRunningPool = new DynamicThreadPool(
min,
max,
'./tests/worker-files/thread/longRunningWorkerHardBehavior.js',
{
errorHandler: e => console.error(e),
- onlineHandler: () => console.log('long running worker is online'),
- exitHandler: () => console.log('long running worker exited')
+ onlineHandler: () => console.log('long executing worker is online'),
+ exitHandler: () => console.log('long executing worker exited')
}
)
expect(longRunningPool.workerNodes.length).toBe(min)
await longRunningPool.destroy()
})
- it('Verify scale thread up and down is working when long running task is used:soft', async () => {
+ it('Verify scale thread up and down is working when long executing task is used:soft', async () => {
const longRunningPool = new DynamicThreadPool(
min,
max,
'./tests/worker-files/thread/longRunningWorkerSoftBehavior.js',
{
errorHandler: e => console.error(e),
- onlineHandler: () => console.log('long running worker is online'),
- exitHandler: () => console.log('long running worker exited')
+ onlineHandler: () => console.log('long executing worker is online'),
+ exitHandler: () => console.log('long executing worker exited')
}
)
expect(longRunningPool.workerNodes.length).toBe(min)
}
expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.sleep(1500)
- // Here we expect the workerNodes to be at the max size since the task is still running
+ // Here we expect the workerNodes to be at the max size since the task is still executing
expect(longRunningPool.workerNodes.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()