From: Jérôme Benoit Date: Wed, 8 May 2024 20:44:42 +0000 (+0200) Subject: fix: disable `tasksStealingOnBackPressure` by default X-Git-Tag: v4.0.4~4 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=2eee72204bc851f616ded11cb5381f96c6dc5cbf;p=poolifier.git fix: disable `tasksStealingOnBackPressure` by default Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 66877534..a5b67be0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Disable `tasksStealingOnBackPressure` by default until performance issues under heavy load are sorted out. + ## [4.0.3] - 2024-05-08 ### Changed @@ -410,7 +414,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `startWorkers` to pool options to whether start the minimum number of workers at pool initialization or not. - Add `start()` method to pool API to start the minimum number of workers. -- Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not. +- Add `taskStealing` and `tasksStealingOnBackPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not. - Continuous internal benchmarking: [https://poolifier.github.io/benchmark-results/dev/bench](https://poolifier.github.io/benchmark-results/dev/bench). ## [2.6.44] - 2023-09-08 diff --git a/docs/api.md b/docs/api.md index 634ab08b..458017a1 100644 --- a/docs/api.md +++ b/docs/api.md @@ -137,7 +137,7 @@ An object with these properties: - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure. - `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination. - Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 2000 }` + Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }` - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 065d3996..0de96a41 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1817,7 +1817,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueLastBucketTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1831,6 +1831,7 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || + this.hasBackPressure() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { @@ -1868,7 +1869,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueLastBucketTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) diff --git a/src/pools/pool.ts b/src/pools/pool.ts index ccb4b5b9..66815bf9 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -137,7 +137,7 @@ export interface TasksQueueOptions { /** * Whether to enable tasks stealing under back pressure. * - * @defaultValue true + * @defaultValue false */ readonly tasksStealingOnBackPressure?: boolean /** diff --git a/src/pools/utils.ts b/src/pools/utils.ts index b7b38ac4..df9c256e 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -43,7 +43,7 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 } } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 5d28802c..772a839b 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -45,7 +45,7 @@ export class WorkerNode /** @inheritdoc */ public tasksQueueBackPressureSize: number private readonly tasksQueue: PriorityQueue> - private onBackPressureStarted: boolean + private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -70,7 +70,7 @@ export class WorkerNode // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) - this.onBackPressureStarted = false + this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -82,23 +82,38 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true + if ( + !this.setBackPressureFlag && + this.hasBackPressure() && + !this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = true this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false } + this.setBackPressureFlag = false return tasksQueueSize } /** @inheritdoc */ public dequeueTask (bucket?: number): Task | undefined { - return this.tasksQueue.dequeue(bucket) + const task = this.tasksQueue.dequeue(bucket) + if ( + !this.setBackPressureFlag && + !this.hasBackPressure() && + this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = false + } + this.setBackPressureFlag = false + return task } /** @inheritdoc */ - public dequeueLastBucketTask (): Task | undefined { + public dequeueLastPrioritizedTask (): Task | undefined { // Start from the last empty or partially filled bucket - return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1) + return this.dequeueTask(this.tasksQueue.buckets + 1) } /** @inheritdoc */ @@ -197,6 +212,7 @@ export class WorkerNode type: getWorkerType(worker)!, dynamic: false, ready: false, + backPressure: false, stealing: false } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 1a9973ba..99c0d9a6 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -172,6 +172,11 @@ export interface WorkerInfo { * This flag is set to `true` when worker node is stealing tasks from another worker node. */ stealing: boolean + /** + * Back pressure flag. + * This flag is set to `true` when worker node tasks queue has back pressure. + */ + backPressure: boolean /** * Task functions properties. */ @@ -326,11 +331,11 @@ export interface IWorkerNode */ readonly dequeueTask: (bucket?: number) => Task | undefined /** - * Dequeue last bucket task. + * Dequeue last prioritized task. * * @returns The dequeued task. */ - readonly dequeueLastBucketTask: () => Task | undefined + readonly dequeueLastPrioritizedTask: () => Task | undefined /** * Clears tasks queue. */ diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 6a65f380..8cbbc0a5 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, @@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) @@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) @@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { @@ -819,6 +819,7 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: true, + backPressure: false, stealing: false }) } @@ -835,6 +836,7 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: true, + backPressure: false, stealing: false }) } diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index cfa378e1..3842c8c5 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -32,7 +32,7 @@ describe('Pool utils test suite', () => { concurrency: 1, size: Math.pow(poolMaxSize, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) }) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index d6339080..27ce3d2d 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -192,6 +192,7 @@ describe('Worker node test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: false, + backPressure: false, stealing: false }) expect(threadWorkerNode.usage).toStrictEqual({ @@ -227,7 +228,7 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size ) - expect(threadWorkerNode.onBackPressureStarted).toBe(false) + expect(threadWorkerNode.setBackPressureFlag).toBe(false) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) @@ -237,6 +238,7 @@ describe('Worker node test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: false, + backPressure: false, stealing: false }) expect(clusterWorkerNode.usage).toStrictEqual({ @@ -272,7 +274,7 @@ describe('Worker node test suite', () => { expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size ) - expect(clusterWorkerNode.onBackPressureStarted).toBe(false) + expect(clusterWorkerNode.setBackPressureFlag).toBe(false) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) })