From: Jérôme Benoit Date: Mon, 10 Apr 2023 19:58:30 +0000 (+0200) Subject: feat: add worker tasks queue options to pool options X-Git-Tag: v2.4.7~16 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=7171d33f7583a56bf43108f4b77b2ce4a0e9d969;p=poolifier.git feat: add worker tasks queue options to pool options Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 3af71d54..8e85ba1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add worker tasks queue options to pool options. + ## [2.4.6] - 2023-04-10 ### Fixed diff --git a/README.md b/README.md index da169020..9b1f8131 100644 --- a/README.md +++ b/README.md @@ -178,11 +178,19 @@ Node versions >= 16.x are supported. Properties: - `medRunTime` (optional) - Use the tasks median run time instead of the tasks average run time in worker choice strategies. - Default: { medRunTime: false } + + Default: { medRunTime: false } - `enableEvents` (optional) - Events emission enablement in this pool. Default: true - `enableTasksQueue` (optional, experimental) - Tasks queue per worker enablement in this pool. Default: false +- `tasksQueueOptions` (optional, experimental) - The worker tasks queue options object to use in this pool. + Properties: + + - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. + + Default: { concurrency: 1 } + ### `pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)` `min` (mandatory) Same as FixedThreadPool/FixedClusterPool numberOfThreads/numberOfWorkers, this number of workers will be always active diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6462cb09..1fd9a173 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -6,7 +6,7 @@ import { median } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' -import { PoolEvents, type PoolOptions } from './pool' +import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool' import { PoolEmitter } from './pool' import type { IPoolInternal } from './pool-internal' import { PoolType } from './pool-internal' @@ -139,6 +139,18 @@ export abstract class AbstractPool< opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false + if (this.opts.enableTasksQueue) { + if ((opts.tasksQueueOptions?.concurrency as number) <= 0) { + throw new Error( + `Invalid tasks queue concurrency '${ + (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number + }'` + ) + } + this.opts.tasksQueueOptions = { + concurrency: opts.tasksQueueOptions?.concurrency ?? 1 + } + } } private checkValidWorkerChoiceStrategy ( @@ -245,7 +257,11 @@ export abstract class AbstractPool< }) if ( this.opts.enableTasksQueue === true && - (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0) + (this.busy || + this.workerNodes[workerNodeKey].tasksUsage.running > + ((this.opts.tasksQueueOptions as TasksQueueOptions) + .concurrency as number) - + 1) ) { this.enqueueTask(workerNodeKey, submittedTask) } else { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f24d79e4..2cdf9b74 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -29,6 +29,18 @@ export const PoolEvents = Object.freeze({ */ export type PoolEvent = keyof typeof PoolEvents +/** + * Worker tasks queue options. + */ +export interface TasksQueueOptions { + /** + * Maximum number of tasks that can be executed concurrently on a worker. + * + * @defaultValue 1 + */ + concurrency?: number +} + /** * Options for a poolifier pool. */ @@ -70,6 +82,13 @@ export interface PoolOptions { * @defaultValue false */ enableTasksQueue?: boolean + /** + * Pool worker tasks queue options. + * + * @experimental + * @defaultValue \{ concurrency: 1 \} + */ + tasksQueueOptions?: TasksQueueOptions } /**