- Update simple moving average implementation to use a circular buffer.
- Update simple moving median implementation to use a circular buffer.
+- Account for stolen tasks in worker usage statistics and pool information.
+
+### Added
+
+- Continuous tasks stealing algorithm.
## [2.6.34] - 2023-08-24
...(this.opts.enableTasksQueue === true && {
backPressure: this.hasBackPressure()
}),
+ ...(this.opts.enableTasksQueue === true && {
+ stolenTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.stolen,
+ 0
+ )
+ }),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.failed,
} else {
this.enqueueTask(destinationWorkerNodeKey, task)
}
+ ++destinationWorkerNode.usage.tasks.stolen
+ if (this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey)) {
+ const taskFunctionWorkerUsage =
+ destinationWorkerNode.getTaskFunctionWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
break
}
}
} else {
this.enqueueTask(workerNodeKey, task)
}
+ ++workerNode.usage.tasks.stolen
+ if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
}
}
}
readonly queuedTasks?: number
readonly maxQueuedTasks?: number
readonly backPressure?: boolean
+ readonly stolenTasks?: number
readonly failedTasks: number
readonly runTime?: {
readonly minimum: number
import { MessageChannel } from 'node:worker_threads'
import { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
-import { DEFAULT_TASK_NAME } from '../utils'
+import {
+ DEFAULT_TASK_NAME,
+ EMPTY_FUNCTION,
+ exponentialDelay,
+ sleep
+} from '../utils'
import { Deque } from '../deque'
import {
type IWorker,
public onEmptyQueue?: (workerId: number) => void
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
private readonly tasksQueue: Deque<Task<Data>>
+ private onEmptyQueueCount: number
/**
* Constructs a new worker node.
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
this.tasksQueue = new Deque<Task<Data>>()
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.onEmptyQueueCount = 0
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
- this.onEmptyQueue(this.info.id as number)
+ this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
}
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
- this.onEmptyQueue(this.info.id as number)
+ this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
}
return this.taskFunctionsUsage.get(name)
}
+ private async startOnEmptyQueue (): Promise<void> {
+ if (this.onEmptyQueue != null) {
+ if (this.tasksQueue.size > 0) {
+ this.onEmptyQueueCount = 0
+ return
+ }
+ this.onEmptyQueue(this.info.id as number)
+ ++this.onEmptyQueueCount
+ await sleep(exponentialDelay(this.onEmptyQueueCount))
+ await this.startOnEmptyQueue()
+ }
+ }
+
private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
return {
id: this.getWorkerId(worker, workerType),
get maxQueued (): number {
return getTasksQueueMaxSize()
},
+ stolen: 0,
failed: 0
},
runTime: {
get queued (): number {
return getTaskFunctionQueueSize()
},
+ stolen: 0,
failed: 0
},
runTime: {
* Maximum number of queued tasks.
*/
readonly maxQueued?: number
+ /**
+ * Number of stolen tasks.
+ */
+ stolen: number
/**
* Number of failed tasks.
*/
return availableParallelism
}
-// /**
-// * Computes the retry delay in milliseconds using an exponential back off algorithm.
-// *
-// * @param retryNumber - The number of retries that have already been attempted
-// * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
-// * @returns Delay in milliseconds
-// * @internal
-// */
-// export const exponentialDelay = (
-// retryNumber = 0,
-// maxDelayRatio = 0.2
-// ): number => {
-// const delay = Math.pow(2, retryNumber) * 100
-// const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay
-// return delay + randomSum
-// }
+/**
+ * Sleeps for the given amount of milliseconds.
+ *
+ * @param ms - The amount of milliseconds to sleep.
+ * @returns A promise that resolves after the given amount of milliseconds.
+ */
+export const sleep = async (ms: number): Promise<void> => {
+ await new Promise((resolve) => {
+ setTimeout(resolve, ms)
+ })
+}
+
+/**
+ * Computes the retry delay in milliseconds using an exponential back off algorithm.
+ *
+ * @param retryNumber - The number of retries that have already been attempted
+ * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
+ * @returns Delay in milliseconds
+ * @internal
+ */
+export const exponentialDelay = (
+ retryNumber = 0,
+ maxDelayRatio = 0.2
+): number => {
+ const delay = Math.pow(2, retryNumber) * 100
+ const randomSum = delay * maxDelayRatio * secureRandom() // 0-(maxDelayRatio*100)% of the delay
+ return delay + randomSum
+}
/**
* Computes the average of the given data set.
}
}
}
+
+/**
+ * Generate a cryptographically secure random number in the [0,1[ range
+ *
+ * @returns A number in the [0,1[ range
+ */
+const secureRandom = (): number => {
+ return crypto.getRandomValues(new Uint32Array(1))[0] / 0x100000000
+}
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: maxMultiplier,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
maxQueuedTasks: expect.any(Number),
queuedTasks: expect.any(Number),
backPressure: true,
+ stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
expect(pool.hasBackPressure.called).toBe(true)
executed: expect.any(Number),
executing: expect.any(Number),
failed: 0,
- queued: 0
+ queued: 0,
+ stolen: 0
},
runTime: {
history: expect.any(CircularArray)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
history: expect.any(CircularArray)
},
elu: {
- idle: expect.objectContaining({
+ idle: {
history: expect.any(CircularArray)
- }),
- active: expect.objectContaining({
+ },
+ active: {
history: expect.any(CircularArray)
- })
+ }
}
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
history: expect.any(CircularArray)
},
elu: {
- idle: expect.objectContaining({
+ idle: {
history: expect.any(CircularArray)
- }),
- active: expect.objectContaining({
+ },
+ active: {
history: expect.any(CircularArray)
- })
+ }
}
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
- runTime: expect.objectContaining({
+ runTime: {
history: expect.any(CircularArray)
- }),
+ },
waitTime: {
history: expect.any(CircularArray)
},
elu: {
- idle: expect.objectContaining({
+ idle: {
history: expect.any(CircularArray)
- }),
- active: expect.objectContaining({
+ },
+ active: {
history: expect.any(CircularArray)
- })
+ }
}
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
- runTime: expect.objectContaining({
+ runTime: {
history: expect.any(CircularArray)
- }),
+ },
waitTime: {
history: expect.any(CircularArray)
},
elu: {
- idle: expect.objectContaining({
+ idle: {
history: expect.any(CircularArray)
- }),
- active: expect.objectContaining({
+ },
+ active: {
history: expect.any(CircularArray)
- })
+ }
}
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
- runTime: expect.objectContaining({
+ runTime: {
history: expect.any(CircularArray)
- }),
+ },
waitTime: {
history: expect.any(CircularArray)
},
elu: {
- idle: expect.objectContaining({
+ idle: {
history: expect.any(CircularArray)
- }),
- active: expect.objectContaining({
+ },
+ active: {
history: expect.any(CircularArray)
- })
+ }
}
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: expect.objectContaining({
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: expect.objectContaining({
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: expect.objectContaining({
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {
executing: 0,
queued: 0,
maxQueued: 0,
+ stolen: 0,
failed: 0
},
runTime: {