refactor: encapsulate worker node handling logic into its own class
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Jul 2023 16:08:11 +0000 (18:08 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Jul 2023 16:08:11 +0000 (18:08 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts [new file with mode: 0644]
src/pools/worker.ts

index eeda859ec863ab317e78d0303f9801c5a7f16f1d..54c352fc9f4f4b527201b4ef30198cd1c89e5c28 100644 (file)
@@ -4,7 +4,7 @@ export {
   FixedClusterPool,
   type ClusterPoolOptions
 } from './pools/cluster/fixed'
-export { PoolEvents, PoolTypes, WorkerTypes } from './pools/pool'
+export { PoolEvents, PoolTypes } from './pools/pool'
 export type {
   IPool,
   PoolEmitter,
@@ -12,21 +12,22 @@ export type {
   PoolInfo,
   PoolOptions,
   PoolType,
-  TasksQueueOptions,
-  WorkerType
+  TasksQueueOptions
 } from './pools/pool'
+export { WorkerTypes } from './pools/worker'
 export type {
   ErrorHandler,
   EventLoopUtilizationMeasurementStatistics,
   ExitHandler,
   IWorker,
+  IWorkerNode,
   MeasurementStatistics,
   MessageHandler,
   OnlineHandler,
   Task,
   TaskStatistics,
   WorkerInfo,
-  WorkerNode,
+  WorkerType,
   WorkerUsage
 } from './pools/worker'
 export {
index 4b1a05ff63f7a9b78ecc66c8541b272cad18a61b..097882b7dc971112b98050d95c52d43776f27362 100644 (file)
@@ -10,8 +10,6 @@ import {
   round
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
 import {
   type IPool,
   PoolEmitter,
@@ -20,16 +18,15 @@ import {
   type PoolOptions,
   type PoolType,
   PoolTypes,
-  type TasksQueueOptions,
-  type WorkerType,
-  WorkerTypes
+  type TasksQueueOptions
 } from './pool'
 import type {
   IWorker,
+  IWorkerNode,
   MessageHandler,
   Task,
   WorkerInfo,
-  WorkerNode,
+  WorkerType,
   WorkerUsage
 } from './worker'
 import {
@@ -40,6 +37,7 @@ import {
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 import { version } from './version'
+import { WorkerNode } from './worker-node'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -54,7 +52,7 @@ export abstract class AbstractPool<
   Response = unknown
 > implements IPool<Worker, Data, Response> {
   /** @inheritDoc */
-  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+  public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
 
   /** @inheritDoc */
   public readonly emitter?: PoolEmitter
@@ -463,10 +461,7 @@ export abstract class AbstractPool<
       this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     }
     for (const workerNode of this.workerNodes) {
-      this.setWorkerNodeTasksUsage(
-        workerNode,
-        this.getInitialWorkerUsage(workerNode.worker)
-      )
+      workerNode.resetUsage()
       this.setWorkerStatistics(workerNode.worker)
     }
   }
@@ -1036,19 +1031,6 @@ export abstract class AbstractPool<
     }
   }
 
-  /**
-   * Sets the given worker node its tasks usage in the pool.
-   *
-   * @param workerNode - The worker node.
-   * @param workerUsage - The worker usage.
-   */
-  private setWorkerNodeTasksUsage (
-    workerNode: WorkerNode<Worker, Data>,
-    workerUsage: WorkerUsage
-  ): void {
-    workerNode.usage = workerUsage
-  }
-
   /**
    * Gets the worker information.
    *
@@ -1065,57 +1047,9 @@ export abstract class AbstractPool<
    * @returns The worker nodes length.
    */
   private pushWorkerNode (worker: Worker): number {
-    this.workerNodes.push({
-      worker,
-      info: this.getInitialWorkerInfo(worker),
-      usage: this.getInitialWorkerUsage(),
-      tasksQueue: new Queue<Task<Data>>()
-    })
-    this.setWorkerNodeTasksUsage(
-      this.workerNodes[this.getWorkerNodeKey(worker)],
-      this.getInitialWorkerUsage(worker)
-    )
-    return this.workerNodes.length
+    return this.workerNodes.push(new WorkerNode(worker, this.worker))
   }
 
-  /**
-   * Gets the worker id.
-   *
-   * @param worker - The worker.
-   * @returns The worker id.
-   */
-  private getWorkerId (worker: Worker): number | undefined {
-    if (this.worker === WorkerTypes.thread) {
-      return worker.threadId
-    } else if (this.worker === WorkerTypes.cluster) {
-      return worker.id
-    }
-  }
-
-  // /**
-  //  * Sets the given worker in the pool worker nodes.
-  //  *
-  //  * @param workerNodeKey - The worker node key.
-  //  * @param worker - The worker.
-  //  * @param workerInfo - The worker info.
-  //  * @param workerUsage - The worker usage.
-  //  * @param tasksQueue - The worker task queue.
-  //  */
-  // private setWorkerNode (
-  //   workerNodeKey: number,
-  //   worker: Worker,
-  //   workerInfo: WorkerInfo,
-  //   workerUsage: WorkerUsage,
-  //   tasksQueue: Queue<Task<Data>>
-  // ): void {
-  //   this.workerNodes[workerNodeKey] = {
-  //     worker,
-  //     info: workerInfo,
-  //     usage: workerUsage,
-  //     tasksQueue
-  //   }
-  // }
-
   /**
    * Removes the given worker from the pool worker nodes.
    *
@@ -1135,19 +1069,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+    return this.workerNodes[workerNodeKey].enqueueTask(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+    return this.workerNodes[workerNodeKey].dequeueTask()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.size
-  }
-
-  private tasksMaxQueueSize (workerNodeKey: number): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+    return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
@@ -1157,7 +1087,7 @@ export abstract class AbstractPool<
         this.dequeueTask(workerNodeKey) as Task<Data>
       )
     }
-    this.workerNodes[workerNodeKey].tasksQueue.clear()
+    this.workerNodes[workerNodeKey].clearTasksQueue()
   }
 
   private flushTasksQueues (): void {
@@ -1177,50 +1107,4 @@ export abstract class AbstractPool<
       }
     })
   }
-
-  private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
-    const getTasksQueueSize = (worker?: Worker): number => {
-      if (worker == null) {
-        return 0
-      }
-      return this.tasksQueueSize(this.getWorkerNodeKey(worker))
-    }
-    const getTasksMaxQueueSize = (worker?: Worker): number => {
-      if (worker == null) {
-        return 0
-      }
-      return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
-    }
-    return {
-      tasks: {
-        executed: 0,
-        executing: 0,
-        get queued (): number {
-          return getTasksQueueSize(worker)
-        },
-        get maxQueued (): number {
-          return getTasksMaxQueueSize(worker)
-        },
-        failed: 0
-      },
-      runTime: {
-        history: new CircularArray()
-      },
-      waitTime: {
-        history: new CircularArray()
-      },
-      elu: {
-        idle: {
-          history: new CircularArray()
-        },
-        active: {
-          history: new CircularArray()
-        }
-      }
-    }
-  }
-
-  private getInitialWorkerInfo (worker: Worker): WorkerInfo {
-    return { id: this.getWorkerId(worker), dynamic: false, started: true }
-  }
 }
index a88197f715bdd2c2a129638d3960dce9d9ac8420..fe920f8a48126529a31dafc2d0c9f31bb4c0e4cf 100644 (file)
@@ -1,13 +1,8 @@
 import cluster, { type ClusterSettings, type Worker } from 'node:cluster'
 import type { MessageValue } from '../../utility-types'
 import { AbstractPool } from '../abstract-pool'
-import {
-  type PoolOptions,
-  type PoolType,
-  PoolTypes,
-  type WorkerType,
-  WorkerTypes
-} from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { type WorkerType, WorkerTypes } from '../worker'
 
 /**
  * Options for a poolifier cluster pool.
index 5be4f95a9e62ceb643c49a98faffd0f0dc29395e..b1892752111884b3a9edcf82f8e33a776b34172e 100644 (file)
@@ -3,9 +3,10 @@ import type {
   ErrorHandler,
   ExitHandler,
   IWorker,
+  IWorkerNode,
   MessageHandler,
   OnlineHandler,
-  WorkerNode
+  WorkerType
 } from './worker'
 import type {
   WorkerChoiceStrategy,
@@ -31,19 +32,6 @@ export const PoolTypes = Object.freeze({
  */
 export type PoolType = keyof typeof PoolTypes
 
-/**
- * Enumeration of worker types.
- */
-export const WorkerTypes = Object.freeze({
-  cluster: 'cluster',
-  thread: 'thread'
-} as const)
-
-/**
- * Worker type.
- */
-export type WorkerType = keyof typeof WorkerTypes
-
 /**
  * Pool events emitter.
  */
@@ -68,35 +56,35 @@ export type PoolEvent = keyof typeof PoolEvents
  * Pool information.
  */
 export interface PoolInfo {
-  version: string
-  type: PoolType
-  worker: WorkerType
-  minSize: number
-  maxSize: number
+  readonly version: string
+  readonly type: PoolType
+  readonly worker: WorkerType
+  readonly minSize: number
+  readonly maxSize: number
   /** Pool utilization ratio. */
-  utilization?: number
+  readonly utilization?: number
   /** Pool total worker nodes */
-  workerNodes: number
+  readonly workerNodes: number
   /** Pool idle worker nodes */
-  idleWorkerNodes: number
+  readonly idleWorkerNodes: number
   /** Pool busy worker nodes */
-  busyWorkerNodes: number
-  executedTasks: number
-  executingTasks: number
-  queuedTasks: number
-  maxQueuedTasks: number
-  failedTasks: number
-  runTime?: {
-    minimum: number
-    maximum: number
-    average: number
-    median?: number
+  readonly busyWorkerNodes: number
+  readonly executedTasks: number
+  readonly executingTasks: number
+  readonly queuedTasks: number
+  readonly maxQueuedTasks: number
+  readonly failedTasks: number
+  readonly runTime?: {
+    readonly minimum: number
+    readonly maximum: number
+    readonly average: number
+    readonly median?: number
   }
-  waitTime?: {
-    minimum: number
-    maximum: number
-    average: number
-    median?: number
+  readonly waitTime?: {
+    readonly minimum: number
+    readonly maximum: number
+    readonly average: number
+    readonly median?: number
   }
 }
 
@@ -185,7 +173,7 @@ export interface IPool<
   /**
    * Pool worker nodes.
    */
-  readonly workerNodes: Array<WorkerNode<Worker, Data>>
+  readonly workerNodes: Array<IWorkerNode<Worker, Data>>
   /**
    * Emitter on which events can be listened to.
    *
@@ -204,18 +192,18 @@ export interface IPool<
    * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
    * @returns Promise that will be fulfilled when the task is completed.
    */
-  execute: (data?: Data, name?: string) => Promise<Response>
+  readonly execute: (data?: Data, name?: string) => Promise<Response>
   /**
    * Terminates every current worker in this pool.
    */
-  destroy: () => Promise<void>
+  readonly destroy: () => Promise<void>
   /**
    * Sets the worker choice strategy in this pool.
    *
    * @param workerChoiceStrategy - The worker choice strategy.
    * @param workerChoiceStrategyOptions - The worker choice strategy options.
    */
-  setWorkerChoiceStrategy: (
+  readonly setWorkerChoiceStrategy: (
     workerChoiceStrategy: WorkerChoiceStrategy,
     workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ) => void
@@ -224,7 +212,7 @@ export interface IPool<
    *
    * @param workerChoiceStrategyOptions - The worker choice strategy options.
    */
-  setWorkerChoiceStrategyOptions: (
+  readonly setWorkerChoiceStrategyOptions: (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ) => void
   /**
@@ -233,7 +221,7 @@ export interface IPool<
    * @param enable - Whether to enable or disable the worker tasks queue.
    * @param tasksQueueOptions - The worker tasks queue options.
    */
-  enableTasksQueue: (
+  readonly enableTasksQueue: (
     enable: boolean,
     tasksQueueOptions?: TasksQueueOptions
   ) => void
@@ -242,5 +230,5 @@ export interface IPool<
    *
    * @param tasksQueueOptions - The worker tasks queue options.
    */
-  setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
+  readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
 }
index 9f68c22f892c50a3e424b59a3cb0cffe9f61557f..628e309566f7d37daaafbdd9c5277a0dd4420bea 100644 (file)
@@ -169,30 +169,30 @@ export interface IWorkerChoiceStrategy {
    *
    * @returns `true` if the reset is successful, `false` otherwise.
    */
-  reset: () => boolean
+  readonly reset: () => boolean
   /**
    * Updates the worker node key strategy internals.
    *
    * @returns `true` if the update is successful, `false` otherwise.
    */
-  update: (workerNodeKey: number) => boolean
+  readonly update: (workerNodeKey: number) => boolean
   /**
    * Chooses a worker node in the pool and returns its key.
    *
    * @returns The worker node key.
    */
-  choose: () => number
+  readonly choose: () => number
   /**
    * Removes the worker node key from strategy internals.
    *
    * @param workerNodeKey - The worker node key.
    * @returns `true` if the worker node key is removed, `false` otherwise.
    */
-  remove: (workerNodeKey: number) => boolean
+  readonly remove: (workerNodeKey: number) => boolean
   /**
    * Sets the worker choice strategy options.
    *
    * @param opts - The worker choice strategy options.
    */
-  setOptions: (opts: WorkerChoiceStrategyOptions) => void
+  readonly setOptions: (opts: WorkerChoiceStrategyOptions) => void
 }
index ac629e1fb0ff7f363be0a3fe5107f2b02748c8d8..9861a3c15c15bc79b9ec6e4966dddb8021d8ecc3 100644 (file)
@@ -6,13 +6,8 @@ import {
 } from 'node:worker_threads'
 import type { MessageValue } from '../../utility-types'
 import { AbstractPool } from '../abstract-pool'
-import {
-  type PoolOptions,
-  type PoolType,
-  PoolTypes,
-  type WorkerType,
-  WorkerTypes
-} from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { type WorkerType, WorkerTypes } from '../worker'
 
 /**
  * Options for a poolifier thread pool.
diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts
new file mode 100644 (file)
index 0000000..a493b65
--- /dev/null
@@ -0,0 +1,121 @@
+import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
+import {
+  type IWorker,
+  type IWorkerNode,
+  type Task,
+  type WorkerInfo,
+  type WorkerType,
+  WorkerTypes,
+  type WorkerUsage
+} from './worker'
+
+export class WorkerNode<Worker extends IWorker, Data = unknown>
+implements IWorkerNode<Worker, Data> {
+  public readonly worker: Worker
+  public readonly info: WorkerInfo
+  public usage: WorkerUsage
+  private readonly tasksQueue: Queue<Task<Data>>
+
+  constructor (worker: Worker, workerType: WorkerType) {
+    this.worker = worker
+    this.info = this.initWorkerInfo(worker, workerType)
+    this.usage = this.initWorkerUsage()
+    this.tasksQueue = new Queue<Task<Data>>()
+  }
+
+  /** @inheritdoc */
+  public tasksQueueSize (): number {
+    return this.tasksQueue.size
+  }
+
+  /**
+   * Worker node tasks queue maximum size.
+   *
+   * @returns The tasks queue maximum size.
+   */
+  private tasksQueueMaxSize (): number {
+    return this.tasksQueue.maxSize
+  }
+
+  /** @inheritdoc */
+  public enqueueTask (task: Task<Data>): number {
+    return this.tasksQueue.enqueue(task)
+  }
+
+  /** @inheritdoc */
+  public dequeueTask (): Task<Data> | undefined {
+    return this.tasksQueue.dequeue()
+  }
+
+  /** @inheritdoc */
+  public clearTasksQueue (): void {
+    this.tasksQueue.clear()
+  }
+
+  public resetUsage (): void {
+    this.usage = this.initWorkerUsage()
+  }
+
+  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
+    return {
+      id: this.getWorkerId(worker, workerType),
+      type: workerType,
+      dynamic: false,
+      started: true
+    }
+  }
+
+  private initWorkerUsage (): WorkerUsage {
+    const getTasksQueueSize = (): number => {
+      return this.tasksQueueSize()
+    }
+    const getTasksMaxQueueSize = (): number => {
+      return this.tasksQueueMaxSize()
+    }
+    return {
+      tasks: {
+        executed: 0,
+        executing: 0,
+        get queued (): number {
+          return getTasksQueueSize()
+        },
+        get maxQueued (): number {
+          return getTasksMaxQueueSize()
+        },
+        failed: 0
+      },
+      runTime: {
+        history: new CircularArray()
+      },
+      waitTime: {
+        history: new CircularArray()
+      },
+      elu: {
+        idle: {
+          history: new CircularArray()
+        },
+        active: {
+          history: new CircularArray()
+        }
+      }
+    }
+  }
+
+  /**
+   * Gets the worker id.
+   *
+   * @param worker - The worker.
+   * @returns The worker id.
+   */
+  private getWorkerId (
+    worker: Worker,
+    workerType: WorkerType
+  ): number | undefined {
+    if (workerType === WorkerTypes.thread) {
+      return worker.threadId
+    } else if (workerType === WorkerTypes.cluster) {
+      return worker.id
+    }
+  }
+}
index d071413b04a0c1ab5f79f3bff17ac9c78354f2d5..81b49d3bc4a0f02fb99f964b626fecae387c2d33 100644 (file)
@@ -1,5 +1,4 @@
 import type { CircularArray } from '../circular-array'
-import type { Queue } from '../queue'
 
 /**
  * Callback invoked if the worker has received a message.
@@ -126,6 +125,19 @@ export interface TaskStatistics {
   failed: number
 }
 
+/**
+ * Enumeration of worker types.
+ */
+export const WorkerTypes = Object.freeze({
+  cluster: 'cluster',
+  thread: 'thread'
+} as const)
+
+/**
+ * Worker type.
+ */
+export type WorkerType = keyof typeof WorkerTypes
+
 /**
  * Worker information.
  *
@@ -136,6 +148,10 @@ export interface WorkerInfo {
    * Worker id.
    */
   readonly id: number | undefined
+  /**
+   * Worker type.
+   */
+  type: WorkerType
   /**
    * Dynamic flag.
    */
@@ -185,7 +201,7 @@ export interface IWorker {
    * @param event - The event.
    * @param handler - The event handler.
    */
-  on: ((event: 'message', handler: MessageHandler<this>) => void) &
+  readonly on: ((event: 'message', handler: MessageHandler<this>) => void) &
   ((event: 'error', handler: ErrorHandler<this>) => void) &
   ((event: 'online', handler: OnlineHandler<this>) => void) &
   ((event: 'exit', handler: ExitHandler<this>) => void)
@@ -195,7 +211,7 @@ export interface IWorker {
    * @param event - `'exit'`.
    * @param handler - The exit handler.
    */
-  once: (event: 'exit', handler: ExitHandler<this>) => void
+  readonly once: (event: 'exit', handler: ExitHandler<this>) => void
 }
 
 /**
@@ -205,7 +221,7 @@ export interface IWorker {
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  * @internal
  */
-export interface WorkerNode<Worker extends IWorker, Data = unknown> {
+export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
   /**
    * Worker node worker.
    */
@@ -219,7 +235,30 @@ export interface WorkerNode<Worker extends IWorker, Data = unknown> {
    */
   usage: WorkerUsage
   /**
-   * Worker node tasks queue.
+   * Worker node tasks queue size.
+   *
+   * @returns The tasks queue size.
+   */
+  readonly tasksQueueSize: () => number
+  /**
+   * Worker node enqueue task.
+   *
+   * @param task - The task to queue.
+   * @returns The task queue size.
+   */
+  readonly enqueueTask: (task: Task<Data>) => number
+  /**
+   * Worker node dequeue task.
+   *
+   * @returns The dequeued task.
+   */
+  readonly dequeueTask: () => Task<Data> | undefined
+  /**
+   * Worker node clear tasks queue.
+   */
+  readonly clearTasksQueue: () => void
+  /**
+   * Worker node reset usage statistics .
    */
-  readonly tasksQueue: Queue<Task<Data>>
+  readonly resetUsage: () => void
 }