feat: use SMA and SMM for worker tasks usage
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 17:32:08 +0000 (19:32 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 17:32:08 +0000 (19:32 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/api.md
docs/worker-choice-strategies.md
src/circular-array.ts
src/pools/abstract-pool.ts
src/utils.ts
tests/pools/abstract/abstract-pool.test.js
tests/utils.test.js
tests/worker/abstract-worker.test.js

index 93289b8dce36a22b66dca782b2e391bac47df293..811b839732be96b0b934d490f1b457922255b955 100644 (file)
@@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Don't account worker usage statistics for tasks that have failed.
+
+### Changed
+
+- Update simple moving average implementation to use a circular buffer.
+- Update simple moving median implementation to use a circular buffer.
+
 ## [2.6.34] - 2023-08-24
 
 ### Fixes
index e50ebf0dcced0aff6bdb89e21478007534910d2a..5ffe92c08b2e0052fc4dcead9622bbdf13a1fc99 100644 (file)
@@ -76,9 +76,9 @@ An object with these properties:
 
   - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
   - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
-  - `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies.
-  - `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies.
-  - `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies.
+  - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
+  - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
+  - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
   - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
 
   Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
index 147a500e44bd4beb02a5c79637d23c71b7fc3261..296d8c3b1cc7370058577d654d44625d9d3cfe13 100644 (file)
@@ -9,14 +9,14 @@ All duration or timestamp are expressed in milliseconds.
   - [Weighted round robin](#weighted-round-robin)
   - [Interleaved weighted round robin](#interleaved-weighted-round-robin)
 - [Statistics](#statistics)
-  - [Median](#median)
+  - [Simple moving median](#simple-moving-median)
 
 ## Strategies
 
 ### Fair share
 
-Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + average_task_execution_time`.  
-By default, the strategy uses the average task execution time for each worker but it can be configured to use the average task event loop utilization (ELU) active time instead.
+Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the simple moving average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + simple_moving_average_task_execution_time`.  
+By default, the strategy uses the simple moving average task execution time for each worker but it can be configured to use the simple moving average task event loop utilization (ELU) active time instead.
 
 ### Weighted round robin
 
@@ -32,6 +32,6 @@ The worker default weights are the same for all workers and is computed given th
 
 Worker choice strategies enable only the statistics that are needed to choose the next worker to avoid unnecessary overhead.
 
-### Median
+### Simple moving median
 
-Strategies using the average task execution time for each worker can use the median instead. Median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Median usage introduces a small overhead: measurement history must be kept for each worker and the median must be recomputed each time a task has finished.
+Strategies using the average task execution time for each worker can use the simple moving median instead. Simple moving median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Simple moving median usage introduces a small overhead: measurement history must be kept for each worker and the simple moving median must be recomputed each time a task has finished.
index 9cea08bcdd7f09463a2e7eeab323483b1ee676cc..8f545179ab2a90feb3d06fec15ee038552a25221 100644 (file)
@@ -1,6 +1,6 @@
 // Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
 
-const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024
+export const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024
 
 /**
  * Array with a maximum length and shifting items when full.
index 507fcc10e91e809b13948b85325e413c1ccac621..a11a574147cf074dc938d1e0d47b3797e6e15ad2 100644 (file)
@@ -12,6 +12,7 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  average,
   isKillBehavior,
   isPlainObject,
   median,
@@ -427,16 +428,13 @@ export abstract class AbstractPool<
             )
           ),
           average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
+            average(
+              this.workerNodes.reduce<number[]>(
                 (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+                  accumulator.concat(workerNode.usage.runTime.history),
+                []
               )
+            )
           ),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime.median && {
@@ -468,16 +466,13 @@ export abstract class AbstractPool<
             )
           ),
           average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
+            average(
+              this.workerNodes.reduce<number[]>(
                 (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+                  accumulator.concat(workerNode.usage.waitTime.history),
+                []
               )
+            )
           ),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .waitTime.median && {
@@ -940,11 +935,13 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
+    if (message.taskError != null) {
+      return
+    }
     updateMeasurementStatistics(
       workerUsage.runTime,
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
-      message.taskPerformance?.runTime ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.runTime ?? 0
     )
   }
 
@@ -957,8 +954,7 @@ export abstract class AbstractPool<
     updateMeasurementStatistics(
       workerUsage.waitTime,
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
-      taskWaitTime,
-      workerUsage.tasks.executed
+      taskWaitTime
     )
   }
 
@@ -966,19 +962,20 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
+    if (message.taskError != null) {
+      return
+    }
     const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
     updateMeasurementStatistics(
       workerUsage.elu.active,
       eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.active ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.elu?.active ?? 0
     )
     updateMeasurementStatistics(
       workerUsage.elu.idle,
       eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.idle ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.elu?.idle ?? 0
     )
     if (eluTaskStatisticsRequirements.aggregate) {
       if (message.taskPerformance?.elu != null) {
index 77f889c9b417ad008b0352b8c2e1779788ceb6d9..d39bfa02c8498ed54d8bebc8d9ace5a7a7fd756f 100644 (file)
@@ -76,6 +76,26 @@ export const availableParallelism = (): number => {
 //   return delay + randomSum
 // }
 
+/**
+ * Computes the average of the given data set.
+ *
+ * @param dataSet - Data set.
+ * @returns The average of the given data set.
+ * @internal
+ */
+export const average = (dataSet: number[]): number => {
+  if (Array.isArray(dataSet) && dataSet.length === 0) {
+    return 0
+  }
+  if (Array.isArray(dataSet) && dataSet.length === 1) {
+    return dataSet[0]
+  }
+  return (
+    dataSet.reduce((accumulator, number) => accumulator + number, 0) /
+    dataSet.length
+  )
+}
+
 /**
  * Computes the median of the given data set.
  *
@@ -163,8 +183,7 @@ export const isAsyncFunction = (
 export const updateMeasurementStatistics = (
   measurementStatistics: MeasurementStatistics,
   measurementRequirements: MeasurementStatisticsRequirements,
-  measurementValue: number,
-  numberOfMeasurements: number
+  measurementValue: number
 ): void => {
   if (measurementRequirements.aggregate) {
     measurementStatistics.aggregate =
@@ -177,13 +196,17 @@ export const updateMeasurementStatistics = (
       measurementValue,
       measurementStatistics.maximum ?? -Infinity
     )
-    if (measurementRequirements.average && numberOfMeasurements !== 0) {
-      measurementStatistics.average =
-        measurementStatistics.aggregate / numberOfMeasurements
-    }
-    if (measurementRequirements.median && measurementValue != null) {
+    if (
+      (measurementRequirements.average || measurementRequirements.median) &&
+      measurementValue != null
+    ) {
       measurementStatistics.history.push(measurementValue)
-      measurementStatistics.median = median(measurementStatistics.history)
+      if (measurementRequirements.average) {
+        measurementStatistics.average = average(measurementStatistics.history)
+      }
+      if (measurementRequirements.median) {
+        measurementStatistics.median = median(measurementStatistics.history)
+      }
     }
   }
 }
index 812509513bffee46808b739d1fb7422e8c8eb3b7..a5976d99d88d16d381d2e82cd56889ec91539d91 100644 (file)
@@ -23,6 +23,10 @@ describe('Abstract pool test suite', () => {
     }
   }
 
+  afterEach(() => {
+    sinon.restore()
+  })
+
   it('Simulate pool creation from a non main thread/process', () => {
     expect(
       () =>
index f5a5366de86bbd212f456d5a142ec9d9dd1bd5aa..97b5e8a985bd9569efe405c813740b4551c4c607 100644 (file)
@@ -1,7 +1,11 @@
 const { expect } = require('expect')
-const { CircularArray } = require('../lib/circular-array')
+const {
+  CircularArray,
+  DEFAULT_CIRCULAR_ARRAY_SIZE
+} = require('../lib/circular-array')
 const {
   availableParallelism,
+  average,
   isAsyncFunction,
   isKillBehavior,
   isPlainObject,
@@ -18,6 +22,17 @@ describe('Utils test suite', () => {
     expect(Number.isSafeInteger(availableParallelism())).toBe(true)
   })
 
+  it('Verify average() computation', () => {
+    expect(average([])).toBe(0)
+    expect(average([0.08])).toBe(0.08)
+    expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02, 5.03])).toBe(
+      3.1642857142857146
+    )
+    expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02])).toBe(
+      2.8533333333333335
+    )
+  })
+
   it('Verify median() computation', () => {
     expect(median([])).toBe(0)
     expect(median([0.08])).toBe(0.08)
@@ -137,39 +152,48 @@ describe('Utils test suite', () => {
     updateMeasurementStatistics(
       measurementStatistics,
       { aggregate: true, average: false, median: false },
-      0.01,
-      1
+      0.01
     )
     expect(measurementStatistics).toStrictEqual({
       aggregate: 0.01,
       maximum: 0.01,
       minimum: 0.01,
-      history: expect.any(CircularArray)
+      history: new CircularArray()
     })
     updateMeasurementStatistics(
       measurementStatistics,
       { aggregate: true, average: false, median: false },
-      0.02,
-      2
+      0.02
     )
     expect(measurementStatistics).toStrictEqual({
       aggregate: 0.03,
       maximum: 0.02,
       minimum: 0.01,
-      history: expect.any(CircularArray)
+      history: new CircularArray()
     })
     updateMeasurementStatistics(
       measurementStatistics,
       { aggregate: true, average: true, median: false },
-      0.001,
-      3
+      0.001
     )
     expect(measurementStatistics).toStrictEqual({
       aggregate: 0.031,
       maximum: 0.02,
       minimum: 0.001,
-      average: 0.010333333333333333,
-      history: expect.any(CircularArray)
+      average: 0.001,
+      history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001)
+    })
+    updateMeasurementStatistics(
+      measurementStatistics,
+      { aggregate: true, average: true, median: false },
+      0.003
+    )
+    expect(measurementStatistics).toStrictEqual({
+      aggregate: 0.034,
+      maximum: 0.02,
+      minimum: 0.001,
+      average: 0.002,
+      history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001, 0.003)
     })
   })
 })
index 68665bc442fd540c9daaba0163340825ed347c29..6fd0ea2cad02f720c7de7392bbf9246259869ba0 100644 (file)
@@ -11,6 +11,10 @@ describe('Abstract worker test suite', () => {
     }
   }
 
+  afterEach(() => {
+    sinon.restore()
+  })
+
   it('Verify worker options default values', () => {
     const worker = new ThreadWorker(() => {})
     expect(worker.opts.maxInactiveTime).toStrictEqual(60000)