refactor: move worker setup into worker node constructor
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 15 Dec 2023 16:59:52 +0000 (17:59 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 15 Dec 2023 16:59:52 +0000 (17:59 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
17 files changed:
CHANGELOG.md
docs/api.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/pools/utils.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/thread/dynamic.test.mjs
tests/pools/utils.test.mjs
tests/pools/worker-node.test.mjs

index a308d289b3a2cb03b3c2614c214e9f0ab74ec133..ff6fbaca60e6f466b545a7017143379c5b6449ac 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- TypeScript breaking change: merge ThreadPoolOptions and ClusterPoolOptions types into PoolOptions type.
+
 ## [3.0.14] - 2023-12-13
 
 ### Fixed
index 28282ac2314bc05322098d09ad5ad0d08a163595..0a13927586a9dd17c97d9925fce2fe77e4d9ef45 100644 (file)
@@ -14,8 +14,6 @@
   - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
   - [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname)
   - [`PoolOptions`](#pooloptions)
-    - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
-    - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
 - [Worker](#worker)
   - [`class YourWorker extends ThreadWorker/ClusterWorker`](#class-yourworker-extends-threadworkerclusterworker)
     - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
@@ -141,12 +139,8 @@ An object with these properties:
 
   Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
 
-#### `ThreadPoolOptions extends PoolOptions`
-
 - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
 
-#### `ClusterPoolOptions extends PoolOptions`
-
 - `env` (optional) - An object with the environment variables to pass to worker. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_fork_env) for more details.
 
 - `settings` (optional) - An object with the cluster settings. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_settings) for more details.
index 802742891b198d6eda4523a2a8989e0f1c2cb1a6..a5ffc3a3528b7c737786958b392b199860626413 100644 (file)
@@ -1,9 +1,6 @@
 export type { AbstractPool } from './pools/abstract-pool'
 export { DynamicClusterPool } from './pools/cluster/dynamic'
-export {
-  FixedClusterPool,
-  type ClusterPoolOptions
-} from './pools/cluster/fixed'
+export { FixedClusterPool } from './pools/cluster/fixed'
 export { PoolEvents, PoolTypes } from './pools/pool'
 export type {
   IPool,
@@ -26,6 +23,7 @@ export type {
   StrategyData,
   TaskStatistics,
   WorkerInfo,
+  WorkerNodeOptions,
   WorkerType,
   WorkerUsage
 } from './pools/worker'
@@ -45,7 +43,7 @@ export type {
 } from './pools/selection-strategies/selection-strategies-types'
 export type { WorkerChoiceStrategyContext } from './pools/selection-strategies/worker-choice-strategy-context'
 export { DynamicThreadPool } from './pools/thread/dynamic'
-export { FixedThreadPool, type ThreadPoolOptions } from './pools/thread/fixed'
+export { FixedThreadPool } from './pools/thread/fixed'
 export type { AbstractWorker } from './worker/abstract-worker'
 export { ClusterWorker } from './worker/cluster-worker'
 export { ThreadWorker } from './worker/thread-worker'
index 9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96..56be9a1a9ffe0ca22131b40d9308adab87b9894a 100644 (file)
@@ -1258,26 +1258,27 @@ export abstract class AbstractPool<
     transferList?: TransferListItem[]
   ): void
 
-  /**
-   * Creates a new worker.
-   *
-   * @returns Newly created worker.
-   */
-  protected abstract createWorker (): Worker
-
   /**
    * Creates a new, completely set up worker node.
    *
    * @returns New, completely set up worker node key.
    */
   protected createAndSetupWorkerNode (): number {
-    const worker = this.createWorker()
-
-    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
-    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
-    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
-    worker.on('error', error => {
-      const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+    const workerNode = this.createWorkerNode()
+    workerNode.registerWorkerEventHandler(
+      'online',
+      this.opts.onlineHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler(
+      'message',
+      this.opts.messageHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler(
+      'error',
+      this.opts.errorHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler('error', (error: Error) => {
+      const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
       this.flagWorkerNodeAsNotReady(workerNodeKey)
       const workerInfo = this.getWorkerInfo(workerNodeKey)
       this.emitter?.emit(PoolEvents.error, error)
@@ -1298,15 +1299,15 @@ export abstract class AbstractPool<
         this.redistributeQueuedTasks(workerNodeKey)
       }
     })
-    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
-    worker.once('exit', () => {
-      this.removeWorkerNode(worker)
+    workerNode.registerWorkerEventHandler(
+      'exit',
+      this.opts.exitHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerOnceWorkerEventHandler('exit', () => {
+      this.removeWorkerNode(workerNode.worker)
     })
-
-    const workerNodeKey = this.addWorkerNode(worker)
-
+    const workerNodeKey = this.addWorkerNode(workerNode)
     this.afterWorkerNodeSetup(workerNodeKey)
-
     return workerNodeKey
   }
 
@@ -1806,23 +1807,38 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Adds the given worker in the pool worker nodes.
+   * Creates a worker node.
    *
-   * @param worker - The worker.
-   * @returns The added worker node key.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+   * @returns The created worker node.
    */
-  private addWorkerNode (worker: Worker): number {
+  private createWorkerNode (): IWorkerNode<Worker, Data> {
     const workerNode = new WorkerNode<Worker, Data>(
-      worker,
-      this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+      this.worker,
+      this.filePath,
+      {
+        env: this.opts.env,
+        workerOptions: this.opts.workerOptions,
+        tasksQueueBackPressureSize:
+          this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+      }
     )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
     }
+    return workerNode
+  }
+
+  /**
+   * Adds the given worker node in the pool worker nodes.
+   *
+   * @param workerNode - The worker node.
+   * @returns The added worker node key.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+   */
+  private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
     this.workerNodes.push(workerNode)
-    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey === -1) {
       throw new Error('Worker added not found in worker nodes')
     }
@@ -1830,7 +1846,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Removes the given worker from the pool worker nodes.
+   * Removes the worker node associated to the give given worker from the pool worker nodes.
    *
    * @param worker - The worker.
    */
index 99a89966e914507fa6868883a0f67c751c8b29c6..32aad22a738c8541a4147ebbad7a3048ff51f8af 100644 (file)
@@ -1,6 +1,7 @@
-import { type PoolType, PoolTypes } from '../pool'
+import { type Worker } from 'node:cluster'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 import { checkDynamicPoolSize } from '../utils'
-import { type ClusterPoolOptions, FixedClusterPool } from './fixed'
+import { FixedClusterPool } from './fixed'
 
 /**
  * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
@@ -29,7 +30,7 @@ export class DynamicClusterPool<
     min: number,
     protected readonly max: number,
     filePath: string,
-    opts: ClusterPoolOptions = {}
+    opts: PoolOptions<Worker> = {}
   ) {
     super(min, filePath, opts)
     checkDynamicPoolSize(this.numberOfWorkers, this.max)
index b74f034bc199d9d4e89f77f0856b00855cf1685a..f3fb54bbf4a08bd0ee6d28f623f3ac04f55630f1 100644 (file)
@@ -1,27 +1,9 @@
-import cluster, { type ClusterSettings, type Worker } from 'node:cluster'
+import cluster, { type Worker } from 'node:cluster'
 import type { MessageValue } from '../../utility-types'
 import { AbstractPool } from '../abstract-pool'
 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 import { type WorkerType, WorkerTypes } from '../worker'
 
-/**
- * Options for a poolifier cluster pool.
- */
-export interface ClusterPoolOptions extends PoolOptions<Worker> {
-  /**
-   * Key/value pairs to add to worker process environment.
-   *
-   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
-   */
-  env?: Record<string, unknown>
-  /**
-   * Cluster settings.
-   *
-   * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
-   */
-  settings?: ClusterSettings
-}
-
 /**
  * A cluster pool with a fixed number of workers.
  *
@@ -44,7 +26,7 @@ export class FixedClusterPool<
   public constructor (
     numberOfWorkers: number,
     filePath: string,
-    protected readonly opts: ClusterPoolOptions = {}
+    protected readonly opts: PoolOptions<Worker> = {}
   ) {
     super(numberOfWorkers, filePath, opts)
   }
@@ -65,18 +47,17 @@ export class FixedClusterPool<
     this.flushTasksQueue(workerNodeKey)
     // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
-    const worker = workerNode.worker
     const waitWorkerExit = new Promise<void>(resolve => {
-      worker.once('exit', () => {
+      workerNode.registerOnceWorkerEventHandler('exit', () => {
         resolve()
       })
     })
-    worker.once('disconnect', () => {
-      worker.kill()
+    workerNode.registerOnceWorkerEventHandler('disconnect', () => {
+      workerNode.worker.kill()
     })
     await this.sendKillMessageToWorker(workerNodeKey)
     workerNode.removeAllListeners()
-    worker.disconnect()
+    workerNode.worker.disconnect()
     await waitWorkerExit
   }
 
@@ -122,11 +103,6 @@ export class FixedClusterPool<
     this.workerNodes[workerNodeKey].worker.off('message', listener)
   }
 
-  /** @inheritDoc */
-  protected createWorker (): Worker {
-    return cluster.fork(this.opts.env)
-  }
-
   /** @inheritDoc */
   protected get type (): PoolType {
     return PoolTypes.fixed
index b1b504d59b22bba2aca61becc898fa87167c5c6c..856473ff354ee340ec9e990ec2f0de7d7d41e09b 100644 (file)
@@ -1,5 +1,6 @@
 import type { TransferListItem } from 'node:worker_threads'
 import type { EventEmitterAsyncResource } from 'node:events'
+import type { ClusterSettings } from 'node:cluster'
 import type { TaskFunction } from '../worker/task-functions'
 import type {
   ErrorHandler,
@@ -189,6 +190,24 @@ export interface PoolOptions<Worker extends IWorker> {
    * Pool worker node tasks queue options.
    */
   tasksQueueOptions?: TasksQueueOptions
+  /**
+   * Worker options.
+   *
+   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
+   */
+  workerOptions?: WorkerOptions
+  /**
+   * Key/value pairs to add to worker process environment.
+   *
+   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
+   */
+  env?: Record<string, unknown>
+  /**
+   * Cluster settings.
+   *
+   * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
+   */
+  settings?: ClusterSettings
 }
 
 /**
index 119e556dd2ccb454fec3ed9c89c20df5d76199c2..6def273e24bcc413a880ad8fa793fad74a9e36d5 100644 (file)
@@ -1,6 +1,7 @@
-import { type PoolType, PoolTypes } from '../pool'
+import { type Worker } from 'node:worker_threads'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 import { checkDynamicPoolSize } from '../utils'
-import { FixedThreadPool, type ThreadPoolOptions } from './fixed'
+import { FixedThreadPool } from './fixed'
 
 /**
  * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
@@ -29,7 +30,7 @@ export class DynamicThreadPool<
     min: number,
     protected readonly max: number,
     filePath: string,
-    opts: ThreadPoolOptions = {}
+    opts: PoolOptions<Worker> = {}
   ) {
     super(min, filePath, opts)
     checkDynamicPoolSize(this.numberOfWorkers, this.max)
index 28f8fceba679e5f9053820b8436583caf69f20af..e5074e392b45a94c1e9b80376cc08d3624a5c155 100644 (file)
@@ -1,10 +1,8 @@
 import {
   type MessageChannel,
   type MessagePort,
-  SHARE_ENV,
   type TransferListItem,
-  Worker,
-  type WorkerOptions,
+  type Worker,
   isMainThread
 } from 'node:worker_threads'
 import type { MessageValue } from '../../utility-types'
@@ -12,18 +10,6 @@ import { AbstractPool } from '../abstract-pool'
 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 import { type WorkerType, WorkerTypes } from '../worker'
 
-/**
- * Options for a poolifier thread pool.
- */
-export interface ThreadPoolOptions extends PoolOptions<Worker> {
-  /**
-   * Worker options.
-   *
-   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
-   */
-  workerOptions?: WorkerOptions
-}
-
 /**
  * A thread pool with a fixed number of threads.
  *
@@ -46,7 +32,7 @@ export class FixedThreadPool<
   public constructor (
     numberOfThreads: number,
     filePath: string,
-    protected readonly opts: ThreadPoolOptions = {}
+    protected readonly opts: PoolOptions<Worker> = {}
   ) {
     super(numberOfThreads, filePath, opts)
   }
@@ -62,16 +48,15 @@ export class FixedThreadPool<
     this.flushTasksQueue(workerNodeKey)
     // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
-    const worker = workerNode.worker
     const waitWorkerExit = new Promise<void>(resolve => {
-      worker.once('exit', () => {
+      workerNode.registerOnceWorkerEventHandler('exit', () => {
         resolve()
       })
     })
     await this.sendKillMessageToWorker(workerNodeKey)
     workerNode.closeChannel()
     workerNode.removeAllListeners()
-    await worker.terminate()
+    await workerNode.worker.terminate()
     await waitWorkerExit
   }
 
@@ -135,14 +120,6 @@ export class FixedThreadPool<
     )
   }
 
-  /** @inheritDoc */
-  protected createWorker (): Worker {
-    return new Worker(this.filePath, {
-      env: SHARE_ENV,
-      ...this.opts.workerOptions
-    })
-  }
-
   /** @inheritDoc */
   protected get type (): PoolType {
     return PoolTypes.fixed
index eb800c4ab50f9d39e2b6fc98ebb4cf39bfc134b5..11c30ad1da9b5cf875be570116fdf240cb0af89b 100644 (file)
@@ -1,4 +1,6 @@
 import { existsSync } from 'node:fs'
+import cluster from 'node:cluster'
+import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
 import { average, isPlainObject, max, median, min } from '../utils'
 import {
   type MeasurementStatisticsRequirements,
@@ -6,9 +8,21 @@ import {
   type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types'
 import type { TasksQueueOptions } from './pool'
-import type { IWorker, MeasurementStatistics } from './worker'
+import {
+  type IWorker,
+  type MeasurementStatistics,
+  type WorkerNodeOptions,
+  type WorkerType,
+  WorkerTypes
+} from './worker'
 
 export const checkFilePath = (filePath: string): void => {
+  if (filePath == null) {
+    throw new TypeError('The worker file path must be specified')
+  }
+  if (typeof filePath !== 'string') {
+    throw new TypeError('The worker file path must be a string')
+  }
   if (!existsSync(filePath)) {
     throw new Error(`Cannot find the worker file '${filePath}'`)
   }
@@ -86,26 +100,43 @@ export const checkValidTasksQueueOptions = (
   }
 }
 
-export const checkWorkerNodeArguments = <Worker extends IWorker>(
-  worker: Worker,
-  tasksQueueBackPressureSize: number
+export const checkWorkerNodeArguments = (
+  type: WorkerType,
+  filePath: string,
+  opts: WorkerNodeOptions
 ): void => {
-  if (worker == null) {
-    throw new TypeError('Cannot construct a worker node without a worker')
+  if (type == null) {
+    throw new TypeError('Cannot construct a worker node without a worker type')
+  }
+  if (!Object.values(WorkerTypes).includes(type)) {
+    throw new TypeError(
+      `Cannot construct a worker node with an invalid worker type '${type}'`
+    )
   }
-  if (tasksQueueBackPressureSize == null) {
+  checkFilePath(filePath)
+  if (opts == null) {
     throw new TypeError(
-      'Cannot construct a worker node without a tasks queue back pressure size'
+      'Cannot construct a worker node without worker node options'
     )
   }
-  if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
+  if (opts != null && !isPlainObject(opts)) {
     throw new TypeError(
-      'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+      'Cannot construct a worker node with invalid options: must be a plain object'
     )
   }
-  if (tasksQueueBackPressureSize <= 0) {
+  if (opts.tasksQueueBackPressureSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue back pressure size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBackPressureSize <= 0) {
     throw new RangeError(
-      'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
 }
@@ -153,3 +184,22 @@ export const updateMeasurementStatistics = (
     }
   }
 }
+
+export const createWorker = <Worker extends IWorker>(
+  type: WorkerType,
+  filePath: string,
+  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+  switch (type) {
+    case WorkerTypes.thread:
+      return new Worker(filePath, {
+        env: SHARE_ENV,
+        ...opts?.workerOptions
+      }) as unknown as Worker
+    case WorkerTypes.cluster:
+      return cluster.fork(opts?.env) as unknown as Worker
+    default:
+      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+      throw new Error(`Unknown worker type '${type}'`)
+  }
+}
index 59c4de7a388e81434c373f1835b144c1b3c88f57..9a98458e59547378cef70ad00dcbb36fe6be695e 100644 (file)
@@ -5,15 +5,20 @@ import type { Task } from '../utility-types'
 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
+  type ErrorHandler,
+  type ExitHandler,
   type IWorker,
   type IWorkerNode,
+  type MessageHandler,
+  type OnlineHandler,
   type StrategyData,
   type WorkerInfo,
+  type WorkerNodeOptions,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
 } from './worker'
-import { checkWorkerNodeArguments } from './utils'
+import { checkWorkerNodeArguments, createWorker } from './utils'
 
 /**
  * Worker node.
@@ -43,19 +48,23 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /**
    * Constructs a new worker node.
    *
-   * @param worker - The worker.
-   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
+   * @param type - The worker type.
+   * @param filePath - The worker file path.
+   * @param opts - The worker node options.
    */
-  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
     super()
-    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
-    this.worker = worker
-    this.info = this.initWorkerInfo(worker)
+    checkWorkerNodeArguments(type, filePath, opts)
+    this.worker = createWorker<Worker>(type, filePath, {
+      env: opts.env,
+      workerOptions: opts.workerOptions
+    })
+    this.info = this.initWorkerInfo(this.worker)
     this.usage = this.initWorkerUsage()
     if (this.info.type === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
-    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
@@ -125,6 +134,30 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
   }
 
+  /** @inheritdoc */
+  public registerWorkerEventHandler (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.on(event, listener)
+  }
+
+  /** @inheritdoc */
+  public registerOnceWorkerEventHandler (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.once(event, listener)
+  }
+
   /** @inheritdoc */
   public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
     if (!Array.isArray(this.info.taskFunctionNames)) {
index 5439606d420f0abd165b84c49157808de230aee6..1a94ef60950dffe84e9430374dcd13395a956bab 100644 (file)
@@ -208,27 +208,39 @@ export interface IWorker {
    * @param event - The event.
    * @param handler - The event handler.
    */
-  readonly on: ((event: 'online', handler: OnlineHandler<this>) => void) &
-  ((event: 'message', handler: MessageHandler<this>) => void) &
-  ((event: 'error', handler: ErrorHandler<this>) => void) &
-  ((event: 'exit', handler: ExitHandler<this>) => void)
-  /**
-   * Registers a listener to the exit event that will only be performed once.
+  readonly on: (
+    event: string,
+    handler:
+    | OnlineHandler<this>
+    | MessageHandler<this>
+    | ErrorHandler<this>
+    | ExitHandler<this>
+  ) => void
+  /**
+   * Registers once an event listener.
    *
-   * @param event - The `'exit'` event.
-   * @param handler - The exit handler.
+   * @param event - The event.
+   * @param handler - The event handler.
    */
-  readonly once: (event: 'exit', handler: ExitHandler<this>) => void
+  readonly once: (
+    event: string,
+    handler:
+    | OnlineHandler<this>
+    | MessageHandler<this>
+    | ErrorHandler<this>
+    | ExitHandler<this>
+  ) => void
 }
 
 /**
- * Worker node event detail.
+ * Worker node options.
  *
  * @internal
  */
-export interface WorkerNodeEventDetail {
-  workerId: number
-  workerNodeKey?: number
+export interface WorkerNodeOptions {
+  workerOptions?: WorkerOptions
+  env?: Record<string, unknown>
+  tasksQueueBackPressureSize: number
 }
 
 /**
@@ -316,6 +328,34 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * Closes communication channel.
    */
   readonly closeChannel: () => void
+  /**
+   * Registers a worker event handler.
+   *
+   * @param event - The event.
+   * @param listener - The event listener.
+   */
+  readonly registerWorkerEventHandler: (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ) => void
+  /**
+   * Registers once a worker event handler.
+   *
+   * @param event - The event.
+   * @param listener - The event listener.
+   */
+  readonly registerOnceWorkerEventHandler: (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ) => void
   /**
    * Gets task function worker usage statistics.
    *
@@ -331,3 +371,13 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    */
   readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
 }
+
+/**
+ * Worker node event detail.
+ *
+ * @internal
+ */
+export interface WorkerNodeEventDetail {
+  workerId: number
+  workerNodeKey?: number
+}
index edf03f4ae327b4f7c79fe9a1a9c9917c91de6e19..5dc289359c704c51fb3feef8885eb75ff449c298 100644 (file)
@@ -78,7 +78,10 @@ describe('Abstract pool test suite', () => {
 
   it('Verify that filePath is checked', () => {
     expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
-      new Error("Cannot find the worker file 'undefined'")
+      new TypeError('The worker file path must be specified')
+    )
+    expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+      new TypeError('The worker file path must be a string')
     )
     expect(
       () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
index 6e8580e9d7cf07449c26d2fa74476b486b3178fe..2a654b6a878cab03a4da44afb7200ca122acf0c1 100644 (file)
@@ -76,7 +76,7 @@ describe('Dynamic cluster pool test suite', () => {
 
   it('Validation of inputs test', () => {
     expect(() => new DynamicClusterPool(min)).toThrow(
-      "Cannot find the worker file 'undefined'"
+      'The worker file path must be specified'
     )
   })
 
index efcc417e9a2ff5b98db606638efff112bfdc54ed..f7329710bd3c90e2264bebed7ab7aa628c0bccae 100644 (file)
@@ -76,7 +76,7 @@ describe('Dynamic thread pool test suite', () => {
 
   it('Validation of inputs test', () => {
     expect(() => new DynamicThreadPool(min)).toThrow(
-      "Cannot find the worker file 'undefined'"
+      'The worker file path must be specified'
     )
   })
 
index bf0c72afadadc78819bd5d938a30f962496f1fee..5feb6596296914252a94231e5de19132968a832f 100644 (file)
@@ -1,9 +1,15 @@
+import { Worker as ThreadWorker } from 'node:worker_threads'
+import { Worker as ClusterWorker } from 'node:cluster'
 import { expect } from 'expect'
 import {
   CircularArray,
   DEFAULT_CIRCULAR_ARRAY_SIZE
 } from '../../lib/circular-array.js'
-import { updateMeasurementStatistics } from '../../lib/pools/utils.js'
+import {
+  createWorker,
+  updateMeasurementStatistics
+} from '../../lib/pools/utils.js'
+import { WorkerTypes } from '../../lib/index.js'
 
 describe('Pool utils test suite', () => {
   it('Verify updateMeasurementStatistics() behavior', () => {
@@ -92,4 +98,19 @@ describe('Pool utils test suite', () => {
       )
     })
   })
+
+  it('Verify createWorker() behavior', () => {
+    expect(
+      createWorker(
+        WorkerTypes.thread,
+        './tests/worker-files/thread/testWorker.mjs'
+      )
+    ).toBeInstanceOf(ThreadWorker)
+    expect(
+      createWorker(
+        WorkerTypes.cluster,
+        './tests/worker-files/cluster/testWorker.mjs'
+      )
+    ).toBeInstanceOf(ClusterWorker)
+  })
 })
index f70ce37bdba07f7e04370b7dd05544efe7f960da..cfca3422b0f3ff727138608a65f7542dff63ba35 100644 (file)
@@ -1,5 +1,4 @@
-import { MessageChannel, Worker } from 'node:worker_threads'
-import cluster from 'node:cluster'
+import { MessageChannel } from 'node:worker_threads'
 import { expect } from 'expect'
 import { WorkerNode } from '../../lib/pools/worker-node.js'
 import { WorkerTypes } from '../../lib/index.js'
@@ -8,46 +7,119 @@ import { Deque } from '../../lib/deque.js'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
 
 describe('Worker node test suite', () => {
-  const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
-  const clusterWorker = cluster.fork()
-  const threadWorkerNode = new WorkerNode(threadWorker, 12)
-  const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
+  const threadWorkerNode = new WorkerNode(
+    WorkerTypes.thread,
+    './tests/worker-files/thread/testWorker.mjs',
+    { tasksQueueBackPressureSize: 12 }
+  )
+  const clusterWorkerNode = new WorkerNode(
+    WorkerTypes.cluster,
+    './tests/worker-files/cluster/testWorker.js',
+    { tasksQueueBackPressureSize: 12 }
+  )
 
   it('Worker node instantiation', () => {
     expect(() => new WorkerNode()).toThrow(
-      new TypeError('Cannot construct a worker node without a worker')
+      new TypeError('Cannot construct a worker node without a worker type')
     )
-    expect(() => new WorkerNode(threadWorker)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          'invalidWorkerType',
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 12 }
+        )
+    ).toThrow(
+      new TypeError(
+        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs'
+        )
+    ).toThrow(
       new TypeError(
-        'Cannot construct a worker node without a tasks queue back pressure size'
+        'Cannot construct a worker node without worker node options'
       )
     )
     expect(
-      () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          ''
+        )
     ).toThrow(
       new TypeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+        'Cannot construct a worker node with invalid options: must be a plain object'
       )
     )
-    expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {}
+        )
+    ).toThrow(
       new TypeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+        'Cannot construct a worker node without a tasks queue back pressure size option'
       )
     )
-    expect(() => new WorkerNode(threadWorker, 0)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 0.2 }
+        )
+    ).toThrow(
+      new TypeError(
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: 0 }
+        )
+    ).toThrow(
       new RangeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
       )
     )
-    expect(() => new WorkerNode(threadWorker, -1)).toThrow(
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          { tasksQueueBackPressureSize: -1 }
+        )
+    ).toThrow(
       new RangeError(
-        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
       )
     )
     expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
-    expect(threadWorkerNode.worker).toBe(threadWorker)
     expect(threadWorkerNode.info).toStrictEqual({
-      id: threadWorker.threadId,
+      id: threadWorkerNode.worker.threadId,
       type: WorkerTypes.thread,
       dynamic: false,
       ready: false
@@ -88,9 +160,8 @@ describe('Worker node test suite', () => {
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
-    expect(clusterWorkerNode.worker).toBe(clusterWorker)
     expect(clusterWorkerNode.info).toStrictEqual({
-      id: clusterWorker.id,
+      id: clusterWorkerNode.worker.id,
       type: WorkerTypes.cluster,
       dynamic: false,
       ready: false