## [Unreleased]
+### Fixed
+
+- Disable `tasksStealingOnBackPressure` by default until performance issues under heavy load are sorted out.
+
## [4.0.3] - 2024-05-08
### Changed
- Add `startWorkers` to pool options to whether start the minimum number of workers at pool initialization or not.
- Add `start()` method to pool API to start the minimum number of workers.
-- Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not.
+- Add `taskStealing` and `tasksStealingOnBackPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not.
- Continuous internal benchmarking: [https://poolifier.github.io/benchmark-results/dev/bench](https://poolifier.github.io/benchmark-results/dev/bench).
## [2.6.44] - 2023-09-08
- `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
- `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination.
- Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 2000 }`
+ Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }`
- `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueLastBucketTask()!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
): void => {
if (
this.cannotStealTask() ||
+ this.hasBackPressure() ||
(this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueLastBucketTask()!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
/**
* Whether to enable tasks stealing under back pressure.
*
- * @defaultValue true
+ * @defaultValue false
*/
readonly tasksStealingOnBackPressure?: boolean
/**
size: Math.pow(poolMaxSize, 2),
concurrency: 1,
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
}
}
/** @inheritdoc */
public tasksQueueBackPressureSize: number
private readonly tasksQueue: PriorityQueue<Task<Data>>
- private onBackPressureStarted: boolean
+ private setBackPressureFlag: boolean
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
- this.onBackPressureStarted = false
+ this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
- if (this.hasBackPressure() && !this.onBackPressureStarted) {
- this.onBackPressureStarted = true
+ if (
+ !this.setBackPressureFlag &&
+ this.hasBackPressure() &&
+ !this.info.backPressure
+ ) {
+ this.setBackPressureFlag = true
+ this.info.backPressure = true
this.emit('backPressure', { workerId: this.info.id })
- this.onBackPressureStarted = false
}
+ this.setBackPressureFlag = false
return tasksQueueSize
}
/** @inheritdoc */
public dequeueTask (bucket?: number): Task<Data> | undefined {
- return this.tasksQueue.dequeue(bucket)
+ const task = this.tasksQueue.dequeue(bucket)
+ if (
+ !this.setBackPressureFlag &&
+ !this.hasBackPressure() &&
+ this.info.backPressure
+ ) {
+ this.setBackPressureFlag = true
+ this.info.backPressure = false
+ }
+ this.setBackPressureFlag = false
+ return task
}
/** @inheritdoc */
- public dequeueLastBucketTask (): Task<Data> | undefined {
+ public dequeueLastPrioritizedTask (): Task<Data> | undefined {
// Start from the last empty or partially filled bucket
- return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1)
+ return this.dequeueTask(this.tasksQueue.buckets + 1)
}
/** @inheritdoc */
type: getWorkerType(worker)!,
dynamic: false,
ready: false,
+ backPressure: false,
stealing: false
}
}
* This flag is set to `true` when worker node is stealing tasks from another worker node.
*/
stealing: boolean
+ /**
+ * Back pressure flag.
+ * This flag is set to `true` when worker node tasks queue has back pressure.
+ */
+ backPressure: boolean
/**
* Task functions properties.
*/
*/
readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
/**
- * Dequeue last bucket task.
+ * Dequeue last prioritized task.
*
* @returns The dequeued task.
*/
- readonly dequeueLastBucketTask: () => Task<Data> | undefined
+ readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
/**
* Clears tasks queue.
*/
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
type: WorkerTypes.cluster,
dynamic: false,
ready: true,
+ backPressure: false,
stealing: false
})
}
type: WorkerTypes.thread,
dynamic: false,
ready: true,
+ backPressure: false,
stealing: false
})
}
concurrency: 1,
size: Math.pow(poolMaxSize, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
})
type: WorkerTypes.thread,
dynamic: false,
ready: false,
+ backPressure: false,
stealing: false
})
expect(threadWorkerNode.usage).toStrictEqual({
expect(threadWorkerNode.tasksQueueSize()).toBe(
threadWorkerNode.tasksQueue.size
)
- expect(threadWorkerNode.onBackPressureStarted).toBe(false)
+ expect(threadWorkerNode.setBackPressureFlag).toBe(false)
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
type: WorkerTypes.cluster,
dynamic: false,
ready: false,
+ backPressure: false,
stealing: false
})
expect(clusterWorkerNode.usage).toStrictEqual({
expect(clusterWorkerNode.tasksQueueSize()).toBe(
clusterWorkerNode.tasksQueue.size
)
- expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
+ expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
})