- `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
- - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
- - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
+ - `runTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
+ - `waitTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
+ - `elu` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
- `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
if (
this.opts.enableTasksQueue === false ||
(this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number))
) {
* @virtual
*/
protected setupHook (): void {
- // Intentionally empty
+ /** Intentionally empty */
}
/**
while (this.tasksQueueSize(workerNodeKey) > 0) {
let destinationWorkerNodeKey!: number
let minQueuedTasks = Infinity
- let executeTask = false
for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
- if (
- workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- executeTask = true
- }
if (workerNode.usage.tasks.queued === 0) {
destinationWorkerNodeKey = workerNodeId
break
}
}
if (destinationWorkerNodeKey != null) {
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const task = {
...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
- .id as number
+ workerId: destinationWorkerNode.info.id as number
}
- if (executeTask) {
+ if (
+ this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+ destinationWorkerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
this.enqueueTask(destinationWorkerNodeKey, task)
...(sourceWorkerNode.popTask() as Task<Data>),
workerId: destinationWorkerNode.info.id as number
}
+ // Enqueue task for continuous task stealing
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ // Avoid starvation
if (
+ this.tasksQueueSize(destinationWorkerNodeKey) > 0 &&
destinationWorkerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ (this.opts.tasksQueueOptions?.concurrency as number)
) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
+ this.executeTask(
+ destinationWorkerNodeKey,
+ this.dequeueTask(destinationWorkerNodeKey) as Task<Data>
+ )
}
break
}
workerId: workerNode.info.id as number
}
if (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ (this.opts.tasksQueueOptions?.concurrency as number)
) {
this.executeTask(workerNodeKey, task)
} else {
* - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready.
* - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing at least one task.
* - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
- * - '`destroy`': Emitted when the pool is destroyed.
+ * - `'destroy'`: Emitted when the pool is destroyed.
* - `'error'`: Emitted when an uncaught error occurs.
* - `'taskError'`: Emitted when an error occurs while executing a task.
* - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).