feat: expose pool information
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 2 Jun 2023 15:28:54 +0000 (17:28 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 2 Jun 2023 15:28:54 +0000 (17:28 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
CHANGELOG.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/queue.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index a2d09e6e43f0b262d90d8728f612b2662c01169a..862fcc4c69735b825678843fb678313c17f62204 100644 (file)
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Add `taskError` pool event for task execution error.
+- Add pool information `info` property to pool.
 - Emit pool information on `busy` and `full` pool events.
 
 ## [2.5.1] - 2023-06-01
index 03ee28090ba4aef76bdd84d8c5072185487e7f5a..fd3cefd29305757d0c474994dedfbeaec2259192 100644 (file)
@@ -2,11 +2,12 @@ export { DynamicClusterPool } from './pools/cluster/dynamic'
 export { FixedClusterPool } from './pools/cluster/fixed'
 export type { ClusterPoolOptions } from './pools/cluster/fixed'
 export type { AbstractPool } from './pools/abstract-pool'
-export { PoolEvents } from './pools/pool'
+export { PoolEvents, PoolTypes } from './pools/pool'
 export type {
   IPool,
   PoolEmitter,
   PoolEvent,
+  PoolInfo,
   PoolOptions,
   PoolType,
   TasksQueueOptions
index 3127557e20d38fe21ac5d30757450aff623fcef6..58fc28ce12d6a56416b6db05cec3e5186c549a14 100644 (file)
@@ -13,8 +13,10 @@ import {
   type IPool,
   PoolEmitter,
   PoolEvents,
+  type PoolInfo,
   type PoolOptions,
-  PoolType,
+  type PoolType,
+  PoolTypes,
   type TasksQueueOptions
 } from './pool'
 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
@@ -133,7 +135,7 @@ export abstract class AbstractPool<
       throw new RangeError(
         'Cannot instantiate a pool with a negative number of workers'
       )
-    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
+    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
       throw new Error('Cannot instantiate a fixed pool with no worker')
     }
   }
@@ -185,7 +187,7 @@ export abstract class AbstractPool<
     }
     if (
       workerChoiceStrategyOptions.weights != null &&
-      Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
     ) {
       throw new Error(
         'Invalid worker choice strategy options: must have a weight for each worker node'
@@ -212,30 +214,48 @@ export abstract class AbstractPool<
   public abstract get type (): PoolType
 
   /** @inheritDoc */
-  public abstract get size (): number
+  public get info (): PoolInfo {
+    return {
+      type: this.type,
+      minSize: this.minSize,
+      maxSize: this.maxSize,
+      workerNodes: this.workerNodes.length,
+      idleWorkerNodes: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+        0
+      ),
+      busyWorkerNodes: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+        0
+      ),
+      runningTasks: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          accumulator + workerNode.tasksUsage.running,
+        0
+      ),
+      queuedTasks: this.workerNodes.reduce(
+        (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+        0
+      ),
+      maxQueuedTasks: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          accumulator + workerNode.tasksQueue.maxSize,
+        0
+      )
+    }
+  }
 
   /**
-   * Number of tasks running in the pool.
+   * Pool minimum size.
    */
-  private get numberOfRunningTasks (): number {
-    return this.workerNodes.reduce(
-      (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running,
-      0
-    )
-  }
+  protected abstract get minSize (): number
 
   /**
-   * Number of tasks queued in the pool.
+   * Pool maximum size.
    */
-  private get numberOfQueuedTasks (): number {
-    if (this.opts.enableTasksQueue === false) {
-      return 0
-    }
-    return this.workerNodes.reduce(
-      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
-      0
-    )
-  }
+  protected abstract get maxSize (): number
 
   /**
    * Gets the given worker its worker node key.
@@ -497,7 +517,7 @@ export abstract class AbstractPool<
    */
   protected chooseWorkerNode (): number {
     let workerNodeKey: number
-    if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
+    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
       const workerCreated = this.createAndSetupWorker()
       this.registerWorkerMessageListener(workerCreated, message => {
         const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
@@ -627,17 +647,11 @@ export abstract class AbstractPool<
 
   private checkAndEmitEvents (): void {
     if (this.emitter != null) {
-      const poolInfo = {
-        size: this.size,
-        workerNodes: this.workerNodes.length,
-        runningTasks: this.numberOfRunningTasks,
-        queuedTasks: this.numberOfQueuedTasks
-      }
       if (this.busy) {
-        this.emitter?.emit(PoolEvents.busy, poolInfo)
+        this.emitter?.emit(PoolEvents.busy, this.info)
       }
-      if (this.type === PoolType.DYNAMIC && this.full) {
-        this.emitter?.emit(PoolEvents.full, poolInfo)
+      if (this.type === PoolTypes.dynamic && this.full) {
+        this.emitter?.emit(PoolEvents.full, this.info)
       }
     }
   }
index 67020577d2be51aaedce5fb37015a0e3edddca91..d4f270a373bc4e0c79198edf7538cb55a62f5e9d 100644 (file)
@@ -1,6 +1,5 @@
-import { PoolType } from '../pool'
-import type { ClusterPoolOptions } from './fixed'
-import { FixedClusterPool } from './fixed'
+import { type PoolType, PoolTypes } from '../pool'
+import { type ClusterPoolOptions, FixedClusterPool } from './fixed'
 
 /**
  * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
@@ -36,11 +35,11 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   public get type (): PoolType {
-    return PoolType.DYNAMIC
+    return PoolTypes.dynamic
   }
 
   /** @inheritDoc */
-  public get size (): number {
+  protected get maxSize (): number {
     return this.max
   }
 
index 89cab8ef0eba31aacbbde03fa4de24d68d9530d6..a0c6b16f6ed7b5a16e38c30c75c62302cd54b9c7 100644 (file)
@@ -2,8 +2,7 @@ import type { ClusterSettings, Worker } from 'node:cluster'
 import cluster from 'node:cluster'
 import type { MessageValue } from '../../utility-types'
 import { AbstractPool } from '../abstract-pool'
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 
 /**
  * Options for a poolifier cluster pool.
@@ -97,11 +96,16 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   public get type (): PoolType {
-    return PoolType.FIXED
+    return PoolTypes.fixed
   }
 
   /** @inheritDoc */
-  public get size (): number {
+  protected get minSize (): number {
+    return this.numberOfWorkers
+  }
+
+  /** @inheritDoc */
+  protected get maxSize (): number {
     return this.numberOfWorkers
   }
 
index 32adfc66ab7a7b7cd4e6dc95e7e8d9346592773c..0013bf5dc20a27a9b84b63645233d2e636c9df49 100644 (file)
@@ -13,21 +13,23 @@ import type {
 } from './selection-strategies/selection-strategies-types'
 
 /**
- * Pool types.
- *
- * @enum
- * @internal
+ * Enumeration of pool types.
  */
-export enum PoolType {
+export const PoolTypes = Object.freeze({
   /**
    * Fixed pool type.
    */
-  FIXED = 'fixed',
+  fixed: 'fixed',
   /**
    * Dynamic pool type.
    */
-  DYNAMIC = 'dynamic'
-}
+  dynamic: 'dynamic'
+} as const)
+
+/**
+ * Pool type.
+ */
+export type PoolType = keyof typeof PoolTypes
 
 /**
  * Pool events emitter.
@@ -49,6 +51,21 @@ export const PoolEvents = Object.freeze({
  */
 export type PoolEvent = keyof typeof PoolEvents
 
+/**
+ * Pool information.
+ */
+export interface PoolInfo {
+  type: PoolType
+  minSize: number
+  maxSize: number
+  workerNodes: number
+  idleWorkerNodes: number
+  busyWorkerNodes: number
+  runningTasks: number
+  queuedTasks: number
+  maxQueuedTasks: number
+}
+
 /**
  * Worker tasks queue options.
  */
@@ -134,9 +151,9 @@ export interface IPool<
    */
   readonly type: PoolType
   /**
-   * Pool maximum size.
+   * Pool information.
    */
-  readonly size: number
+  readonly info: PoolInfo
   /**
    * Pool worker nodes.
    */
index 0519873ec6d870ef0c1d08e6cb872541cf5a0865..00d25c0a5fd8d1060a575b9995c4e189475f15a5 100644 (file)
@@ -1,7 +1,5 @@
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
-import type { ThreadWorkerWithMessageChannel } from './fixed'
-import { FixedThreadPool } from './fixed'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { FixedThreadPool, type ThreadWorkerWithMessageChannel } from './fixed'
 
 /**
  * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
@@ -37,7 +35,7 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   public get type (): PoolType {
-    return PoolType.DYNAMIC
+    return PoolTypes.dynamic
   }
 
   /** @inheritDoc */
@@ -46,7 +44,7 @@ export class DynamicThreadPool<
   }
 
   /** @inheritDoc */
-  public get size (): number {
+  protected get maxSize (): number {
     return this.max
   }
 
index 816fa61a432db55b0980092bb3bf7f45818c1364..9939a04b9e497f5e78a84396493db7819f837508 100644 (file)
@@ -6,8 +6,7 @@ import {
 } from 'node:worker_threads'
 import type { Draft, MessageValue } from '../../utility-types'
 import { AbstractPool } from '../abstract-pool'
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
 
 /**
  * A thread worker with message channels for communication between main thread and thread worker.
@@ -93,11 +92,16 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   public get type (): PoolType {
-    return PoolType.FIXED
+    return PoolTypes.fixed
   }
 
   /** @inheritDoc */
-  public get size (): number {
+  protected get minSize (): number {
+    return this.numberOfWorkers
+  }
+
+  /** @inheritDoc */
+  protected get maxSize (): number {
     return this.numberOfWorkers
   }
 
index 7682f65f952136743c1f81724a26e53264ac18ed..86b69051174672a92c158317f84beb1874cebf07 100644 (file)
@@ -9,11 +9,13 @@ export class Queue<T> {
   private items: Record<number, T>
   private head: number
   private tail: number
+  private max: number
 
   public constructor () {
     this.items = {}
     this.head = 0
     this.tail = 0
+    this.max = 0
   }
 
   /**
@@ -26,6 +28,16 @@ export class Queue<T> {
     return this.tail - this.head
   }
 
+  /**
+   * Get the maximum size of the queue.
+   *
+   * @returns The maximum size of the queue.
+   * @readonly
+   */
+  public get maxSize (): number {
+    return this.max
+  }
+
   /**
    * Enqueue an item.
    *
@@ -35,6 +47,7 @@ export class Queue<T> {
   public enqueue (item: T): number {
     this.items[this.tail] = item
     this.tail++
+    if (this.size > this.max) this.max = this.size
     return this.size
   }
 
index 91c942f646b65b9de10ec6f40dba2f70aa333730..b41ab1adc0265986b7f03603a4ca6e8553f5a0f9 100644 (file)
@@ -5,7 +5,8 @@ const {
   FixedClusterPool,
   FixedThreadPool,
   PoolEvents,
-  WorkerChoiceStrategies
+  WorkerChoiceStrategies,
+  PoolTypes
 } = require('../../../lib')
 const { CircularArray } = require('../../../lib/circular-array')
 const { Queue } = require('../../../lib/queue')
@@ -274,6 +275,42 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that pool info is set', async () => {
+    let pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    expect(pool.info).toStrictEqual({
+      type: PoolTypes.fixed,
+      minSize: numberOfWorkers,
+      maxSize: numberOfWorkers,
+      workerNodes: numberOfWorkers,
+      idleWorkerNodes: numberOfWorkers,
+      busyWorkerNodes: 0,
+      runningTasks: 0,
+      queuedTasks: 0,
+      maxQueuedTasks: 0
+    })
+    await pool.destroy()
+    pool = new DynamicClusterPool(
+      numberOfWorkers,
+      numberOfWorkers * 2,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    expect(pool.info).toStrictEqual({
+      type: PoolTypes.dynamic,
+      minSize: numberOfWorkers,
+      maxSize: numberOfWorkers * 2,
+      workerNodes: numberOfWorkers,
+      idleWorkerNodes: numberOfWorkers,
+      busyWorkerNodes: 0,
+      runningTasks: 0,
+      queuedTasks: 0,
+      maxQueuedTasks: 0
+    })
+    await pool.destroy()
+  })
+
   it('Simulate worker not found', async () => {
     const pool = new StubPoolWithRemoveAllWorker(
       numberOfWorkers,
index ac5d1fda6943a67651ea521e924cfcf8283a20c3..9fc2b1b456a4cc5e323a14a276f3633372dba596 100644 (file)
@@ -102,8 +102,11 @@ describe('Fixed cluster pool test suite', () => {
       expect(workerNode.tasksUsage.run).toBe(0)
       expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
     }
-    expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers)
-    expect(queuePool.numberOfQueuedTasks).toBe(
+    expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
+    expect(queuePool.info.queuedTasks).toBe(
+      numberOfWorkers * maxMultiplier - numberOfWorkers
+    )
+    expect(queuePool.info.maxQueuedTasks).toBe(
       numberOfWorkers * maxMultiplier - numberOfWorkers
     )
     await Promise.all(promises)
index ef368968922282de2d5090f4d68242946c46e481..dcb7fac7f11804289e4a01729ee16980292d9696 100644 (file)
@@ -102,8 +102,11 @@ describe('Fixed thread pool test suite', () => {
       expect(workerNode.tasksUsage.run).toBe(0)
       expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
     }
-    expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads)
-    expect(queuePool.numberOfQueuedTasks).toBe(
+    expect(queuePool.info.runningTasks).toBe(numberOfThreads)
+    expect(queuePool.info.queuedTasks).toBe(
+      numberOfThreads * maxMultiplier - numberOfThreads
+    )
+    expect(queuePool.info.maxQueuedTasks).toBe(
       numberOfThreads * maxMultiplier - numberOfThreads
     )
     await Promise.all(promises)