Extract selection strategies to classes (#176)
authorShinigami <chrissi92@hotmail.de>
Sun, 21 Feb 2021 14:01:33 +0000 (15:01 +0100)
committerGitHub <noreply@github.com>
Sun, 21 Feb 2021 14:01:33 +0000 (15:01 +0100)
Co-authored-by: Shinigami92 <chrissi92@hotmail.de>
Co-authored-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
19 files changed:
.eslintrc.js
.vscode/settings.json
package-lock.json
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool-internal.ts [new file with mode: 0644]
src/pools/pool.ts
src/pools/selection-strategies.ts [new file with mode: 0644]
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies.test.js [new file with mode: 0644]
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js

index c3c51fe731d28df4f2f5268a849cd84e24849d18..a5a0075e24e5f8acb9686ccf579174e97419b6ff 100644 (file)
@@ -50,12 +50,13 @@ module.exports = {
       'warn',
       {
         skipWords: [
-          'poolifier',
           'christopher',
           'ecma',
           'enum',
+          'inheritdoc',
           'jsdoc',
           'pioardi',
+          'poolifier',
           'readonly',
           'serializable',
           'unregister',
@@ -70,6 +71,8 @@ module.exports = {
       files: ['src/**/*.ts'],
       extends: 'plugin:jsdoc/recommended',
       rules: {
+        'no-useless-constructor': 'off',
+
         'jsdoc/match-description': [
           'warn',
           {
index cf389d7978277b8f3eeb6a9d2360217c9c5e6f8f..886a5af31d2e7b7f33f9920f9d3239edae6e0fc8 100644 (file)
@@ -6,6 +6,7 @@
     "Dependabot",
     "Gitter",
     "Shinigami",
+    "inheritdoc",
     "lcov",
     "loglevel",
     "markdownlint",
index 9713230d01a4b61de3a42c2e0045ad9d5c2bfd5e..24bd130963e23b09712bbd6fed9d61fb749277da 100644 (file)
       }
     },
     "array-includes": {
-      "version": "3.1.2",
-      "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.2.tgz",
-      "integrity": "sha512-w2GspexNQpx+PutG3QpT437/BenZBj0M/MZGn5mzv/MofYqo0xmRHzn4lFsoDlWJ+THYsGJmFlW68WlDFx7VRw==",
+      "version": "3.1.3",
+      "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.3.tgz",
+      "integrity": "sha512-gcem1KlBU7c9rB+Rq8/3PPKsK2kjqeEBa3bD5kkQo4nYlOHQCJqIJFqBXDEfwaRuYTT4E+FxA9xez7Gf/e3Q7A==",
       "dev": true,
       "requires": {
-        "call-bind": "^1.0.0",
+        "call-bind": "^1.0.2",
         "define-properties": "^1.1.3",
-        "es-abstract": "^1.18.0-next.1",
-        "get-intrinsic": "^1.0.1",
+        "es-abstract": "^1.18.0-next.2",
+        "get-intrinsic": "^1.1.1",
         "is-string": "^1.0.5"
       }
     },
       }
     },
     "file-entry-cache": {
-      "version": "6.0.0",
-      "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.0.tgz",
-      "integrity": "sha512-fqoO76jZ3ZnYrXLDRxBR1YvOvc0k844kcOg40bgsPrE25LAb/PDqTY+ho64Xh2c8ZXgIKldchCFHczG2UVRcWA==",
+      "version": "6.0.1",
+      "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz",
+      "integrity": "sha512-7Gps/XWymbLk2QLYK4NzpMOrYjMhdIxXuIvy2QBsLE6ljuodKvdkWs/cpyJJ3CVIVpH0Oi1Hvg1ovbMzLdFBBg==",
       "dev": true,
       "requires": {
         "flat-cache": "^3.0.4"
       }
     },
     "lodash": {
-      "version": "4.17.20",
-      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",
-      "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==",
+      "version": "4.17.21",
+      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
+      "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==",
       "dev": true
     },
     "lodash.flattendeep": {
           "integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==",
           "dev": true
         },
+        "lodash": {
+          "version": "4.17.20",
+          "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",
+          "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==",
+          "dev": true
+        },
         "resolve": {
           "version": "1.19.0",
           "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
index 6cc415000d1b27c19483417ad3ce9013b2b5aa59..2bae50e71083ebde5075ddff357f78db1970a896 100644 (file)
@@ -9,6 +9,8 @@ export { DynamicClusterPool } from './pools/cluster/dynamic'
 export { FixedClusterPool } from './pools/cluster/fixed'
 export type { ClusterPoolOptions } from './pools/cluster/fixed'
 export type { IPool } from './pools/pool'
+export { WorkerChoiceStrategies } from './pools/selection-strategies'
+export type { WorkerChoiceStrategy } from './pools/selection-strategies'
 export { DynamicThreadPool } from './pools/thread/dynamic'
 export { FixedThreadPool } from './pools/thread/fixed'
 export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
index 0435a32c47067442af13cde6773a7831754e516f..7944606d9c84dbcc7dc32a9547c8a6e425b48e01 100644 (file)
@@ -1,12 +1,17 @@
-import EventEmitter from 'events'
 import type { MessageValue } from '../utility-types'
-import type { IPool } from './pool'
+import type { IPoolInternal } from './pool-internal'
+import { PoolEmitter } from './pool-internal'
+import type { WorkerChoiceStrategy } from './selection-strategies'
+import {
+  WorkerChoiceStrategies,
+  WorkerChoiceStrategyContext
+} from './selection-strategies'
 
 /**
  * An intentional empty function.
  */
-function emptyFunction () {
-  // intentionally left blank
+const EMPTY_FUNCTION: () => void = () => {
+  /* Intentionally empty */
 }
 
 /**
@@ -83,13 +88,12 @@ export interface PoolOptions<Worker> {
    * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
    */
   maxTasks?: number
+  /**
+   * The work choice strategy to use in this pool.
+   */
+  workerChoiceStrategy?: WorkerChoiceStrategy
 }
 
-/**
- * Internal poolifier pool emitter.
- */
-class PoolEmitter extends EventEmitter {}
-
 /**
  * Base class containing some shared logic for all poolifier pools.
  *
@@ -101,32 +105,14 @@ export abstract class AbstractPool<
   Worker extends IWorker,
   Data = unknown,
   Response = unknown
-> implements IPool<Data, Response> {
-  /**
-   * List of currently available workers.
-   */
+> implements IPoolInternal<Worker, Data, Response> {
+  /** @inheritdoc */
   public readonly workers: Worker[] = []
 
-  /**
-   * Index for the next worker.
-   */
-  public nextWorkerIndex: number = 0
-
-  /**
-   * The tasks map.
-   *
-   * - `key`: The `Worker`
-   * - `value`: Number of tasks currently in progress on the worker.
-   */
+  /** @inheritdoc */
   public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
 
-  /**
-   * Emitter on which events can be listened to.
-   *
-   * Events that can currently be listened to:
-   *
-   * - `'FullPool'`
-   */
+  /** @inheritdoc */
   public readonly emitter: PoolEmitter
 
   /**
@@ -134,6 +120,17 @@ export abstract class AbstractPool<
    */
   protected nextMessageId: number = 0
 
+  /**
+   * Worker choice strategy instance implementing the worker choice algorithm.
+   *
+   * Default to a strategy implementing a round robin algorithm.
+   */
+  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+    Worker,
+    Data,
+    Response
+  >
+
   /**
    * Constructs a new poolifier pool.
    *
@@ -157,20 +154,33 @@ export abstract class AbstractPool<
     }
 
     this.emitter = new PoolEmitter()
+    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+      this,
+      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+    )
   }
 
-  private checkFilePath (filePath: string) {
+  private checkFilePath (filePath: string): void {
     if (!filePath) {
       throw new Error('Please specify a file with a worker implementation')
     }
   }
 
-  /**
-   * Perform the task specified in the constructor with the data parameter.
-   *
-   * @param data The input for the specified task. This can only be serializable data.
-   * @returns Promise that will be resolved when the task is successfully completed.
-   */
+  /** @inheritdoc */
+  public isDynamic (): boolean {
+    return false
+  }
+
+  /** @inheritdoc */
+  public setWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy
+  ): void {
+    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+      workerChoiceStrategy
+    )
+  }
+
+  /** @inheritdoc */
   public execute (data: Data): Promise<Response> {
     // Configure worker to handle message with the specified task
     const worker = this.chooseWorker()
@@ -181,19 +191,13 @@ export abstract class AbstractPool<
     return res
   }
 
-  /**
-   * Shut down every current worker in this pool.
-   */
+  /** @inheritdoc */
   public async destroy (): Promise<void> {
     await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
   }
 
-  /**
-   * Shut down given worker.
-   *
-   * @param worker A worker within `workers`.
-   */
-  protected abstract destroyWorker (worker: Worker): void | Promise<void>
+  /** @inheritdoc */
+  public abstract destroyWorker (worker: Worker): void | Promise<void>
 
   /**
    * Setup hook that can be overridden by a Poolifier pool implementation
@@ -232,7 +236,7 @@ export abstract class AbstractPool<
    * @param worker Worker whose tasks are set.
    * @param step Worker number of tasks step.
    */
-  private stepWorkerNumberOfTasks (worker: Worker, step: number) {
+  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
     const numberOfTasksInProgress = this.tasks.get(worker)
     if (numberOfTasksInProgress !== undefined) {
       this.tasks.set(worker, numberOfTasksInProgress + step)
@@ -261,12 +265,7 @@ export abstract class AbstractPool<
    * @returns Worker.
    */
   protected chooseWorker (): Worker {
-    const chosenWorker = this.workers[this.nextWorkerIndex]
-    this.nextWorkerIndex =
-      this.workers.length - 1 === this.nextWorkerIndex
-        ? 0
-        : this.nextWorkerIndex + 1
-    return chosenWorker
+    return this.workerChoiceStrategyContext.execute()
   }
 
   /**
@@ -280,7 +279,8 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): void
 
-  protected abstract registerWorkerMessageListener<
+  /** @inheritdoc */
+  public abstract registerWorkerMessageListener<
     Message extends Data | Response
   > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
@@ -319,17 +319,13 @@ export abstract class AbstractPool<
    */
   protected abstract afterWorkerSetup (worker: Worker): void
 
-  /**
-   * Creates a new worker for this pool and sets it up completely.
-   *
-   * @returns New, completely set up worker.
-   */
-  protected createAndSetupWorker (): Worker {
+  /** @inheritdoc */
+  public createAndSetupWorker (): Worker {
     const worker: Worker = this.createWorker()
 
-    worker.on('error', this.opts.errorHandler ?? emptyFunction)
-    worker.on('online', this.opts.onlineHandler ?? emptyFunction)
-    worker.on('exit', this.opts.exitHandler ?? emptyFunction)
+    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
+    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => this.removeWorker(worker))
 
     this.workers.push(worker)
index 2a25f7b3c81eae02270259671dc1b8f8fa0a021b..5445bd03d4a4faf772610c3c966b4056231319de 100644 (file)
@@ -1,5 +1,3 @@
-import type { Worker } from 'cluster'
-import { isKillBehavior, KillBehaviors } from '../../worker/worker-options'
 import type { ClusterPoolOptions } from './fixed'
 import { FixedClusterPool } from './fixed'
 
@@ -36,40 +34,8 @@ export class DynamicClusterPool<
     super(min, filePath, opts)
   }
 
-  /**
-   * Choose a worker for the next task.
-   *
-   * It will first check for and return an idle worker.
-   * If all workers are busy, then it will try to create a new one up to the `max` worker count.
-   * If the max worker count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
-   *
-   * @returns Cluster worker.
-   */
-  protected chooseWorker (): Worker {
-    for (const [worker, numberOfTasks] of this.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, use it
-        return worker
-      }
-    }
-
-    if (this.workers.length === this.max) {
-      this.emitter.emit('FullPool')
-      return super.chooseWorker()
-    }
-
-    // All workers are busy, create a new worker
-    const workerCreated = this.createAndSetupWorker()
-    this.registerWorkerMessageListener<Data>(workerCreated, message => {
-      const tasksInProgress = this.tasks.get(workerCreated)
-      if (
-        isKillBehavior(KillBehaviors.HARD, message.kill) ||
-        tasksInProgress === 0
-      ) {
-        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        void this.destroyWorker(workerCreated)
-      }
-    })
-    return workerCreated
+  /** @inheritdoc */
+  public isDynamic (): boolean {
+    return true
   }
 }
index 616523cb2885964ef2fee6e4165aaf5dad979000..6246320e3072d18371646d431692abda06b85ea1 100644 (file)
@@ -58,7 +58,8 @@ export class FixedClusterPool<
     return isMaster
   }
 
-  protected destroyWorker (worker: Worker): void {
+  /** @inheritdoc */
+  public destroyWorker (worker: Worker): void {
     this.sendToWorker(worker, { kill: 1 })
     worker.kill()
   }
@@ -67,7 +68,8 @@ export class FixedClusterPool<
     worker.send(message)
   }
 
-  protected registerWorkerMessageListener<Message extends Data | Response> (
+  /** @inheritdoc */
+  public registerWorkerMessageListener<Message extends Data | Response> (
     worker: Worker,
     listener: (message: MessageValue<Message>) => void
   ): void {
@@ -86,7 +88,7 @@ export class FixedClusterPool<
   }
 
   protected afterWorkerSetup (worker: Worker): void {
-    // we will attach a listener for every task,
+    // We will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
     worker.setMaxListeners(this.opts.maxTasks ?? 1000)
   }
diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts
new file mode 100644 (file)
index 0000000..ddee27e
--- /dev/null
@@ -0,0 +1,81 @@
+import EventEmitter from 'events'
+import type { MessageValue } from '../utility-types'
+import type { IWorker } from './abstract-pool'
+import type { IPool } from './pool'
+
+/**
+ * Internal poolifier pool emitter.
+ */
+export class PoolEmitter extends EventEmitter {}
+
+/**
+ * Internal contract definition for a poolifier pool.
+ *
+ * @template Worker Type of worker which manages this pool.
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
+ */
+export interface IPoolInternal<
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+> extends IPool<Data, Response> {
+  /**
+   * List of currently available workers.
+   */
+  readonly workers: Worker[]
+
+  /**
+   * The tasks map.
+   *
+   * - `key`: The `Worker`
+   * - `value`: Number of tasks currently in progress on the worker.
+   */
+  readonly tasks: Map<Worker, number>
+
+  /**
+   * Emitter on which events can be listened to.
+   *
+   * Events that can currently be listened to:
+   *
+   * - `'FullPool'`
+   */
+  readonly emitter: PoolEmitter
+
+  /**
+   * Maximum number of workers that can be created by this pool.
+   */
+  readonly max?: number
+
+  /**
+   * Whether the pool is dynamic or not.
+   *
+   * If it is dynamic, it provides the `max` property.
+   */
+  isDynamic(): boolean
+
+  /**
+   * Creates a new worker for this pool and sets it up completely.
+   *
+   * @returns New, completely set up worker.
+   */
+  createAndSetupWorker(): Worker
+
+  /**
+   * Shut down given worker.
+   *
+   * @param worker A worker within `workers`.
+   */
+  destroyWorker(worker: Worker): void | Promise<void>
+
+  /**
+   * Register a listener callback on a given worker.
+   *
+   * @param worker A worker.
+   * @param listener A message listener callback.
+   */
+  registerWorkerMessageListener<Message extends Data | Response>(
+    worker: Worker,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+}
index f48d8843d9596f914f60ae1349ff258af7df4518..752045829214f76681f92d36fce414c581fd0c95 100644 (file)
@@ -1,3 +1,5 @@
+import type { WorkerChoiceStrategy } from './selection-strategies'
+
 /**
  * Contract definition for a poolifier pool.
  *
@@ -16,4 +18,10 @@ export interface IPool<Data = unknown, Response = unknown> {
    * Shut down every current worker in this pool.
    */
   destroy(): Promise<void>
+  /**
+   * Set the worker choice strategy in this pool.
+   *
+   * @param workerChoiceStrategy The worker choice strategy.
+   */
+  setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void
 }
diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts
new file mode 100644 (file)
index 0000000..5c40a71
--- /dev/null
@@ -0,0 +1,272 @@
+import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { IWorker } from './abstract-pool'
+import type { IPoolInternal } from './pool-internal'
+
+/**
+ * Enumeration of worker choice strategies.
+ */
+export const WorkerChoiceStrategies = Object.freeze({
+  /**
+   * Round robin worker selection strategy.
+   */
+  ROUND_ROBIN: 'ROUND_ROBIN',
+  /**
+   * Less recently used worker selection strategy.
+   */
+  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
+} as const)
+
+/**
+ * Worker choice strategy.
+ */
+export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+
+/**
+ * Worker choice strategy interface.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ */
+interface IWorkerChoiceStrategy<Worker extends IWorker> {
+  /**
+   * Choose a worker in the pool.
+   */
+  choose(): Worker
+}
+
+/**
+ * Selects the next worker in a round robin fashion.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
+  implements IWorkerChoiceStrategy<Worker> {
+  /**
+   * Index for the next worker.
+   */
+  private nextWorkerIndex: number = 0
+
+  /**
+   * Constructs a worker choice strategy that selects in a round robin fashion.
+   *
+   * @param pool The pool instance.
+   */
+  public constructor (
+    private readonly pool: IPoolInternal<Worker, Data, Response>
+  ) {}
+
+  /** @inheritdoc */
+  public choose (): Worker {
+    const chosenWorker = this.pool.workers[this.nextWorkerIndex]
+    this.nextWorkerIndex =
+      this.pool.workers.length - 1 === this.nextWorkerIndex
+        ? 0
+        : this.nextWorkerIndex + 1
+    return chosenWorker
+  }
+}
+
+/**
+ * Selects the less recently used worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class LessRecentlyUsedWorkerChoiceStrategy<
+  Worker extends IWorker,
+  Data,
+  Response
+> implements IWorkerChoiceStrategy<Worker> {
+  /**
+   * Constructs a worker choice strategy that selects based on less recently used.
+   *
+   * @param pool The pool instance.
+   */
+  public constructor (
+    private readonly pool: IPoolInternal<Worker, Data, Response>
+  ) {}
+
+  /** @inheritdoc */
+  public choose (): Worker {
+    let minNumberOfTasks = Infinity
+    // A worker is always found because it picks the one with fewer tasks
+    let lessRecentlyUsedWorker!: Worker
+    for (const [worker, numberOfTasks] of this.pool.tasks) {
+      if (numberOfTasks === 0) {
+        return worker
+      } else if (numberOfTasks < minNumberOfTasks) {
+        minNumberOfTasks = numberOfTasks
+        lessRecentlyUsedWorker = worker
+      }
+    }
+    return lessRecentlyUsedWorker
+  }
+}
+
+/**
+ * Get the worker choice strategy instance.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance.
+ */
+function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
+  pool: IPoolInternal<Worker, Data, Response>,
+  workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+): IWorkerChoiceStrategy<Worker> {
+  switch (workerChoiceStrategy) {
+    case WorkerChoiceStrategies.ROUND_ROBIN:
+      return new RoundRobinWorkerChoiceStrategy(pool)
+    case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+      return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+    default:
+      throw new Error(
+        `Worker choice strategy '${workerChoiceStrategy}' not found`
+      )
+  }
+}
+
+/**
+ * Dynamically choose a worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
+  implements IWorkerChoiceStrategy<Worker> {
+  private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+
+  /**
+   * Constructs a worker choice strategy for dynamical pools.
+   *
+   * @param pool The pool instance.
+   * @param workerChoiceStrategy The worker choice strategy when the pull is full.
+   */
+  public constructor (
+    private readonly pool: IPoolInternal<Worker, Data, Response>,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ) {
+    this.workerChoiceStrategy = getWorkerChoiceStrategy(
+      this.pool,
+      workerChoiceStrategy
+    )
+  }
+
+  /**
+   * Find a free worker based on number of tasks the worker has applied.
+   *
+   * If a worker was found that has `0` tasks, it is detected as free and will be returned.
+   *
+   * If no free worker was found, `null` will be returned.
+   *
+   * @returns A free worker if there was one, otherwise `null`.
+   */
+  private findFreeWorkerBasedOnTasks (): Worker | null {
+    for (const [worker, numberOfTasks] of this.pool.tasks) {
+      if (numberOfTasks === 0) {
+        // A worker is free, use it
+        return worker
+      }
+    }
+    return null
+  }
+
+  /** @inheritdoc */
+  public choose (): Worker {
+    const freeWorker = this.findFreeWorkerBasedOnTasks()
+    if (freeWorker) {
+      return freeWorker
+    }
+
+    if (this.pool.workers.length === this.pool.max) {
+      this.pool.emitter.emit('FullPool')
+      return this.workerChoiceStrategy.choose()
+    }
+
+    // All workers are busy, create a new worker
+    const workerCreated = this.pool.createAndSetupWorker()
+    this.pool.registerWorkerMessageListener(workerCreated, message => {
+      const tasksInProgress = this.pool.tasks.get(workerCreated)
+      if (
+        isKillBehavior(KillBehaviors.HARD, message.kill) ||
+        tasksInProgress === 0
+      ) {
+        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+        void this.pool.destroyWorker(workerCreated)
+      }
+    })
+    return workerCreated
+  }
+}
+
+/**
+ * The worker choice strategy context.
+ *
+ * @template Worker Type of worker.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class WorkerChoiceStrategyContext<
+  Worker extends IWorker,
+  Data,
+  Response
+> {
+  // Will be set by setter in constructor
+  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+
+  /**
+   * Worker choice strategy context constructor.
+   *
+   * @param pool The pool instance.
+   * @param workerChoiceStrategy The worker choice strategy.
+   */
+  public constructor (
+    private readonly pool: IPoolInternal<Worker, Data, Response>,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ) {
+    this.setWorkerChoiceStrategy(workerChoiceStrategy)
+  }
+
+  /**
+   * Get the worker choice strategy instance specific to the pool type.
+   *
+   * @param workerChoiceStrategy The worker choice strategy.
+   * @returns The worker choice strategy instance for the pool type.
+   */
+  private getPoolWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ): IWorkerChoiceStrategy<Worker> {
+    if (this.pool.isDynamic()) {
+      return new DynamicPoolWorkerChoiceStrategy(
+        this.pool,
+        workerChoiceStrategy
+      )
+    }
+    return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
+  }
+
+  /**
+   * Set the worker choice strategy to use in the context.
+   *
+   * @param workerChoiceStrategy The worker choice strategy to set.
+   */
+  public setWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy
+  ): void {
+    this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
+      workerChoiceStrategy
+    )
+  }
+
+  /**
+   * Choose a worker with the underlying selection strategy.
+   *
+   * @returns The chosen one.
+   */
+  public execute (): Worker {
+    return this.workerChoiceStrategy.choose()
+  }
+}
index 3e74a4ea4b7fe053445f3c204104b13f28e43e65..fcf4b2ff4652b58b3050adbd46153d4b9ca2c10c 100644 (file)
@@ -1,4 +1,3 @@
-import { isKillBehavior, KillBehaviors } from '../../worker/worker-options'
 import type { PoolOptions } from '../abstract-pool'
 import type { ThreadWorkerWithMessageChannel } from './fixed'
 import { FixedThreadPool } from './fixed'
@@ -36,40 +35,8 @@ export class DynamicThreadPool<
     super(min, filePath, opts)
   }
 
-  /**
-   * Choose a thread for the next task.
-   *
-   * It will first check for and return an idle thread.
-   * If all threads are busy, then it will try to create a new one up to the `max` thread count.
-   * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
-   *
-   * @returns Thread worker.
-   */
-  protected chooseWorker (): ThreadWorkerWithMessageChannel {
-    for (const [worker, numberOfTasks] of this.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, use it
-        return worker
-      }
-    }
-
-    if (this.workers.length === this.max) {
-      this.emitter.emit('FullPool')
-      return super.chooseWorker()
-    }
-
-    // All workers are busy, create a new worker
-    const workerCreated = this.createAndSetupWorker()
-    this.registerWorkerMessageListener<Data>(workerCreated, message => {
-      const tasksInProgress = this.tasks.get(workerCreated)
-      if (
-        isKillBehavior(KillBehaviors.HARD, message.kill) ||
-        tasksInProgress === 0
-      ) {
-        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        void this.destroyWorker(workerCreated)
-      }
-    })
-    return workerCreated
+  /** @inheritdoc */
+  public isDynamic (): boolean {
+    return true
   }
 }
index a61c10b2346c8a44c4094605dfe6aa25787b8f09..021de492540f50884a8e554d570f5194061212d0 100644 (file)
@@ -44,7 +44,8 @@ export class FixedThreadPool<
     return isMainThread
   }
 
-  protected async destroyWorker (
+  /** @inheritdoc */
+  public async destroyWorker (
     worker: ThreadWorkerWithMessageChannel
   ): Promise<void> {
     this.sendToWorker(worker, { kill: 1 })
@@ -58,7 +59,8 @@ export class FixedThreadPool<
     worker.postMessage(message)
   }
 
-  protected registerWorkerMessageListener<Message extends Data | Response> (
+  /** @inheritdoc */
+  public registerWorkerMessageListener<Message extends Data | Response> (
     messageChannel: ThreadWorkerWithMessageChannel,
     listener: (message: MessageValue<Message>) => void
   ): void {
@@ -83,7 +85,7 @@ export class FixedThreadPool<
     worker.postMessage({ parent: port1 }, [port1])
     worker.port1 = port1
     worker.port2 = port2
-    // we will attach a listener for every task,
+    // We will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
     worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
   }
index d41115bc390b26f63eeaa9ae4500d678cb1bd7b4..4f741b47acf134146069e33d866b8f40a09e0bb1 100644 (file)
@@ -101,7 +101,7 @@ export abstract class AbstractWorker<
    *
    * @param fn The function that should be defined.
    */
-  private checkFunctionInput (fn: (data: Data) => Response) {
+  private checkFunctionInput (fn: (data: Data) => Response): void {
     if (!fn) throw new Error('fn parameter is mandatory')
   }
 
index bc2385f416c6c23b38d88fcccc6110cd70deaa5b..fe6aad35bd90a0d0d8fb16bea8f57760da3e6e42 100644 (file)
@@ -14,7 +14,7 @@ class StubPoolWithIsMainMethod extends FixedThreadPool {
   }
 }
 
-describe('Abstract pool test suite ', () => {
+describe('Abstract pool test suite', () => {
   it('Simulate worker not found during increaseWorkersTask', () => {
     const pool = new StubPoolWithTasksMapClear(
       1,
index 2cff475f99cd6b0368b216a9f45931fd7050fed6..53a5f8d0b232e18ac26effb3ccfe466297f5458a 100644 (file)
@@ -12,7 +12,7 @@ const pool = new DynamicClusterPool(
   }
 )
 
-describe('Dynamic cluster pool test suite ', () => {
+describe('Dynamic cluster pool test suite', () => {
   it('Verify that the function is executed in a worker cluster', async () => {
     const result = await pool.execute({ test: 'test' })
     expect(result).toBeDefined()
index 63399c6e4629ba4326080153cf5448ae3f2d7f40..ef44e6f15849d2b3fcf231cc7138bf908b2980ca 100644 (file)
@@ -41,7 +41,7 @@ const asyncPool = new FixedClusterPool(
   }
 )
 
-describe('Fixed cluster pool test suite ', () => {
+describe('Fixed cluster pool test suite', () => {
   after('Destroy all pools', async () => {
     // We need to clean up the resources after our test
     await echoPool.destroy()
diff --git a/tests/pools/selection-strategies.test.js b/tests/pools/selection-strategies.test.js
new file mode 100644 (file)
index 0000000..186dee7
--- /dev/null
@@ -0,0 +1,56 @@
+const expect = require('expect')
+const {
+  WorkerChoiceStrategies,
+  DynamicThreadPool,
+  FixedThreadPool
+} = require('../../lib/index')
+const TestUtils = require('../test-utils')
+
+describe('Selection strategies test suite', () => {
+  it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
+    expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
+    expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED')
+  })
+
+  it('Verify LESS_RECENTLY_USED is taken', async () => {
+    const max = 3
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+    )
+
+    expect(pool.opts.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.LESS_RECENTLY_USED
+    )
+
+    // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify unknown strategies throw error', () => {
+    const min = 1
+    const max = 3
+    expect(
+      () =>
+        new DynamicThreadPool(
+          min,
+          max,
+          './tests/worker-files/thread/testWorker.js',
+          {
+            maxTasks: 1000,
+            workerChoiceStrategy: 'UNKNOWN_STRATEGY'
+          }
+        )
+    ).toThrowError(
+      new Error("Worker choice strategy 'UNKNOWN_STRATEGY' not found")
+    )
+  })
+})
index 66aeaa3a20a9ebf870c67a2c8d99d5c46181f338..3a162267189d066c1beeae0aae6773ccc9b39106 100644 (file)
@@ -12,7 +12,7 @@ const pool = new DynamicThreadPool(
   }
 )
 
-describe('Dynamic thread pool test suite ', () => {
+describe('Dynamic thread pool test suite', () => {
   it('Verify that the function is executed in a worker thread', async () => {
     const result = await pool.execute({ test: 'test' })
     expect(result).toBeDefined()
index 2a7d005b3dcd6d88a53e4e53e1d2216c55483de4..881813c0d9512fe9acdb7aac623e1a1f056e90c9 100644 (file)
@@ -32,7 +32,7 @@ const asyncPool = new FixedThreadPool(
   { maxTasks: maxTasks }
 )
 
-describe('Fixed thread pool test suite ', () => {
+describe('Fixed thread pool test suite', () => {
   it('Choose worker round robin test', async () => {
     const results = new Set()
     for (let i = 0; i < numberOfThreads; i++) {