feat: add median task run time statistic
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 12:06:56 +0000 (14:06 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Apr 2023 12:06:56 +0000 (14:06 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
src/circular-array.ts [new file with mode: 0644]
src/pools/abstract-pool.ts
src/pools/pool-internal.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/utils.ts
tests/circular-array.test.js [new file with mode: 0644]
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js

diff --git a/src/circular-array.ts b/src/circular-array.ts
new file mode 100644 (file)
index 0000000..8adf4b5
--- /dev/null
@@ -0,0 +1,94 @@
+// Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
+
+const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024
+
+/**
+ * Array with a maximum length shifting items when full.
+ */
+export class CircularArray<T> extends Array<T> {
+  public size: number
+
+  constructor (size: number = DEFAULT_CIRCULAR_ARRAY_SIZE, ...items: T[]) {
+    super()
+    this.checkSize(size)
+    this.size = size
+    if (arguments.length > 1) {
+      this.push(...items)
+    }
+  }
+
+  public push (...items: T[]): number {
+    const length = super.push(...items)
+    if (length > this.size) {
+      super.splice(0, length - this.size)
+    }
+    return this.length
+  }
+
+  public unshift (...items: T[]): number {
+    const length = super.unshift(...items)
+    if (length > this.size) {
+      super.splice(this.size, items.length)
+    }
+    return this.length
+  }
+
+  public concat (...items: Array<T | ConcatArray<T>>): CircularArray<T> {
+    const concatenatedCircularArray = super.concat(
+      items as T[]
+    ) as CircularArray<T>
+    concatenatedCircularArray.size = this.size
+    if (concatenatedCircularArray.length > concatenatedCircularArray.size) {
+      concatenatedCircularArray.splice(
+        0,
+        concatenatedCircularArray.length - concatenatedCircularArray.size
+      )
+    }
+    return concatenatedCircularArray
+  }
+
+  public splice (start: number, deleteCount?: number, ...items: T[]): T[] {
+    let itemsRemoved: T[]
+    if (arguments.length >= 3 && deleteCount !== undefined) {
+      itemsRemoved = super.splice(start, deleteCount)
+      // FIXME: that makes the items insert not in place
+      this.push(...items)
+    } else if (arguments.length === 2) {
+      itemsRemoved = super.splice(start, deleteCount)
+    } else {
+      itemsRemoved = super.splice(start)
+    }
+    return itemsRemoved
+  }
+
+  public resize (size: number): void {
+    this.checkSize(size)
+    if (size === 0) {
+      this.length = 0
+    } else if (size < this.size) {
+      for (let i = size; i < this.size; i++) {
+        super.pop()
+      }
+    }
+    this.size = size
+  }
+
+  public empty (): boolean {
+    return this.length === 0
+  }
+
+  public full (): boolean {
+    return this.length === this.size
+  }
+
+  private checkSize (size: number): void {
+    if (!Number.isSafeInteger(size)) {
+      throw new TypeError(
+        `Invalid circular array size: ${size} is not a safe integer`
+      )
+    }
+    if (size < 0) {
+      throw new RangeError(`Invalid circular array size: ${size} < 0`)
+    }
+  }
+}
index 9ac81ce56558ec76fbd66d7789b2399e910a2e69..75b5cf796c8a6c198c48bb8ad843bd6eae33ba29 100644 (file)
@@ -1,6 +1,6 @@
 import crypto from 'node:crypto'
 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
-import { EMPTY_FUNCTION } from '../utils'
+import { EMPTY_FUNCTION, median } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
 import { PoolEvents, type PoolOptions } from './pool'
 import { PoolEmitter } from './pool'
@@ -12,6 +12,7 @@ import {
   type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { CircularArray } from '../circular-array'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -171,7 +172,9 @@ export abstract class AbstractPool<
         run: 0,
         running: 0,
         runTime: 0,
+        runTimeHistory: new CircularArray(),
         avgRunTime: 0,
+        medRunTime: 0,
         error: 0
       })
     }
@@ -284,6 +287,10 @@ export abstract class AbstractPool<
         workerTasksUsage.avgRunTime =
           workerTasksUsage.runTime / workerTasksUsage.run
       }
+      if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
+        workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
+        workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
+      }
     }
   }
 
@@ -375,7 +382,9 @@ export abstract class AbstractPool<
       run: 0,
       running: 0,
       runTime: 0,
+      runTimeHistory: new CircularArray(),
       avgRunTime: 0,
+      medRunTime: 0,
       error: 0
     })
 
index c3ec39bea578d3de3c0423f2ef789e0cd6c6f9df..bf799581f138f62c4ef055e6dc40a6ee998358ea 100644 (file)
@@ -1,3 +1,4 @@
+import type { CircularArray } from '../circular-array'
 import type { IPool } from './pool'
 import type { IPoolWorker } from './pool-worker'
 
@@ -18,7 +19,9 @@ export interface TasksUsage {
   run: number
   running: number
   runTime: number
+  runTimeHistory: CircularArray<number>
   avgRunTime: number
+  medRunTime: number
   error: number
 }
 
index b0a624da9d5afb7b563d2a069e2b04866b02163a..8ab3d360ca67f210db221fcdd424039799cf6668 100644 (file)
@@ -23,7 +23,8 @@ export abstract class AbstractWorkerChoiceStrategy<
   /** @inheritDoc */
   public requiredStatistics: RequiredStatistics = {
     runTime: false,
-    avgRunTime: false
+    avgRunTime: false,
+    medRunTime: false
   }
 
   /**
index 16d5a7be1dd60ca213a873c1571f8335948ac826..fc4f0f69032c7f37b2730bb7d68c4b6778f5f9a7 100644 (file)
@@ -31,7 +31,8 @@ export class FairShareWorkerChoiceStrategy<
   /** @inheritDoc */
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
-    avgRunTime: true
+    avgRunTime: true,
+    medRunTime: false
   }
 
   /**
index 87ef804d50881f157e2810a715c2db4627315f33..c03c9da0a1580ce645b96df5bf24b44111aa080a 100644 (file)
@@ -22,7 +22,8 @@ export class LessBusyWorkerChoiceStrategy<
   /** @inheritDoc */
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
-    avgRunTime: false
+    avgRunTime: false,
+    medRunTime: false
   }
 
   /** @inheritDoc */
index 2c6c2e16d6e4d52e7f19841e6d6f331db3d678f9..2207173448458a81be1d383f74a51d4b1b08964a 100644 (file)
@@ -35,6 +35,7 @@ export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
 export interface RequiredStatistics {
   runTime: boolean
   avgRunTime: boolean
+  medRunTime: boolean
 }
 
 /**
index ea598b207f2e1b20358ce0cd6c5da134077ae8f1..96b5867b5bfb1db98f8a8afd4505b5c24da75505 100644 (file)
@@ -33,7 +33,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   /** @inheritDoc */
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
-    avgRunTime: true
+    avgRunTime: true,
+    medRunTime: false
   }
 
   /**
index 809ca38b6bdb17d9f92274bc822543d61bf7a9f4..d85c5ba58b4988fad867a9b35f9dca5bb5bb2b45 100644 (file)
@@ -4,3 +4,21 @@
 export const EMPTY_FUNCTION: () => void = Object.freeze(() => {
   /* Intentionally empty */
 })
+
+/**
+ * Returns the median of the given data set.
+ *
+ * @param dataSet - Data set.
+ * @returns The median of the given data set.
+ */
+export const median = (dataSet: number[]): number => {
+  if (Array.isArray(dataSet) && dataSet.length === 1) {
+    return dataSet[0]
+  }
+  const sortedDataSet = dataSet.slice().sort((a, b) => a - b)
+  const middleIndex = Math.floor(sortedDataSet.length / 2)
+  if (sortedDataSet.length % 2 === 0) {
+    return sortedDataSet[middleIndex / 2]
+  }
+  return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2
+}
diff --git a/tests/circular-array.test.js b/tests/circular-array.test.js
new file mode 100644 (file)
index 0000000..d821b6c
--- /dev/null
@@ -0,0 +1,144 @@
+const { expect } = require('expect')
+const { CircularArray } = require('../lib/circular-array')
+
+describe('Circular array test suite', () => {
+  it('Verify that circular array can be instantiated', () => {
+    const circularArray = new CircularArray()
+    expect(circularArray).toBeInstanceOf(CircularArray)
+  })
+
+  it('Verify circular array default size at instance creation', () => {
+    const circularArray = new CircularArray()
+    expect(circularArray.size).toBe(1024)
+  })
+
+  it('Verify that circular array size can be set at instance creation', () => {
+    const circularArray = new CircularArray(1000)
+    expect(circularArray.size).toBe(1000)
+  })
+
+  it('Verify that circular array size and items can be set at instance creation', () => {
+    let circularArray = new CircularArray(1000, 1, 2, 3, 4, 5)
+    expect(circularArray.size).toBe(1000)
+    expect(circularArray.length).toBe(5)
+    circularArray = new CircularArray(4, 1, 2, 3, 4, 5)
+    expect(circularArray.size).toBe(4)
+    expect(circularArray.length).toBe(4)
+  })
+
+  it('Verify that circular array size is valid at instance creation', () => {
+    expect(() => new CircularArray(0.25)).toThrowError(
+      new TypeError('Invalid circular array size: 0.25 is not a safe integer')
+    )
+    expect(() => new CircularArray(-1)).toThrowError(
+      new RangeError('Invalid circular array size: -1 < 0')
+    )
+    expect(() => new CircularArray(Number.MAX_SAFE_INTEGER + 1)).toThrowError(
+      new TypeError(
+        `Invalid circular array size: ${
+          Number.MAX_SAFE_INTEGER + 1
+        } is not a safe integer`
+      )
+    )
+  })
+
+  it('Verify that circular array empty works as intended', () => {
+    const circularArray = new CircularArray()
+    expect(circularArray.empty()).toBe(true)
+  })
+
+  it('Verify that circular array full works as intended', () => {
+    const circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    expect(circularArray.full()).toBe(true)
+  })
+
+  it('Verify that circular array push works as intended', () => {
+    let circularArray = new CircularArray(4)
+    let arrayLength = circularArray.push(1, 2, 3, 4, 5)
+    expect(arrayLength).toBe(circularArray.size)
+    expect(circularArray.length).toBe(circularArray.size)
+    expect(circularArray).toStrictEqual(new CircularArray(4, 2, 3, 4, 5))
+    arrayLength = circularArray.push(6, 7)
+    expect(arrayLength).toBe(circularArray.size)
+    expect(circularArray.length).toBe(circularArray.size)
+    expect(circularArray).toStrictEqual(new CircularArray(4, 4, 5, 6, 7))
+    circularArray = new CircularArray(100)
+    arrayLength = circularArray.push(1, 2, 3, 4, 5)
+    expect(arrayLength).toBe(5)
+    expect(circularArray.size).toBe(100)
+    expect(circularArray.length).toBe(5)
+    expect(circularArray).toStrictEqual(new CircularArray(100, 1, 2, 3, 4, 5))
+  })
+
+  it('Verify that circular array splice works as intended', () => {
+    let circularArray = new CircularArray(1000, 1, 2, 3, 4, 5)
+    let deletedItems = circularArray.splice(2)
+    expect(deletedItems).toStrictEqual(new CircularArray(3, 3, 4, 5))
+    expect(circularArray.length).toBe(2)
+    expect(circularArray).toStrictEqual(new CircularArray(1000, 1, 2))
+    circularArray = new CircularArray(1000, 1, 2, 3, 4, 5)
+    deletedItems = circularArray.splice(2, 1)
+    expect(deletedItems).toStrictEqual(new CircularArray(1, 3))
+    expect(circularArray.length).toBe(4)
+    expect(circularArray).toStrictEqual(new CircularArray(1000, 1, 2, 4, 5))
+    circularArray = new CircularArray(4, 1, 2, 3, 4)
+    deletedItems = circularArray.splice(2, 1, 5, 6)
+    expect(deletedItems).toStrictEqual(new CircularArray(1, 3))
+    expect(circularArray.length).toBe(4)
+    expect(circularArray).toStrictEqual(new CircularArray(4, 2, 4, 5, 6))
+  })
+
+  it('Verify that circular array concat works as intended', () => {
+    let circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    circularArray = circularArray.concat(6, 7)
+    expect(circularArray.length).toBe(5)
+    expect(circularArray).toStrictEqual(new CircularArray(5, 3, 4, 5, 6, 7))
+    circularArray = new CircularArray(1)
+    circularArray = circularArray.concat(6, 7)
+    expect(circularArray.length).toBe(1)
+    expect(circularArray).toStrictEqual(new CircularArray(1, 7))
+  })
+
+  it('Verify that circular array unshift works as intended', () => {
+    let circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    let arrayLength = circularArray.unshift(6, 7)
+    expect(arrayLength).toBe(5)
+    expect(circularArray.length).toBe(5)
+    expect(circularArray).toStrictEqual(new CircularArray(5, 6, 7, 1, 2, 3))
+    circularArray = new CircularArray(1)
+    arrayLength = circularArray.unshift(6, 7)
+    expect(arrayLength).toBe(1)
+    expect(circularArray.length).toBe(1)
+    expect(circularArray).toStrictEqual(new CircularArray(1, 6))
+  })
+
+  it('Verify that circular array resize works as intended', () => {
+    expect(() => new CircularArray().resize(0.25)).toThrowError(
+      new TypeError('Invalid circular array size: 0.25 is not a safe integer')
+    )
+    expect(() => new CircularArray().resize(-1)).toThrowError(
+      new RangeError('Invalid circular array size: -1 < 0')
+    )
+    expect(() =>
+      new CircularArray().resize(Number.MAX_SAFE_INTEGER + 1)
+    ).toThrowError(
+      new TypeError(
+        `Invalid circular array size: ${
+          Number.MAX_SAFE_INTEGER + 1
+        } is not a safe integer`
+      )
+    )
+    let circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    circularArray.resize(0)
+    expect(circularArray.size).toBe(0)
+    expect(circularArray).toStrictEqual(new CircularArray(0))
+    circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    circularArray.resize(3)
+    expect(circularArray.size).toBe(3)
+    expect(circularArray).toStrictEqual(new CircularArray(3, 1, 2, 3))
+    circularArray = new CircularArray(5, 1, 2, 3, 4, 5)
+    circularArray.resize(8)
+    expect(circularArray.size).toBe(8)
+    expect(circularArray).toStrictEqual(new CircularArray(8, 1, 2, 3, 4, 5))
+  })
+})
index 658be6a9ef99ed63afb692ba864a4745ebf66f89..5063c7fd31c6f058450536dfea3a6519873a88ba 100644 (file)
@@ -6,6 +6,7 @@ const {
   PoolEvents,
   WorkerChoiceStrategies
 } = require('../../../lib/index')
+const { CircularArray } = require('../../../lib/circular-array')
 
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 1
@@ -145,7 +146,9 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.run).toBe(0)
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.medRunTime).toBe(0)
       expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
@@ -165,7 +168,9 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.run).toBe(0)
       expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
       expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.medRunTime).toBe(0)
       expect(workerItem.tasksUsage.error).toBe(0)
     }
     await Promise.all(promises)
@@ -174,7 +179,9 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
       expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.medRunTime).toBe(0)
       expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
@@ -196,7 +203,9 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
       expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.medRunTime).toBe(0)
       expect(workerItem.tasksUsage.error).toBe(0)
     }
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
@@ -205,7 +214,9 @@ describe('Abstract pool test suite', () => {
       expect(workerItem.tasksUsage.run).toBe(0)
       expect(workerItem.tasksUsage.running).toBe(0)
       expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
       expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+      expect(workerItem.tasksUsage.medRunTime).toBe(0)
       expect(workerItem.tasksUsage.error).toBe(0)
     }
     await pool.destroy()
index b1599a99511a18ea180dc71521fc19d3c34ba17d..5a7394a57aa5fe5abdf643a43d6bfcf48d35f67a 100644 (file)
@@ -77,6 +77,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -90,6 +93,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -230,6 +236,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -243,6 +252,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -318,6 +330,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -331,6 +346,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(false)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -420,6 +438,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(true)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -433,6 +454,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(true)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -600,6 +624,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(true)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -613,6 +640,9 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
     ).toBe(true)
+    expect(
+      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+    ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
   })