Reset all internal statistics at worker choice strategy change
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 11 Oct 2022 20:35:06 +0000 (22:35 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 11 Oct 2022 20:35:06 +0000 (22:35 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
17 files changed:
src/index.ts
src/pools/abstract-pool-worker.ts [deleted file]
src/pools/abstract-pool.ts
src/pools/pool-internal.ts
src/pools/pool-worker.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/selection-strategies-utils.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/worker-options.ts

index b0c419ca89e4f6d1838ffe93368d26e3d292d77c..6c139a27950240a97fc69357f6094d5d765001b4 100644 (file)
@@ -6,6 +6,7 @@ export type {
   ErrorHandler,
   ExitHandler,
   IPoolWorker,
+  MessageHandler,
   OnlineHandler
 } from './pools/pool-worker'
 export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
diff --git a/src/pools/abstract-pool-worker.ts b/src/pools/abstract-pool-worker.ts
deleted file mode 100644 (file)
index a27d03e..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-import type {
-  ErrorHandler,
-  ExitHandler,
-  IPoolWorker,
-  MessageHandler,
-  OnlineHandler
-} from './pool-worker'
-
-/**
- * Basic class that implement the minimum required for a pool worker.
- */
-export abstract class AbstractPoolWorker implements IPoolWorker {
-  /** @inheritDoc  */
-  abstract on (event: 'message', handler: MessageHandler<this>): void
-  /** @inheritDoc  */
-  abstract on (event: 'error', handler: ErrorHandler<this>): void
-  /** @inheritDoc  */
-  abstract on (event: 'online', handler: OnlineHandler<this>): void
-  /** @inheritDoc  */
-  abstract on (event: 'exit', handler: ExitHandler<this>): void
-  /** @inheritDoc  */
-  abstract once (event: 'exit', handler: ExitHandler<this>): void
-}
index 369e5b90617bab507d97e36331c00a93d6a47c67..c4181e76dfb62bc08cb2f7fbcf86e3d131038c1e 100644 (file)
@@ -4,10 +4,10 @@ import type {
 } from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
-import type { AbstractPoolWorker } from './abstract-pool-worker'
 import type { PoolOptions } from './pool'
 import type { IPoolInternal, TasksUsage } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
+import type { IPoolWorker } from './pool-worker'
 import {
   WorkerChoiceStrategies,
   WorkerChoiceStrategy
@@ -18,27 +18,22 @@ const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
   'Worker could not be found in worker tasks usage map'
 
 /**
- * Base class containing some shared logic for all poolifier pools.
+ * Base class that implements some shared logic for all poolifier pools.
  *
  * @template Worker Type of worker which manages this pool.
  * @template Data Type of data sent to the worker. This can only be serializable data.
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractPool<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
   /** @inheritDoc */
   public readonly workers: Worker[] = []
 
-  /**
-   * The workers tasks usage map.
-   *
-   *  `key`: The `Worker`
-   *  `value`: Worker tasks usage statistics.
-   */
-  protected workersTasksUsage: Map<Worker, TasksUsage> = new Map<
+  /** @inheritDoc */
+  public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
     Worker,
     TasksUsage
   >()
@@ -182,6 +177,9 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
     this.opts.workerChoiceStrategy = workerChoiceStrategy
+    for (const worker of this.workers) {
+      this.resetWorkerTasksUsage(worker)
+    }
     this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
       workerChoiceStrategy
     )
@@ -355,12 +353,7 @@ export abstract class AbstractPool<
     this.workers.push(worker)
 
     // Init worker tasks usage map
-    this.workersTasksUsage.set(worker, {
-      run: 0,
-      running: 0,
-      runTime: 0,
-      avgRunTime: 0
-    })
+    this.initWorkerTasksUsage(worker)
 
     this.afterWorkerSetup(worker)
 
@@ -469,6 +462,20 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Initializes tasks usage statistics.
+   *
+   * @param worker The worker.
+   */
+  initWorkerTasksUsage (worker: Worker): void {
+    this.workersTasksUsage.set(worker, {
+      run: 0,
+      running: 0,
+      runTime: 0,
+      avgRunTime: 0
+    })
+  }
+
   /**
    * Removes worker tasks usage statistics.
    *
@@ -477,4 +484,14 @@ export abstract class AbstractPool<
   private removeWorkerTasksUsage (worker: Worker): void {
     this.workersTasksUsage.delete(worker)
   }
+
+  /**
+   * Resets worker tasks usage statistics.
+   *
+   * @param worker The worker.
+   */
+  private resetWorkerTasksUsage (worker: Worker): void {
+    this.removeWorkerTasksUsage(worker)
+    this.initWorkerTasksUsage(worker)
+  }
 }
index 2cf84adbc6bfd28e9d552795cbb3b49bbe8ffee4..6e2850a892d8d63f024e09d6db40e23b43129c06 100644 (file)
@@ -1,6 +1,6 @@
 import EventEmitter from 'events'
-import type { AbstractPoolWorker } from './abstract-pool-worker'
 import type { IPool } from './pool'
+import type { IPoolWorker } from './pool-worker'
 
 /**
  * Pool types.
@@ -33,7 +33,7 @@ export class PoolEmitter extends EventEmitter {}
  * @template Response Type of response of execution.
  */
 export interface IPoolInternal<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data = unknown,
   Response = unknown
 > extends IPool<Data, Response> {
@@ -42,6 +42,14 @@ export interface IPoolInternal<
    */
   readonly workers: Worker[]
 
+  /**
+   * The workers tasks usage map.
+   *
+   *  `key`: The `Worker`
+   *  `value`: Worker tasks usage statistics.
+   */
+  readonly workersTasksUsage: Map<Worker, TasksUsage>
+
   /**
    * Emitter on which events can be listened to.
    *
index d46eda19c75cd51533562f06380a3a70cf8ef05e..c89205ac93582b3dff2c1123285f09790edfb509 100644 (file)
@@ -28,13 +28,9 @@ export type OnlineHandler<Worker> = (this: Worker) => void
 export type ExitHandler<Worker> = (this: Worker, code: number) => void
 
 /**
- * Basic interface that describes the minimum required implementation of listener events for a pool worker.
+ * Interface that describes the minimum required implementation of listener events for a pool worker.
  */
 export interface IPoolWorker {
-  /**
-   * Worker identifier.
-   */
-  readonly id?: number
   /**
    * Register a listener to the message event.
    *
index cd50f700bdb74d1fe75f6f37a00d3867ece5c178..d624bdd2f0128079d68eef6ee16d888d8373031f 100644 (file)
@@ -1,6 +1,6 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
 import { PoolType } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
 import type {
   IWorkerChoiceStrategy,
   RequiredStatistics
@@ -14,12 +14,12 @@ import type {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export abstract class AbstractWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > implements IWorkerChoiceStrategy<Worker> {
   /** @inheritDoc */
-  public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+  public readonly isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
   /** @inheritDoc */
   public requiredStatistics: RequiredStatistics = {
     runTime: false
@@ -34,6 +34,9 @@ export abstract class AbstractWorkerChoiceStrategy<
     protected readonly pool: IPoolInternal<Worker, Data, Response>
   ) {}
 
+  /** @inheritDoc */
+  public abstract resetStatistics (): boolean
+
   /** @inheritDoc */
   public abstract choose (): Worker
 }
index 68b48e32744739c5060458b71d815d1889a3640a..731bb91b4b0fad58d7ea40fb9511ada5b538b17e 100644 (file)
@@ -1,5 +1,5 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -16,7 +16,7 @@ import { SelectionStrategiesUtils } from './selection-strategies-utils'
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class DynamicPoolWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
@@ -42,6 +42,11 @@ export class DynamicPoolWorkerChoiceStrategy<
     this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics
   }
 
+  /** @inheritDoc */
+  public resetStatistics (): boolean {
+    return this.workerChoiceStrategy.resetStatistics()
+  }
+
   /** @inheritDoc */
   public choose (): Worker {
     const freeWorker = this.pool.findFreeWorker()
index 25a013ea64d604319cfe6d11678f59c8f7435a96..ee9351ec0b1011615a2f7489d83b5931a8ff8673 100644 (file)
@@ -1,4 +1,4 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type { RequiredStatistics } from './selection-strategies-types'
 
@@ -19,26 +19,32 @@ type WorkerVirtualTaskTimestamp = {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class FairShareWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
   /** @inheritDoc */
-  public requiredStatistics: RequiredStatistics = {
+  public readonly requiredStatistics: RequiredStatistics = {
     runTime: true
   }
 
   /**
    *  Worker last virtual task execution timestamp.
    */
-  private workerLastVirtualTaskTimestamp: Map<
+  private readonly workerLastVirtualTaskTimestamp: Map<
     Worker,
     WorkerVirtualTaskTimestamp
   > = new Map<Worker, WorkerVirtualTaskTimestamp>()
 
+  /** @inheritDoc */
+  public resetStatistics (): boolean {
+    this.workerLastVirtualTaskTimestamp.clear()
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): Worker {
-    this.updateWorkerLastVirtualTaskTimestamp()
+    this.computeWorkerLastVirtualTaskTimestamp()
     let minWorkerVirtualTaskEndTimestamp = Infinity
     let chosenWorker!: Worker
     for (const worker of this.pool.workers) {
@@ -57,7 +63,7 @@ export class FairShareWorkerChoiceStrategy<
   /**
    * Computes workers last virtual task timestamp.
    */
-  private updateWorkerLastVirtualTaskTimestamp () {
+  private computeWorkerLastVirtualTaskTimestamp () {
     for (const worker of this.pool.workers) {
       const workerVirtualTaskStartTimestamp = Math.max(
         Date.now(),
index 03c6524d004846ff925ffaf6f0c498ed75cfb9e4..0e2a2bf473b64cafa25051fbf13e603e5ab71701 100644 (file)
@@ -1,4 +1,4 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 
 /**
@@ -9,10 +9,15 @@ import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class LessRecentlyUsedWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /** @inheritDoc */
+  public resetStatistics (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): Worker {
     let minNumberOfRunningTasks = Infinity
index 69bffea13f05e8644d0f1c7441a8229ef3386e3e..082c7d21a6b988f18ff8768fdd0868c357205d9e 100644 (file)
@@ -1,4 +1,4 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 
 /**
@@ -9,7 +9,7 @@ import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class RoundRobinWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
@@ -18,6 +18,11 @@ export class RoundRobinWorkerChoiceStrategy<
    */
   private nextWorkerIndex: number = 0
 
+  /** @inheritDoc */
+  public resetStatistics (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): Worker {
     const chosenWorker = this.pool.workers[this.nextWorkerIndex]
index 5b844cc572756930980c29991ea3af94ae9a6532..f875c2906ee3a173ffab6790d93aece511b2a0ef 100644 (file)
@@ -1,4 +1,4 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
 
 /**
  * Enumeration of worker choice strategies.
@@ -28,7 +28,7 @@ export const WorkerChoiceStrategies = Object.freeze({
 export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
 
 /**
- * Tasks usage statistics requirements.
+ * Pool tasks usage statistics requirements.
  */
 export type RequiredStatistics = {
   runTime: boolean
@@ -39,15 +39,19 @@ export type RequiredStatistics = {
  *
  * @template Worker Type of worker which manages the strategy.
  */
-export interface IWorkerChoiceStrategy<Worker extends AbstractPoolWorker> {
+export interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
   /**
    * Is the pool attached to the strategy dynamic?.
    */
-  isDynamicPool: boolean
+  readonly isDynamicPool: boolean
   /**
-   * Required tasks usage statistics.
+   * Required pool tasks usage statistics.
    */
-  requiredStatistics: RequiredStatistics
+  readonly requiredStatistics: RequiredStatistics
+  /**
+   * Resets strategy internal statistics.
+   */
+  resetStatistics(): boolean
   /**
    * Chooses a worker in the pool.
    */
index 699bc719b80d99ad9091c20b3544f69cbfcf02d7..f1f33d693996fedbe9862008f64e3e5eb46cdbfe 100644 (file)
@@ -1,5 +1,5 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
 import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy'
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
@@ -22,7 +22,7 @@ export class SelectionStrategiesUtils {
    * @returns The worker choice strategy instance.
    */
   public static getWorkerChoiceStrategy<
-    Worker extends AbstractPoolWorker,
+    Worker extends IPoolWorker,
     Data,
     Response
   > (
index ed8690918f6177656020d84a915f11de85023aca..925f318c6cd24b124068c20791ca81c3a494e6f1 100644 (file)
@@ -1,6 +1,6 @@
 import { cpus } from 'os'
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type { RequiredStatistics } from './selection-strategies-types'
 
@@ -21,12 +21,12 @@ type TaskRunTime = {
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class WeightedRoundRobinWorkerChoiceStrategy<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
   /** @inheritDoc */
-  public requiredStatistics: RequiredStatistics = {
+  public readonly requiredStatistics: RequiredStatistics = {
     runTime: true
   }
 
@@ -45,7 +45,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   /**
    * Per worker virtual task runtime map.
    */
-  private workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
+  private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
     Worker,
     TaskRunTime
   >()
@@ -61,6 +61,13 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     this.initWorkersTaskRunTime()
   }
 
+  /** @inheritDoc */
+  public resetStatistics (): boolean {
+    this.workersTaskRunTime.clear()
+    this.initWorkersTaskRunTime()
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): Worker {
     const currentWorker = this.pool.workers[this.currentWorkerIndex]
index a88a8eed1eee1b2f627fcd4b1fe89489e44a8394..491444755aafd5076663d14b0c22f6cd79eaf716 100644 (file)
@@ -1,6 +1,6 @@
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
 import { PoolType } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
 import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -17,7 +17,7 @@ import { SelectionStrategiesUtils } from './selection-strategies-utils'
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export class WorkerChoiceStrategyContext<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Data,
   Response
 > {
@@ -77,6 +77,7 @@ export class WorkerChoiceStrategyContext<
   public setWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
+    this.workerChoiceStrategy?.resetStatistics()
     this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
       workerChoiceStrategy
     )
index 926b3280b2f85e4378031cee08b59ce0fee65c98..7bcd076c291736cad1729d66a671a51b2115c1b7 100644 (file)
@@ -1,6 +1,6 @@
 import type { Worker as ClusterWorker } from 'cluster'
 import type { MessagePort } from 'worker_threads'
-import type { AbstractPoolWorker } from './pools/abstract-pool-worker'
+import type { IPoolWorker } from './pools/pool-worker'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
@@ -50,7 +50,7 @@ export interface MessageValue<
  * @template Response Type of response of execution. This can only be serializable data.
  */
 export interface PromiseWorkerResponseWrapper<
-  Worker extends AbstractPoolWorker,
+  Worker extends IPoolWorker,
   Response = unknown
 > {
   /**
index 9b170330fabe7ea7514733b4553aac535a2528c8..4aad8e62669335c3940ba3fb88cc08afda062eb3 100644 (file)
@@ -10,7 +10,7 @@ const DEFAULT_MAX_INACTIVE_TIME = 1000 * 60
 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
 
 /**
- * Base class containing some shared logic for all poolifier workers.
+ * Base class that implements some shared logic for all poolifier workers.
  *
  * @template MainWorker Type of main worker.
  * @template Data Type of data this worker receives from pool's execution. This can only be serializable data.
index 608c3dea7db411b2f8d46f07783e61c91a81b7ad..c9a3cdfa6f43c9828f3bb3e60d6ab7e4acc0f757 100644 (file)
@@ -46,7 +46,7 @@ export interface WorkerOptions {
    *   when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
    * - If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
    *
-   * @default 60.000 ms
+   * @default 60000 ms
    */
   maxInactiveTime?: number
   /**