Add dynamic worker choice strategy change at runtime
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Oct 2022 10:24:16 +0000 (12:24 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Oct 2022 10:24:16 +0000 (12:24 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
25 files changed:
.eslintrc.js
CHANGELOG.md
benchmarks/internal/benchmark-utils.js
src/index.ts
src/pools/abstract-pool-worker.ts [new file with mode: 0644]
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/pool-internal.ts
src/pools/pool-worker.ts [new file with mode: 0644]
src/pools/pool.ts
src/pools/selection-strategies.ts [deleted file]
src/pools/selection-strategies/abstract-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/selection-strategies-types.ts [new file with mode: 0644]
src/pools/selection-strategies/selection-strategies-utils.ts [new file with mode: 0644]
src/pools/selection-strategies/worker-choice-strategy-context.ts [new file with mode: 0644]
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/utility-types.ts
tests/pools/selection-strategies.test.js
tests/pools/thread/dynamic.test.js
tests/test-utils.js
tsconfig.json

index fe04926fd46565df433f78e3c01a083283bdbc85..ca01b306546da789593b9de39c479763bacf7640 100644 (file)
@@ -35,6 +35,7 @@ module.exports = defineConfig({
           'comparator',
           'ecma',
           'enum',
+          'fibonacci',
           'inheritdoc',
           'jsdoc',
           'poolifier',
index cbaf1909658c1367adc6e232631020143943abde..da16390ff5c2990fa9faaf78546d0562e9a66e05 100644 (file)
@@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [2.2.1] - 2022-05-01
 
--
+### Added
+
+- Dynamic worker choice strategy change at runtime.
 
 ## [2.2.0] - 2022-05-01
 
index 5f0c643b0c189a9d6d232deae5e8c0effe9cbae8..debe0b1cea518fd43a53295322a314e6736c5999 100644 (file)
@@ -37,7 +37,7 @@ function generateRandomInteger (max, min = 0) {
 /**
  * Intentionally inefficient implementation.
  *
- * @param {*} n
+ * @param {number} n
  * @returns {number}
  */
 function fibonacci (n) {
index 2bae50e71083ebde5075ddff357f78db1970a896..b0c419ca89e4f6d1838ffe93368d26e3d292d77c 100644 (file)
@@ -1,16 +1,15 @@
-export type {
-  ErrorHandler,
-  ExitHandler,
-  IWorker,
-  OnlineHandler,
-  PoolOptions
-} from './pools/abstract-pool'
 export { DynamicClusterPool } from './pools/cluster/dynamic'
 export { FixedClusterPool } from './pools/cluster/fixed'
 export type { ClusterPoolOptions } from './pools/cluster/fixed'
-export type { IPool } from './pools/pool'
-export { WorkerChoiceStrategies } from './pools/selection-strategies'
-export type { WorkerChoiceStrategy } from './pools/selection-strategies'
+export type { IPool, PoolOptions } from './pools/pool'
+export type {
+  ErrorHandler,
+  ExitHandler,
+  IPoolWorker,
+  OnlineHandler
+} from './pools/pool-worker'
+export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
+export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types'
 export { DynamicThreadPool } from './pools/thread/dynamic'
 export { FixedThreadPool } from './pools/thread/fixed'
 export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
diff --git a/src/pools/abstract-pool-worker.ts b/src/pools/abstract-pool-worker.ts
new file mode 100644 (file)
index 0000000..f591b24
--- /dev/null
@@ -0,0 +1,23 @@
+import type {
+  ErrorHandler,
+  ExitHandler,
+  IPoolWorker,
+  MessageHandler,
+  OnlineHandler
+} from './pool-worker'
+
+/**
+ * Basic class that implement the minimum required for a pool worker.
+ */
+export abstract class AbstractPoolWorker implements IPoolWorker {
+  /** @inheritdoc  */
+  abstract on (event: 'message', handler: MessageHandler<this>): void
+  /** @inheritdoc  */
+  abstract on (event: 'error', handler: ErrorHandler<this>): void
+  /** @inheritdoc  */
+  abstract on (event: 'online', handler: OnlineHandler<this>): void
+  /** @inheritdoc  */
+  abstract on (event: 'exit', handler: ExitHandler<this>): void
+  /** @inheritdoc  */
+  abstract once (event: 'exit', handler: ExitHandler<this>): void
+}
index 71046d494db7770ade3339d46edcde5d27291efe..928b24c65832a38ef625b7b07d498ce9f3ad504b 100644 (file)
@@ -4,106 +4,15 @@ import type {
 } from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
+import type { PoolOptions } from './pool'
 import type { IPoolInternal } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
-import type { WorkerChoiceStrategy } from './selection-strategies'
 import {
   WorkerChoiceStrategies,
-  WorkerChoiceStrategyContext
-} from './selection-strategies'
-
-/**
- * Callback invoked if the worker has received a message.
- */
-export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
-
-/**
- * Callback invoked if the worker raised an error.
- */
-export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
-
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker> = (this: Worker) => void
-
-/**
- * Callback invoked when the worker exits successfully.
- */
-export type ExitHandler<Worker> = (this: Worker, code: number) => void
-
-/**
- * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
- */
-export interface IWorker {
-  /**
-   * Register a listener to the message event.
-   *
-   * @param event `'message'`.
-   * @param handler The message handler.
-   */
-  on(event: 'message', handler: MessageHandler<this>): void
-  /**
-   * Register a listener to the error event.
-   *
-   * @param event `'error'`.
-   * @param handler The error handler.
-   */
-  on(event: 'error', handler: ErrorHandler<this>): void
-  /**
-   * Register a listener to the online event.
-   *
-   * @param event `'online'`.
-   * @param handler The online handler.
-   */
-  on(event: 'online', handler: OnlineHandler<this>): void
-  /**
-   * Register a listener to the exit event.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  on(event: 'exit', handler: ExitHandler<this>): void
-  /**
-   * Register a listener to the exit event that will only performed once.
-   *
-   * @param event `'exit'`.
-   * @param handler The exit handler.
-   */
-  once(event: 'exit', handler: ExitHandler<this>): void
-}
-
-/**
- * Options for a poolifier pool.
- */
-export interface PoolOptions<Worker> {
-  /**
-   * A function that will listen for message event on each worker.
-   */
-  messageHandler?: MessageHandler<Worker>
-  /**
-   * A function that will listen for error event on each worker.
-   */
-  errorHandler?: ErrorHandler<Worker>
-  /**
-   * A function that will listen for online event on each worker.
-   */
-  onlineHandler?: OnlineHandler<Worker>
-  /**
-   * A function that will listen for exit event on each worker.
-   */
-  exitHandler?: ExitHandler<Worker>
-  /**
-   * The work choice strategy to use in this pool.
-   */
-  workerChoiceStrategy?: WorkerChoiceStrategy
-  /**
-   * Pool events emission.
-   *
-   * @default true
-   */
-  enableEvents?: boolean
-}
+  WorkerChoiceStrategy
+} from './selection-strategies/selection-strategies-types'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
 /**
  * Base class containing some shared logic for all poolifier pools.
@@ -113,7 +22,7 @@ export interface PoolOptions<Worker> {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractPool<
-  Worker extends IWorker,
+  Worker extends AbstractPoolWorker,
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
@@ -192,7 +101,7 @@ export abstract class AbstractPool<
         this.registerWorkerMessageListener(workerCreated, message => {
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
-            this.tasks.get(workerCreated) === 0
+            this.getWorkerRunningTasks(workerCreated) === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
             this.destroyWorker(workerCreated) as void
@@ -242,6 +151,16 @@ export abstract class AbstractPool<
     return this.promiseMap.size
   }
 
+  /** @inheritdoc */
+  public getWorkerRunningTasks (worker: Worker): number | undefined {
+    return this.tasks.get(worker)
+  }
+
+  /** @inheritdoc */
+  public getWorkerIndex (worker: Worker): number {
+    return this.workers.indexOf(worker)
+  }
+
   /** @inheritdoc */
   public setWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy
@@ -258,16 +177,16 @@ export abstract class AbstractPool<
   protected internalGetBusyStatus (): boolean {
     return (
       this.numberOfRunningTasks >= this.numberOfWorkers &&
-      this.findFreeTasksMapEntry() === false
+      this.findFreeWorker() === false
     )
   }
 
   /** @inheritdoc */
-  public findFreeTasksMapEntry (): [Worker, number] | false {
-    for (const [worker, numberOfTasks] of this.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, return the matching tasks map entry
-        return [worker, numberOfTasks]
+  public findFreeWorker (): Worker | false {
+    for (const worker of this.workers) {
+      if (this.getWorkerRunningTasks(worker) === 0) {
+        // A worker is free, return the matching worker
+        return worker
       }
     }
     return false
@@ -350,8 +269,7 @@ export abstract class AbstractPool<
    */
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
-    const workerIndex = this.workers.indexOf(worker)
-    this.workers.splice(workerIndex, 1)
+    this.workers.splice(this.getWorkerIndex(worker), 1)
     this.tasks.delete(worker)
   }
 
index 9d162d8983a7bdce2a68b19e040392be40e83cd4..971116bc27cfbd3cd470faa83a904a3949984920 100644 (file)
@@ -1,8 +1,8 @@
 import type { Worker } from 'cluster'
 import cluster from 'cluster'
 import type { MessageValue } from '../../utility-types'
-import type { PoolOptions } from '../abstract-pool'
 import { AbstractPool } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
 import { PoolType } from '../pool-internal'
 
 /**
index dd71ae71045b555d99b64a24bba516461c77b561..71120110f75bfb48c6e005876b8b7715e53f53fb 100644 (file)
@@ -1,5 +1,5 @@
 import EventEmitter from 'events'
-import type { IWorker } from './abstract-pool'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
 import type { IPool } from './pool'
 
 /**
@@ -23,7 +23,7 @@ export class PoolEmitter extends EventEmitter {}
  * @template Response Type of response of execution.
  */
 export interface IPoolInternal<
-  Worker extends IWorker,
+  Worker extends AbstractPoolWorker,
   Data = unknown,
   Response = unknown
 > extends IPool<Data, Response> {
@@ -74,13 +74,29 @@ export interface IPoolInternal<
   readonly numberOfRunningTasks: number
 
   /**
-   * Find a tasks map entry with a free worker based on the number of tasks the worker has applied.
+   * Find a free worker based on the number of tasks the worker has applied.
    *
-   * If an entry is found with a worker that has `0` tasks, it is detected as free.
+   * If a worker is found with `0` running tasks, it is detected as free and returned.
    *
-   * If no tasks map entry with a free worker was found, `false` will be returned.
+   * If no free worker is found, `false` is returned.
    *
-   * @returns A tasks map entry with a free worker if there was one, otherwise `false`.
+   * @returns A free worker if there is one, otherwise `false`.
    */
-  findFreeTasksMapEntry(): [Worker, number] | false
+  findFreeWorker(): Worker | false
+
+  /**
+   * Get worker index.
+   *
+   * @param worker The worker.
+   * @returns The worker index.
+   */
+  getWorkerIndex(worker: Worker): number
+
+  /**
+   * Get worker running tasks.
+   *
+   * @param worker The worker.
+   * @returns The number of tasks currently running on the worker.
+   */
+  getWorkerRunningTasks(worker: Worker): number | undefined
 }
diff --git a/src/pools/pool-worker.ts b/src/pools/pool-worker.ts
new file mode 100644 (file)
index 0000000..d46eda1
--- /dev/null
@@ -0,0 +1,73 @@
+import type { Worker as ClusterWorker } from 'cluster'
+import type { Worker as WorkerThread } from 'worker_threads'
+import type { Draft } from '../utility-types'
+
+/**
+ * Poolifier supported worker type.
+ */
+export type WorkerType = WorkerThread & ClusterWorker & Draft<MessageChannel>
+
+/**
+ * Callback invoked if the worker has received a message.
+ */
+export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
+
+/**
+ * Callback invoked if the worker raised an error.
+ */
+export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
+
+/**
+ * Callback invoked when the worker has started successfully.
+ */
+export type OnlineHandler<Worker> = (this: Worker) => void
+
+/**
+ * Callback invoked when the worker exits successfully.
+ */
+export type ExitHandler<Worker> = (this: Worker, code: number) => void
+
+/**
+ * Basic interface that describes the minimum required implementation of listener events for a pool worker.
+ */
+export interface IPoolWorker {
+  /**
+   * Worker identifier.
+   */
+  readonly id?: number
+  /**
+   * Register a listener to the message event.
+   *
+   * @param event `'message'`.
+   * @param handler The message handler.
+   */
+  on(event: 'message', handler: MessageHandler<this>): void
+  /**
+   * Register a listener to the error event.
+   *
+   * @param event `'error'`.
+   * @param handler The error handler.
+   */
+  on(event: 'error', handler: ErrorHandler<this>): void
+  /**
+   * Register a listener to the online event.
+   *
+   * @param event `'online'`.
+   * @param handler The online handler.
+   */
+  on(event: 'online', handler: OnlineHandler<this>): void
+  /**
+   * Register a listener to the exit event.
+   *
+   * @param event `'exit'`.
+   * @param handler The exit handler.
+   */
+  on(event: 'exit', handler: ExitHandler<this>): void
+  /**
+   * Register a listener to the exit event that will only performed once.
+   *
+   * @param event `'exit'`.
+   * @param handler The exit handler.
+   */
+  once(event: 'exit', handler: ExitHandler<this>): void
+}
index 752045829214f76681f92d36fce414c581fd0c95..8da89156ecef7869dd1d8a4e5fe5096a168e8280 100644 (file)
@@ -1,4 +1,42 @@
-import type { WorkerChoiceStrategy } from './selection-strategies'
+import type {
+  ErrorHandler,
+  ExitHandler,
+  MessageHandler,
+  OnlineHandler
+} from './pool-worker'
+import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types'
+
+/**
+ * Options for a poolifier pool.
+ */
+export interface PoolOptions<Worker> {
+  /**
+   * A function that will listen for message event on each worker.
+   */
+  messageHandler?: MessageHandler<Worker>
+  /**
+   * A function that will listen for error event on each worker.
+   */
+  errorHandler?: ErrorHandler<Worker>
+  /**
+   * A function that will listen for online event on each worker.
+   */
+  onlineHandler?: OnlineHandler<Worker>
+  /**
+   * A function that will listen for exit event on each worker.
+   */
+  exitHandler?: ExitHandler<Worker>
+  /**
+   * The work choice strategy to use in this pool.
+   */
+  workerChoiceStrategy?: WorkerChoiceStrategy
+  /**
+   * Pool events emission.
+   *
+   * @default true
+   */
+  enableEvents?: boolean
+}
 
 /**
  * Contract definition for a poolifier pool.
diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts
deleted file mode 100644 (file)
index f1c20be..0000000
+++ /dev/null
@@ -1,260 +0,0 @@
-import type { IWorker } from './abstract-pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
-
-/**
- * Enumeration of worker choice strategies.
- */
-export const WorkerChoiceStrategies = Object.freeze({
-  /**
-   * Round robin worker selection strategy.
-   */
-  ROUND_ROBIN: 'ROUND_ROBIN',
-  /**
-   * Less recently used worker selection strategy.
-   */
-  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
-} as const)
-
-/**
- * Worker choice strategy.
- */
-export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
-
-/**
- * Worker choice strategy interface.
- *
- * @template Worker Type of worker which manages the strategy.
- */
-interface IWorkerChoiceStrategy<Worker extends IWorker> {
-  /**
-   * Choose a worker in the pool.
-   */
-  choose(): Worker
-}
-
-/**
- * Selects the next worker in a round robin fashion.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
-  implements IWorkerChoiceStrategy<Worker> {
-  /**
-   * Index for the next worker.
-   */
-  private nextWorkerIndex: number = 0
-
-  /**
-   * Constructs a worker choice strategy that selects in a round robin fashion.
-   *
-   * @param pool The pool instance.
-   */
-  public constructor (
-    private readonly pool: IPoolInternal<Worker, Data, Response>
-  ) {}
-
-  /** @inheritdoc */
-  public choose (): Worker {
-    const chosenWorker = this.pool.workers[this.nextWorkerIndex]
-    this.nextWorkerIndex =
-      this.nextWorkerIndex === this.pool.workers.length - 1
-        ? 0
-        : this.nextWorkerIndex + 1
-    return chosenWorker
-  }
-}
-
-/**
- * Selects the less recently used worker.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class LessRecentlyUsedWorkerChoiceStrategy<
-  Worker extends IWorker,
-  Data,
-  Response
-> implements IWorkerChoiceStrategy<Worker> {
-  /**
-   * Constructs a worker choice strategy that selects based on less recently used.
-   *
-   * @param pool The pool instance.
-   */
-  public constructor (
-    private readonly pool: IPoolInternal<Worker, Data, Response>
-  ) {}
-
-  /** @inheritdoc */
-  public choose (): Worker {
-    const isPoolDynamic = this.pool.type === PoolType.DYNAMIC
-    let minNumberOfTasks = Infinity
-    // A worker is always found because it picks the one with fewer tasks
-    let lessRecentlyUsedWorker!: Worker
-    for (const [worker, numberOfTasks] of this.pool.tasks) {
-      if (!isPoolDynamic && numberOfTasks === 0) {
-        return worker
-      } else if (numberOfTasks < minNumberOfTasks) {
-        lessRecentlyUsedWorker = worker
-        minNumberOfTasks = numberOfTasks
-      }
-    }
-    return lessRecentlyUsedWorker
-  }
-}
-
-/**
- * Dynamically choose a worker.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
-  implements IWorkerChoiceStrategy<Worker> {
-  private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
-
-  /**
-   * Constructs a worker choice strategy for dynamical pools.
-   *
-   * @param pool The pool instance.
-   * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
-   * @param workerChoiceStrategy The worker choice strategy when the pull is busy.
-   */
-  public constructor (
-    private readonly pool: IPoolInternal<Worker, Data, Response>,
-    private createDynamicallyWorkerCallback: () => Worker,
-    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-  ) {
-    this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
-      this.pool,
-      workerChoiceStrategy
-    )
-  }
-
-  /** @inheritdoc */
-  public choose (): Worker {
-    const freeTaskMapEntry = this.pool.findFreeTasksMapEntry()
-    if (freeTaskMapEntry) {
-      return freeTaskMapEntry[0]
-    }
-
-    if (this.pool.busy) {
-      return this.workerChoiceStrategy.choose()
-    }
-
-    // All workers are busy, create a new worker
-    return this.createDynamicallyWorkerCallback()
-  }
-}
-
-/**
- * The worker choice strategy context.
- *
- * @template Worker Type of worker.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-export class WorkerChoiceStrategyContext<
-  Worker extends IWorker,
-  Data,
-  Response
-> {
-  // Will be set by setter in constructor
-  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
-
-  /**
-   * Worker choice strategy context constructor.
-   *
-   * @param pool The pool instance.
-   * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
-   * @param workerChoiceStrategy The worker choice strategy.
-   */
-  public constructor (
-    private readonly pool: IPoolInternal<Worker, Data, Response>,
-    private createDynamicallyWorkerCallback: () => Worker,
-    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-  ) {
-    this.setWorkerChoiceStrategy(workerChoiceStrategy)
-  }
-
-  /**
-   * Get the worker choice strategy instance specific to the pool type.
-   *
-   * @param workerChoiceStrategy The worker choice strategy.
-   * @returns The worker choice strategy instance for the pool type.
-   */
-  private getPoolWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-  ): IWorkerChoiceStrategy<Worker> {
-    if (this.pool.type === PoolType.DYNAMIC) {
-      return new DynamicPoolWorkerChoiceStrategy(
-        this.pool,
-        this.createDynamicallyWorkerCallback,
-        workerChoiceStrategy
-      )
-    }
-    return SelectionStrategiesUtils.getWorkerChoiceStrategy(
-      this.pool,
-      workerChoiceStrategy
-    )
-  }
-
-  /**
-   * Set the worker choice strategy to use in the context.
-   *
-   * @param workerChoiceStrategy The worker choice strategy to set.
-   */
-  public setWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy
-  ): void {
-    this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
-      workerChoiceStrategy
-    )
-  }
-
-  /**
-   * Choose a worker with the underlying selection strategy.
-   *
-   * @returns The chosen one.
-   */
-  public execute (): Worker {
-    return this.workerChoiceStrategy.choose()
-  }
-}
-
-/**
- * Worker selection strategies helpers class.
- */
-class SelectionStrategiesUtils {
-  /**
-   * Get the worker choice strategy instance.
-   *
-   * @param pool The pool instance.
-   * @param workerChoiceStrategy The worker choice strategy.
-   * @returns The worker choice strategy instance.
-   */
-  public static getWorkerChoiceStrategy<
-    Worker extends IWorker,
-    Data,
-    Response
-  > (
-    pool: IPoolInternal<Worker, Data, Response>,
-    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-  ): IWorkerChoiceStrategy<Worker> {
-    switch (workerChoiceStrategy) {
-      case WorkerChoiceStrategies.ROUND_ROBIN:
-        return new RoundRobinWorkerChoiceStrategy(pool)
-      case WorkerChoiceStrategies.LESS_RECENTLY_USED:
-        return new LessRecentlyUsedWorkerChoiceStrategy(pool)
-      default:
-        throw new Error(
-          // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
-          `Worker choice strategy '${workerChoiceStrategy}' not found`
-        )
-    }
-  }
-}
diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..9cbebc3
--- /dev/null
@@ -0,0 +1,32 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { PoolType } from '../pool-internal'
+import type { IWorkerChoiceStrategy } from './selection-strategies-types'
+
+/**
+ * Abstract worker choice strategy class.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export abstract class AbstractWorkerChoiceStrategy<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> implements IWorkerChoiceStrategy<Worker> {
+  /** @inheritdoc */
+  public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+
+  /**
+   * Constructs a worker choice strategy attached to the pool.
+   *
+   * @param pool The pool instance.
+   */
+  public constructor (
+    protected readonly pool: IPoolInternal<Worker, Data, Response>
+  ) {}
+
+  /** @inheritdoc */
+  public abstract choose (): Worker
+}
diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..f98ec5e
--- /dev/null
@@ -0,0 +1,58 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type {
+  IWorkerChoiceStrategy,
+  WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+import { SelectionStrategiesUtils } from './selection-strategies-utils'
+
+/**
+ * Dynamically choose a worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class DynamicPoolWorkerChoiceStrategy<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+
+  /**
+   * Constructs a worker choice strategy for dynamical pool.
+   *
+   * @param pool The pool instance.
+   * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
+   * @param workerChoiceStrategy The worker choice strategy when the pull is busy.
+   */
+  public constructor (
+    pool: IPoolInternal<Worker, Data, Response>,
+    private createDynamicallyWorkerCallback: () => Worker,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ) {
+    super(pool)
+    this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
+      this.pool,
+      workerChoiceStrategy
+    )
+  }
+
+  /** @inheritdoc */
+  public choose (): Worker {
+    const freeWorker = this.pool.findFreeWorker()
+    if (freeWorker) {
+      return freeWorker
+    }
+
+    if (this.pool.busy) {
+      return this.workerChoiceStrategy.choose()
+    }
+
+    // All workers are busy, create a new worker
+    return this.createDynamicallyWorkerCallback()
+  }
+}
diff --git a/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..a7892c4
--- /dev/null
@@ -0,0 +1,35 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Selects the less recently used worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class LessRecentlyUsedWorkerChoiceStrategy<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /** @inheritdoc */
+  public choose (): Worker {
+    let minNumberOfRunningTasks = Infinity
+    // A worker is always found because it picks the one with fewer tasks
+    let lessRecentlyUsedWorker!: Worker
+    for (const worker of this.pool.workers) {
+      const workerRunningTasks = this.pool.getWorkerRunningTasks(worker)
+      if (!this.isDynamicPool && workerRunningTasks === 0) {
+        return worker
+      } else if (
+        workerRunningTasks !== undefined &&
+        workerRunningTasks < minNumberOfRunningTasks
+      ) {
+        lessRecentlyUsedWorker = worker
+        minNumberOfRunningTasks = workerRunningTasks
+      }
+    }
+    return lessRecentlyUsedWorker
+  }
+}
diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..ea1ad56
--- /dev/null
@@ -0,0 +1,30 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Selects the next worker in a round robin fashion.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class RoundRobinWorkerChoiceStrategy<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /**
+   * Index for the next worker.
+   */
+  private nextWorkerIndex: number = 0
+
+  /** @inheritdoc */
+  public choose (): Worker {
+    const chosenWorker = this.pool.workers[this.nextWorkerIndex]
+    this.nextWorkerIndex =
+      this.nextWorkerIndex === this.pool.workers.length - 1
+        ? 0
+        : this.nextWorkerIndex + 1
+    return chosenWorker
+  }
+}
diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts
new file mode 100644 (file)
index 0000000..edefcfa
--- /dev/null
@@ -0,0 +1,36 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+
+/**
+ * Enumeration of worker choice strategies.
+ */
+export const WorkerChoiceStrategies = Object.freeze({
+  /**
+   * Round robin worker selection strategy.
+   */
+  ROUND_ROBIN: 'ROUND_ROBIN',
+  /**
+   * Less recently used worker selection strategy.
+   */
+  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
+} as const)
+
+/**
+ * Worker choice strategy.
+ */
+export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+
+/**
+ * Worker choice strategy interface.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ */
+export interface IWorkerChoiceStrategy<Worker extends AbstractPoolWorker> {
+  /**
+   * Is the pool attached to the strategy dynamic?.
+   */
+  isDynamicPool: boolean
+  /**
+   * Choose a worker in the pool.
+   */
+  choose(): Worker
+}
diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts
new file mode 100644 (file)
index 0000000..e76a699
--- /dev/null
@@ -0,0 +1,42 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy'
+import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
+import type {
+  IWorkerChoiceStrategy,
+  WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+
+/**
+ * Worker selection strategies helpers class.
+ */
+export class SelectionStrategiesUtils {
+  /**
+   * Get the worker choice strategy instance.
+   *
+   * @param pool The pool instance.
+   * @param workerChoiceStrategy The worker choice strategy.
+   * @returns The worker choice strategy instance.
+   */
+  public static getWorkerChoiceStrategy<
+    Worker extends AbstractPoolWorker,
+    Data,
+    Response
+  > (
+    pool: IPoolInternal<Worker, Data, Response>,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ): IWorkerChoiceStrategy<Worker> {
+    switch (workerChoiceStrategy) {
+      case WorkerChoiceStrategies.ROUND_ROBIN:
+        return new RoundRobinWorkerChoiceStrategy(pool)
+      case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+        return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+      default:
+        throw new Error(
+          // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+          `Worker choice strategy '${workerChoiceStrategy}' not found`
+        )
+    }
+  }
+}
diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts
new file mode 100644 (file)
index 0000000..3de7353
--- /dev/null
@@ -0,0 +1,85 @@
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { PoolType } from '../pool-internal'
+import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
+import type {
+  IWorkerChoiceStrategy,
+  WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+import { SelectionStrategiesUtils } from './selection-strategies-utils'
+
+/**
+ * The worker choice strategy context.
+ *
+ * @template Worker Type of worker.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class WorkerChoiceStrategyContext<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> {
+  // Will be set by setter in constructor
+  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+
+  /**
+   * Worker choice strategy context constructor.
+   *
+   * @param pool The pool instance.
+   * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
+   * @param workerChoiceStrategy The worker choice strategy.
+   */
+  public constructor (
+    private readonly pool: IPoolInternal<Worker, Data, Response>,
+    private createDynamicallyWorkerCallback: () => Worker,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ) {
+    this.setWorkerChoiceStrategy(workerChoiceStrategy)
+  }
+
+  /**
+   * Get the worker choice strategy instance specific to the pool type.
+   *
+   * @param workerChoiceStrategy The worker choice strategy.
+   * @returns The worker choice strategy instance for the pool type.
+   */
+  private getPoolWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ): IWorkerChoiceStrategy<Worker> {
+    if (this.pool.type === PoolType.DYNAMIC) {
+      return new DynamicPoolWorkerChoiceStrategy(
+        this.pool,
+        this.createDynamicallyWorkerCallback,
+        workerChoiceStrategy
+      )
+    }
+    return SelectionStrategiesUtils.getWorkerChoiceStrategy(
+      this.pool,
+      workerChoiceStrategy
+    )
+  }
+
+  /**
+   * Set the worker choice strategy to use in the context.
+   *
+   * @param workerChoiceStrategy The worker choice strategy to set.
+   */
+  public setWorkerChoiceStrategy (
+    workerChoiceStrategy: WorkerChoiceStrategy
+  ): void {
+    this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
+      workerChoiceStrategy
+    )
+  }
+
+  /**
+   * Choose a worker with the underlying selection strategy.
+   *
+   * @returns The chosen one.
+   */
+  public execute (): Worker {
+    return this.workerChoiceStrategy.choose()
+  }
+}
index 361c615c78a8724421748036900021d550c9535e..a5b9eb7c37c49d945a23112ea9ee95832079ad58 100644 (file)
@@ -1,4 +1,4 @@
-import type { PoolOptions } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
 import { PoolType } from '../pool-internal'
 import type { ThreadWorkerWithMessageChannel } from './fixed'
 import { FixedThreadPool } from './fixed'
index e3f9602cd93cdfc3a3e03b3ab7860fe25cdb5135..da3c0138387495eb63c9f8c1c9c29429088f6cbb 100644 (file)
@@ -1,7 +1,7 @@
 import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
 import type { Draft, MessageValue } from '../../utility-types'
-import type { PoolOptions } from '../abstract-pool'
 import { AbstractPool } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
 import { PoolType } from '../pool-internal'
 
 /**
index 4e2f3de4f726c9cfbcb6f890e2eee9a514feb859..c9c5492cb7411cb78e0363514badd243c5e49add 100644 (file)
@@ -1,6 +1,6 @@
 import type { Worker as ClusterWorker } from 'cluster'
 import type { MessagePort } from 'worker_threads'
-import type { IWorker } from './pools/abstract-pool'
+import type { AbstractPoolWorker } from './pools/abstract-pool-worker'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
@@ -46,7 +46,7 @@ export interface MessageValue<
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export interface PromiseWorkerResponseWrapper<
-  Worker extends IWorker,
+  Worker extends AbstractPoolWorker,
   Response = unknown
 > {
   /**
index 9071b5712e4f3ec71f705da9e4cc9f2eadbae996..c554ab14f1304f0805ef865798d299e1fc7426bf 100644 (file)
@@ -42,6 +42,48 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
+    const max = 3
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+    )
+    expect(pool.opts.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.ROUND_ROBIN
+    )
+    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
+    const min = 0
+    const max = 3
+    const pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+    )
+    expect(pool.opts.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.ROUND_ROBIN
+    )
+    // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify LESS_RECENTLY_USED strategy is taken at pool creation', async () => {
     const max = 3
     const pool = new FixedThreadPool(
index 92fbd9b8f9e128382c5781f45719db104bebb19f..94320429ce6d2cfa9deb956bcfac39f996ab26b9 100644 (file)
@@ -26,7 +26,8 @@ describe('Dynamic thread pool test suite', () => {
     for (let i = 0; i < max * 2; i++) {
       promises.push(pool.execute({ test: 'test' }))
     }
-    expect(pool.workers.length).toBe(max)
+    expect(pool.workers.length).toBeLessThanOrEqual(max)
+    expect(pool.workers.length).toBeGreaterThan(min)
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
index 8565c7504bfe0a642679eafcbb1d62cfc3dc892a..a34cd78886584cdad6866bbb7ac470dba7eb1622 100644 (file)
@@ -35,7 +35,7 @@ class TestUtils {
   /**
    * Intentionally inefficient implementation.
    *
-   * @param {*} n
+   * @param {number} n
    * @returns {number}
    */
   static fibonacci (n) {
@@ -46,7 +46,7 @@ class TestUtils {
   /**
    * Intentionally inefficient implementation.
    *
-   * @param {*} n
+   * @param {number} n
    * @returns {number}
    */
   static factorial (n) {
index 2ac8fa7d1898feaf841d59d98872f761e9c19a5e..e3c2919eb1dee7fc9f5e1f1632f72c73481c8a50 100644 (file)
@@ -5,7 +5,8 @@
     "outDir": "lib",
     "esModuleInterop": true,
     "declaration": true,
-    "strict": true
+    "strict": true,
+    "importsNotUsedAsValues": "error"
   },
   "include": ["src/**/*.ts"],
   "exclude": ["node_modules"]