feat: add worker tasks queue options to pool options
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Apr 2023 19:58:30 +0000 (21:58 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Apr 2023 19:58:30 +0000 (21:58 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
README.md
src/pools/abstract-pool.ts
src/pools/pool.ts

index 3af71d5449038677d17aa26769277403f01f3941..8e85ba1ec295230c62aed50011d889f7b8b73645 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add worker tasks queue options to pool options.
+
 ## [2.4.6] - 2023-04-10
 
 ### Fixed
index da1690207e21ce6cea04dbe87f3768f3641132d6..9b1f81317b5a28e920789a096be724b14459f793 100644 (file)
--- a/README.md
+++ b/README.md
@@ -178,11 +178,19 @@ Node versions >= 16.x are supported.
   Properties:
 
   - `medRunTime` (optional) - Use the tasks median run time instead of the tasks average run time in worker choice strategies.
-    Default: { medRunTime: false }
+
+  Default: { medRunTime: false }
 
 - `enableEvents` (optional) - Events emission enablement in this pool. Default: true
 - `enableTasksQueue` (optional, experimental) - Tasks queue per worker enablement in this pool. Default: false
 
+- `tasksQueueOptions` (optional, experimental) - The worker tasks queue options object to use in this pool.  
+  Properties:
+
+  - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker.
+
+  Default: { concurrency: 1 }
+
 ### `pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`
 
 `min` (mandatory) Same as FixedThreadPool/FixedClusterPool numberOfThreads/numberOfWorkers, this number of workers will be always active  
index 6462cb09a882c6c005ceea481cacb5519e2de0de..1fd9a1737f10991489af3b006829dcaf3e7bd580 100644 (file)
@@ -6,7 +6,7 @@ import {
   median
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool'
 import { PoolEmitter } from './pool'
 import type { IPoolInternal } from './pool-internal'
 import { PoolType } from './pool-internal'
@@ -139,6 +139,18 @@ export abstract class AbstractPool<
       opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
     this.opts.enableEvents = opts.enableEvents ?? true
     this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+    if (this.opts.enableTasksQueue) {
+      if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
+        throw new Error(
+          `Invalid tasks queue concurrency '${
+            (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
+          }'`
+        )
+      }
+      this.opts.tasksQueueOptions = {
+        concurrency: opts.tasksQueueOptions?.concurrency ?? 1
+      }
+    }
   }
 
   private checkValidWorkerChoiceStrategy (
@@ -245,7 +257,11 @@ export abstract class AbstractPool<
     })
     if (
       this.opts.enableTasksQueue === true &&
-      (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+      (this.busy ||
+        this.workerNodes[workerNodeKey].tasksUsage.running >
+          ((this.opts.tasksQueueOptions as TasksQueueOptions)
+            .concurrency as number) -
+            1)
     ) {
       this.enqueueTask(workerNodeKey, submittedTask)
     } else {
index f24d79e41e01dfba297821ea88acc22066052550..2cdf9b748db6350b0bfa7c9e64aedd8b32b296cf 100644 (file)
@@ -29,6 +29,18 @@ export const PoolEvents = Object.freeze({
  */
 export type PoolEvent = keyof typeof PoolEvents
 
+/**
+ * Worker tasks queue options.
+ */
+export interface TasksQueueOptions {
+  /**
+   * Maximum number of tasks that can be executed concurrently on a worker.
+   *
+   * @defaultValue 1
+   */
+  concurrency?: number
+}
+
 /**
  * Options for a poolifier pool.
  */
@@ -70,6 +82,13 @@ export interface PoolOptions<Worker extends IWorker> {
    * @defaultValue false
    */
   enableTasksQueue?: boolean
+  /**
+   * Pool worker tasks queue options.
+   *
+   * @experimental
+   * @defaultValue \{ concurrency: 1 \}
+   */
+  tasksQueueOptions?: TasksQueueOptions
 }
 
 /**