Enhance tasks statistics
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Oct 2022 18:27:37 +0000 (20:27 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Oct 2022 18:27:37 +0000 (20:27 +0200)
Close #577

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
src/pools/abstract-pool.ts
src/pools/pool-internal.ts
src/utility-types.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/worker-files/cluster/testWorker.js
tests/worker-files/thread/testWorker.js

index a292ce8414dd628f3813a8d0d5aad15180799469..2db54d26d4742ea0e687fad888e3972ccd49f8c7 100644 (file)
--- a/README.md
+++ b/README.md
@@ -99,7 +99,7 @@ You can implement a worker-threads worker in a simple way by extending the class
 'use strict'
 const { ThreadWorker } = require('poolifier')
 
-function yourFunction (data) {
+function yourFunction(data) {
   // this will be executed in the worker thread,
   // the data will be received by using the execute method
   return { ok: 1 }
@@ -194,7 +194,7 @@ This method will call the terminate method on each worker.
   The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.  
   If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.  
   If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.  
-  Default: 60.000 ms
+  Default: 60000 ms
 
 - `async` - true/false, true if your function contains async pieces else false
 - `killBehavior` - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.  
index 10fe128170b5363b0b04d58b82e451369a0f9d3d..0921f45644446c18512f876a0a13b3379f8026fb 100644 (file)
@@ -6,7 +6,7 @@ import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
 import type { AbstractPoolWorker } from './abstract-pool-worker'
 import type { PoolOptions } from './pool'
-import type { IPoolInternal } from './pool-internal'
+import type { IPoolInternal, TasksUsage } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
 import {
   WorkerChoiceStrategies,
@@ -14,6 +14,9 @@ import {
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
+const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
+  'Worker could not be found in worker tasks usage map'
+
 /**
  * Base class containing some shared logic for all poolifier pools.
  *
@@ -29,8 +32,16 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public readonly workers: Worker[] = []
 
-  /** @inheritDoc */
-  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
+  /**
+   * The workers tasks usage map.
+   *
+   *  `key`: The `Worker`
+   *  `value`: Worker tasks usage statistics.
+   */
+  protected workersTasksUsage: Map<Worker, TasksUsage> = new Map<
+    Worker,
+    TasksUsage
+  >()
 
   /** @inheritDoc */
   public readonly emitter?: PoolEmitter
@@ -151,14 +162,19 @@ export abstract class AbstractPool<
     return this.promiseMap.size
   }
 
+  /** @inheritDoc */
+  public getWorkerIndex (worker: Worker): number {
+    return this.workers.indexOf(worker)
+  }
+
   /** @inheritDoc */
   public getWorkerRunningTasks (worker: Worker): number | undefined {
-    return this.tasks.get(worker)
+    return this.workersTasksUsage.get(worker)?.running
   }
 
   /** @inheritDoc */
-  public getWorkerIndex (worker: Worker): number {
-    return this.workers.indexOf(worker)
+  public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
+    return this.workersTasksUsage.get(worker)?.avgRunTime
   }
 
   /** @inheritDoc */
@@ -230,36 +246,29 @@ export abstract class AbstractPool<
   protected abstract isMain (): boolean
 
   /**
-   * Increase the number of tasks that the given worker has applied.
+   * Hook executed before the worker task promise resolution.
+   * Can be overridden.
    *
-   * @param worker Worker whose tasks are increased.
+   * @param worker The worker.
    */
-  protected increaseWorkersTask (worker: Worker): void {
-    this.stepWorkerNumberOfTasks(worker, 1)
+  protected beforePromiseWorkerResponseHook (worker: Worker): void {
+    this.increaseWorkerRunningTasks(worker)
   }
 
   /**
-   * Decrease the number of tasks that the given worker has applied.
+   * Hook executed after the worker task promise resolution.
+   * Can be overridden.
    *
-   * @param worker Worker whose tasks are decreased.
+   * @param message The received message.
+   * @param promise The Promise response.
    */
-  protected decreaseWorkersTasks (worker: Worker): void {
-    this.stepWorkerNumberOfTasks(worker, -1)
-  }
-
-  /**
-   * Step the number of tasks that the given worker has applied.
-   *
-   * @param worker Worker whose tasks are set.
-   * @param step Worker number of tasks step.
-   */
-  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
-    const numberOfTasksInProgress = this.tasks.get(worker)
-    if (numberOfTasksInProgress !== undefined) {
-      this.tasks.set(worker, numberOfTasksInProgress + step)
-    } else {
-      throw Error('Worker could not be found in tasks map')
-    }
+  protected afterPromiseWorkerResponseHook (
+    message: MessageValue<Response>,
+    promise: PromiseWorkerResponseWrapper<Worker, Response>
+  ): void {
+    this.decreaseWorkerRunningTasks(promise.worker)
+    this.stepWorkerRunTasks(promise.worker, 1)
+    this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
   }
 
   /**
@@ -270,7 +279,7 @@ export abstract class AbstractPool<
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
     this.workers.splice(this.getWorkerIndex(worker), 1)
-    this.tasks.delete(worker)
+    this.resetWorkerTasksUsage(worker)
   }
 
   /**
@@ -309,7 +318,7 @@ export abstract class AbstractPool<
     worker: Worker,
     messageId: number
   ): Promise<Response> {
-    this.increaseWorkersTask(worker)
+    this.beforePromiseWorkerResponseHook(worker)
     return new Promise<Response>((resolve, reject) => {
       this.promiseMap.set(messageId, { resolve, reject, worker })
     })
@@ -345,8 +354,13 @@ export abstract class AbstractPool<
 
     this.workers.push(worker)
 
-    // Init tasks map
-    this.tasks.set(worker, 0)
+    // Init worker tasks usage map
+    this.workersTasksUsage.set(worker, {
+      run: 0,
+      running: 0,
+      runTime: 0,
+      avgRunTime: 0
+    })
 
     this.afterWorkerSetup(worker)
 
@@ -363,7 +377,7 @@ export abstract class AbstractPool<
       if (message.id !== undefined) {
         const promise = this.promiseMap.get(message.id)
         if (promise !== undefined) {
-          this.decreaseWorkersTasks(promise.worker)
+          this.afterPromiseWorkerResponseHook(message, promise)
           if (message.error) promise.reject(message.error)
           else promise.resolve(message.data as Response)
           this.promiseMap.delete(message.id)
@@ -377,4 +391,83 @@ export abstract class AbstractPool<
       this.emitter?.emit('busy')
     }
   }
+
+  /**
+   * Increase the number of tasks that the given worker has applied.
+   *
+   * @param worker Worker which running tasks is increased.
+   */
+  private increaseWorkerRunningTasks (worker: Worker): void {
+    this.stepWorkerRunningTasks(worker, 1)
+  }
+
+  /**
+   * Decrease the number of tasks that the given worker has applied.
+   *
+   * @param worker Worker which running tasks is decreased.
+   */
+  private decreaseWorkerRunningTasks (worker: Worker): void {
+    this.stepWorkerRunningTasks(worker, -1)
+  }
+
+  /**
+   * Step the number of tasks that the given worker has applied.
+   *
+   * @param worker Worker which running tasks are stepped.
+   * @param step Number of running tasks step.
+   */
+  private stepWorkerRunningTasks (worker: Worker, step: number): void {
+    const tasksUsage = this.workersTasksUsage.get(worker)
+    if (tasksUsage !== undefined) {
+      tasksUsage.running = tasksUsage.running + step
+      this.workersTasksUsage.set(worker, tasksUsage)
+    } else {
+      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+    }
+  }
+
+  /**
+   * Step the number of tasks that the given worker has run.
+   *
+   * @param worker Worker which has run tasks.
+   * @param step Number of run tasks step.
+   */
+  private stepWorkerRunTasks (worker: Worker, step: number) {
+    const tasksUsage = this.workersTasksUsage.get(worker)
+    if (tasksUsage !== undefined) {
+      tasksUsage.run = tasksUsage.run + step
+      this.workersTasksUsage.set(worker, tasksUsage)
+    } else {
+      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+    }
+  }
+
+  /**
+   * Update tasks run time for the given worker.
+   *
+   * @param worker Worker which run the task.
+   * @param taskRunTime Worker task run time.
+   */
+  private updateWorkerTasksRunTime (
+    worker: Worker,
+    taskRunTime: number | undefined
+  ) {
+    const tasksUsage = this.workersTasksUsage.get(worker)
+    if (tasksUsage !== undefined && tasksUsage.run !== 0) {
+      tasksUsage.runTime += taskRunTime ?? 0
+      tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
+      this.workersTasksUsage.set(worker, tasksUsage)
+    } else {
+      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+    }
+  }
+
+  /**
+   * Reset worker tasks usage statistics.
+   *
+   * @param worker The worker.
+   */
+  private resetWorkerTasksUsage (worker: Worker): void {
+    this.workersTasksUsage.delete(worker)
+  }
 }
index 71120110f75bfb48c6e005876b8b7715e53f53fb..f5f1d1908a14d60f2e26f9d6baa75c627468e96b 100644 (file)
@@ -10,6 +10,16 @@ export enum PoolType {
   DYNAMIC = 'dynamic'
 }
 
+/**
+ * Tasks usage statistics.
+ */
+export interface TasksUsage {
+  run: number
+  running: number
+  runTime: number
+  avgRunTime: number
+}
+
 /**
  * Internal poolifier pool emitter.
  */
@@ -32,14 +42,6 @@ export interface IPoolInternal<
    */
   readonly workers: Worker[]
 
-  /**
-   * The tasks map.
-   *
-   * - `key`: The `Worker`
-   * - `value`: Number of tasks currently in progress on the worker.
-   */
-  readonly tasks: Map<Worker, number>
-
   /**
    * Emitter on which events can be listened to.
    *
@@ -99,4 +101,12 @@ export interface IPoolInternal<
    * @returns The number of tasks currently running on the worker.
    */
   getWorkerRunningTasks(worker: Worker): number | undefined
+
+  /**
+   * Get worker average tasks run time.
+   *
+   * @param worker The worker.
+   * @returns The average tasks run time on the worker.
+   */
+  getWorkerAverageTasksRunTime(worker: Worker): number | undefined
 }
index c9c5492cb7411cb78e0363514badd243c5e49add..104725ac4299ec9827e81c52d9e7b09dc3105d96 100644 (file)
@@ -31,6 +31,10 @@ export interface MessageValue<
    * Error.
    */
   readonly error?: string
+  /**
+   * Task run time.
+   */
+  readonly taskRunTime?: number
   /**
    * Reference to main worker.
    *
index b49ede8b0802f1baf39230b087f9b6b99050e8e3..349940faea20418b2bfa954561f7d258cc8ce3b2 100644 (file)
@@ -171,8 +171,10 @@ export abstract class AbstractWorker<
     value: MessageValue<Data>
   ): void {
     try {
+      const startTaskTimestamp = Date.now()
       const res = fn(value.data)
-      this.sendToMainWorker({ data: res, id: value.id })
+      const taskRunTime = Date.now() - startTaskTimestamp
+      this.sendToMainWorker({ data: res, id: value.id, taskRunTime })
     } catch (e) {
       const err = this.handleError(e as Error)
       this.sendToMainWorker({ error: err, id: value.id })
@@ -191,9 +193,11 @@ export abstract class AbstractWorker<
     fn: (data?: Data) => Promise<Response>,
     value: MessageValue<Data>
   ): void {
+    const startTaskTimestamp = Date.now()
     fn(value.data)
       .then(res => {
-        this.sendToMainWorker({ data: res, id: value.id })
+        const taskRunTime = Date.now() - startTaskTimestamp
+        this.sendToMainWorker({ data: res, id: value.id, taskRunTime })
         return null
       })
       .catch(e => {
index d9ab30fec9d0a9f11629fcf69ca4b0921469fdb9..24c7debd3c86deda03f4e312682d60546ed07c61 100644 (file)
@@ -4,13 +4,16 @@ const {
   FixedThreadPool,
   WorkerChoiceStrategies
 } = require('../../../lib/index')
-const expectedError = new Error('Worker could not be found in tasks map')
 
 const numberOfWorkers = 1
 
-class StubPoolWithTasksMapClear extends FixedThreadPool {
+const workerNotFoundInTasksUsageMapError = new Error(
+  'Worker could not be found in worker tasks usage map'
+)
+
+class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool {
   removeAllWorker () {
-    this.tasks.clear()
+    this.workersTasksUsage.clear()
   }
 }
 
@@ -21,31 +24,6 @@ class StubPoolWithIsMainMethod extends FixedThreadPool {
 }
 
 describe('Abstract pool test suite', () => {
-  it('Simulate worker not found during increaseWorkersTask', () => {
-    const pool = new StubPoolWithTasksMapClear(
-      numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js'
-    )
-    // Simulate worker not found.
-    pool.removeAllWorker()
-    expect(() => pool.increaseWorkersTask()).toThrowError(expectedError)
-    pool.destroy()
-  })
-
-  it('Simulate worker not found during decreaseWorkersTasks', () => {
-    const pool = new StubPoolWithTasksMapClear(
-      numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js',
-      {
-        errorHandler: e => console.error(e)
-      }
-    )
-    // Simulate worker not found.
-    pool.removeAllWorker()
-    expect(() => pool.decreaseWorkersTasks()).toThrowError(expectedError)
-    pool.destroy()
-  })
-
   it('Simulate pool creation from a non main thread/process', () => {
     expect(
       () =>
@@ -139,6 +117,109 @@ describe('Abstract pool test suite', () => {
     pool.destroy()
   })
 
+  it('Simulate worker not found during increaseWorkerRunningTasks', () => {
+    const pool = new StubPoolWithWorkerTasksUsageMapClear(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    // Simulate worker not found.
+    pool.removeAllWorker()
+    expect(() => pool.increaseWorkerRunningTasks()).toThrowError(
+      workerNotFoundInTasksUsageMapError
+    )
+    pool.destroy()
+  })
+
+  it('Simulate worker not found during decreaseWorkerRunningTasks', () => {
+    const pool = new StubPoolWithWorkerTasksUsageMapClear(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js',
+      {
+        errorHandler: e => console.error(e)
+      }
+    )
+    // Simulate worker not found.
+    pool.removeAllWorker()
+    expect(() => pool.decreaseWorkerRunningTasks()).toThrowError(
+      workerNotFoundInTasksUsageMapError
+    )
+    pool.destroy()
+  })
+
+  it('Simulate worker not found during stepWorkerRunTasks', () => {
+    const pool = new StubPoolWithWorkerTasksUsageMapClear(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js',
+      {
+        errorHandler: e => console.error(e)
+      }
+    )
+    // Simulate worker not found.
+    pool.removeAllWorker()
+    expect(() => pool.stepWorkerRunTasks()).toThrowError(
+      workerNotFoundInTasksUsageMapError
+    )
+    pool.destroy()
+  })
+
+  it('Simulate worker not found during updateWorkerTasksRunTime', () => {
+    const pool = new StubPoolWithWorkerTasksUsageMapClear(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js',
+      {
+        errorHandler: e => console.error(e)
+      }
+    )
+    // Simulate worker not found.
+    pool.removeAllWorker()
+    expect(() => pool.updateWorkerTasksRunTime()).toThrowError(
+      workerNotFoundInTasksUsageMapError
+    )
+    pool.destroy()
+  })
+
+  it('Verify that worker pool tasks usage are initialized', () => {
+    const pool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    for (const tasksUsage of pool.workersTasksUsage.values()) {
+      expect(tasksUsage).toBeDefined()
+      expect(tasksUsage.run).toBe(0)
+      expect(tasksUsage.running).toBe(0)
+      expect(tasksUsage.runTime).toBe(0)
+      expect(tasksUsage.avgRunTime).toBe(0)
+    }
+    pool.destroy()
+  })
+
+  it('Verify that worker pool tasks usage are computed', async () => {
+    const pool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    const promises = []
+    for (let i = 0; i < numberOfWorkers * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    for (const tasksUsage of pool.workersTasksUsage.values()) {
+      expect(tasksUsage).toBeDefined()
+      expect(tasksUsage.run).toBe(0)
+      expect(tasksUsage.running).toBe(numberOfWorkers * 2)
+      expect(tasksUsage.runTime).toBe(0)
+      expect(tasksUsage.avgRunTime).toBe(0)
+    }
+    await Promise.all(promises)
+    for (const tasksUsage of pool.workersTasksUsage.values()) {
+      expect(tasksUsage).toBeDefined()
+      expect(tasksUsage.run).toBe(numberOfWorkers * 2)
+      expect(tasksUsage.running).toBe(0)
+      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+    }
+    pool.destroy()
+  })
+
   it("Verify that pool event emitter 'busy' event can register a callback", async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
index f6115cdeca471a8895175860df716a40e1731c36..cba1ec8ecef91f899c8653c5c7ae54744eb29166 100644 (file)
@@ -4,7 +4,7 @@ const { isMaster } = require('cluster')
 const TestUtils = require('../../test-utils')
 
 function test (data) {
-  TestUtils.jsonIntegerSerialization(50)
+  TestUtils.jsonIntegerSerialization(100)
   return isMaster
 }
 
index ccf55b56ba92d29e3848ecf0d19d37fe39f068e9..773e11164014a35a01e3cfc0dfe3f8f797d46336 100644 (file)
@@ -4,7 +4,7 @@ const { isMainThread } = require('worker_threads')
 const TestUtils = require('../../test-utils')
 
 function test (data) {
-  TestUtils.jsonIntegerSerialization(50)
+  TestUtils.jsonIntegerSerialization(100)
   return isMainThread
 }