From: Jérôme Benoit Date: Thu, 24 Aug 2023 20:53:02 +0000 (+0200) Subject: feat: add continuous task stealing X-Git-Tag: v2.6.35~21 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=033f1776d603b75ce8dcd763278a1ce8fca5c479;p=poolifier.git feat: add continuous task stealing Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e8866ff..3435faeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Update simple moving average implementation to use a circular buffer. - Update simple moving median implementation to use a circular buffer. +### Added + +- Support for continuous tasks stealing. + ## [2.6.34] - 2023-08-24 ### Fixes diff --git a/docs/api.md b/docs/api.md index 5ffe92c0..9769c3c7 100644 --- a/docs/api.md +++ b/docs/api.md @@ -76,9 +76,9 @@ An object with these properties: - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible. - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies. - - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies. - - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies. + - `runTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies. + - `waitTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies. + - `elu` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies. - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`. Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 8c202696..a6ffcbbb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -785,6 +785,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && + this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { @@ -836,7 +837,7 @@ export abstract class AbstractPool< * @virtual */ protected setupHook (): void { - // Intentionally empty + /** Intentionally empty */ } /** @@ -1199,15 +1200,8 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let destinationWorkerNodeKey!: number let minQueuedTasks = Infinity - let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if (workerNode.info.ready && workerNodeId !== workerNodeKey) { - if ( - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } if (workerNode.usage.tasks.queued === 0) { destinationWorkerNodeKey = workerNodeId break @@ -1219,12 +1213,16 @@ export abstract class AbstractPool< } } if (destinationWorkerNodeKey != null) { + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const task = { ...(this.dequeueTask(workerNodeKey) as Task), - workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) - .id as number + workerId: destinationWorkerNode.info.id as number } - if (executeTask) { + if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { this.executeTask(destinationWorkerNodeKey, task) } else { this.enqueueTask(destinationWorkerNodeKey, task) @@ -1255,13 +1253,18 @@ export abstract class AbstractPool< ...(sourceWorkerNode.popTask() as Task), workerId: destinationWorkerNode.info.id as number } + // Enqueue task for continuous task stealing + this.enqueueTask(destinationWorkerNodeKey, task) + // Avoid starvation if ( + this.tasksQueueSize(destinationWorkerNodeKey) > 0 && destinationWorkerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) + this.executeTask( + destinationWorkerNodeKey, + this.dequeueTask(destinationWorkerNodeKey) as Task + ) } break } @@ -1290,8 +1293,9 @@ export abstract class AbstractPool< workerId: workerNode.info.id as number } if ( + this.tasksQueueSize(workerNodeKey) === 0 && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(workerNodeKey, task) } else { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 7b549809..cc028fec 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -209,7 +209,7 @@ export interface IPool< * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing at least one task. * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected. - * - '`destroy`': Emitted when the pool is destroyed. + * - `'destroy'`: Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size). diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index a5976d99..7f7c4dce 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -956,11 +956,11 @@ describe('Abstract pool test suite', () => { ++poolBackPressure poolInfo = info }) - for (let i = 0; i < numberOfWorkers * 2; i++) { + for (let i = 0; i < numberOfWorkers + 1; i++) { promises.add(pool.execute()) } await Promise.all(promises) - expect(poolBackPressure).toBe(2) + expect(poolBackPressure).toBe(1) expect(poolInfo).toStrictEqual({ version, type: PoolTypes.fixed,