feat: continuous task stealing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 25 Aug 2023 14:27:32 +0000 (16:27 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 25 Aug 2023 14:27:32 +0000 (16:27 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/utils.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js

index 0e8866ffd0b5096aec3d5ce5ab1900cd3f4af8ea..653c06a1211332991b1370b1644e4f74c0eeae04 100644 (file)
@@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Update simple moving average implementation to use a circular buffer.
 - Update simple moving median implementation to use a circular buffer.
+- Account for stolen tasks in worker usage statistics and pool information.
+
+### Added
+
+- Continuous tasks stealing algorithm.
 
 ## [2.6.34] - 2023-08-24
 
index 062df07791af8666fd94282eac229aef466c7a1b..4abbd615efe13869feaff7b9e1b19ee584b17990 100644 (file)
@@ -405,6 +405,13 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         backPressure: this.hasBackPressure()
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.stolen,
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -1262,6 +1269,14 @@ export abstract class AbstractPool<
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
         }
+        ++destinationWorkerNode.usage.tasks.stolen
+        if (this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey)) {
+          const taskFunctionWorkerUsage =
+            destinationWorkerNode.getTaskFunctionWorkerUsage(
+              task.name as string
+            ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
         break
       }
     }
@@ -1297,6 +1312,13 @@ export abstract class AbstractPool<
         } else {
           this.enqueueTask(workerNodeKey, task)
         }
+        ++workerNode.usage.tasks.stolen
+        if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+          const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
       }
     }
   }
index cc028fecf45d7e3106b09cd0028a55e977e284b9..4463560c2754e82fd8b14564bd7c18a3268dc671 100644 (file)
@@ -80,6 +80,7 @@ export interface PoolInfo {
   readonly queuedTasks?: number
   readonly maxQueuedTasks?: number
   readonly backPressure?: boolean
+  readonly stolenTasks?: number
   readonly failedTasks: number
   readonly runTime?: {
     readonly minimum: number
index e79aed0bc3aa68b880d0d265bca4fb2ecf8a9bff..e28372efa1b3c73cf9c8904d6077f87fed1dd553 100644 (file)
@@ -1,7 +1,12 @@
 import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
-import { DEFAULT_TASK_NAME } from '../utils'
+import {
+  DEFAULT_TASK_NAME,
+  EMPTY_FUNCTION,
+  exponentialDelay,
+  sleep
+} from '../utils'
 import { Deque } from '../deque'
 import {
   type IWorker,
@@ -36,6 +41,7 @@ implements IWorkerNode<Worker, Data> {
   public onEmptyQueue?: (workerId: number) => void
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Deque<Task<Data>>
+  private onEmptyQueueCount: number
 
   /**
    * Constructs a new worker node.
@@ -76,6 +82,7 @@ implements IWorkerNode<Worker, Data> {
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Deque<Task<Data>>()
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.onEmptyQueueCount = 0
   }
 
   /** @inheritdoc */
@@ -105,7 +112,7 @@ implements IWorkerNode<Worker, Data> {
   public dequeueTask (): Task<Data> | undefined {
     const task = this.tasksQueue.shift()
     if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
-      this.onEmptyQueue(this.info.id as number)
+      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -114,7 +121,7 @@ implements IWorkerNode<Worker, Data> {
   public popTask (): Task<Data> | undefined {
     const task = this.tasksQueue.pop()
     if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
-      this.onEmptyQueue(this.info.id as number)
+      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
   }
@@ -170,6 +177,19 @@ implements IWorkerNode<Worker, Data> {
     return this.taskFunctionsUsage.get(name)
   }
 
+  private async startOnEmptyQueue (): Promise<void> {
+    if (this.onEmptyQueue != null) {
+      if (this.tasksQueue.size > 0) {
+        this.onEmptyQueueCount = 0
+        return
+      }
+      this.onEmptyQueue(this.info.id as number)
+      ++this.onEmptyQueueCount
+      await sleep(exponentialDelay(this.onEmptyQueueCount))
+      await this.startOnEmptyQueue()
+    }
+  }
+
   private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
     return {
       id: this.getWorkerId(worker, workerType),
@@ -196,6 +216,7 @@ implements IWorkerNode<Worker, Data> {
         get maxQueued (): number {
           return getTasksQueueMaxSize()
         },
+        stolen: 0,
         failed: 0
       },
       runTime: {
@@ -236,6 +257,7 @@ implements IWorkerNode<Worker, Data> {
         get queued (): number {
           return getTaskFunctionQueueSize()
         },
+        stolen: 0,
         failed: 0
       },
       runTime: {
index 6e81112d27e62a532f519411cc72f1cefef80059..52b56a2411f0f41f7c56452270b2c63be7ecd0a5 100644 (file)
@@ -96,6 +96,10 @@ export interface TaskStatistics {
    * Maximum number of queued tasks.
    */
   readonly maxQueued?: number
+  /**
+   * Number of stolen tasks.
+   */
+  stolen: number
   /**
    * Number of failed tasks.
    */
index d39bfa02c8498ed54d8bebc8d9ace5a7a7fd756f..b5fb9afeda348ddd15e9c441a3c23d67c07ffeb0 100644 (file)
@@ -59,22 +59,34 @@ export const availableParallelism = (): number => {
   return availableParallelism
 }
 
-// /**
-//  * Computes the retry delay in milliseconds using an exponential back off algorithm.
-//  *
-//  * @param retryNumber - The number of retries that have already been attempted
-//  * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
-//  * @returns Delay in milliseconds
-//  * @internal
-//  */
-// export const exponentialDelay = (
-//   retryNumber = 0,
-//   maxDelayRatio = 0.2
-// ): number => {
-//   const delay = Math.pow(2, retryNumber) * 100
-//   const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay
-//   return delay + randomSum
-// }
+/**
+ * Sleeps for the given amount of milliseconds.
+ *
+ * @param ms - The amount of milliseconds to sleep.
+ * @returns A promise that resolves after the given amount of milliseconds.
+ */
+export const sleep = async (ms: number): Promise<void> => {
+  await new Promise((resolve) => {
+    setTimeout(resolve, ms)
+  })
+}
+
+/**
+ * Computes the retry delay in milliseconds using an exponential back off algorithm.
+ *
+ * @param retryNumber - The number of retries that have already been attempted
+ * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
+ * @returns Delay in milliseconds
+ * @internal
+ */
+export const exponentialDelay = (
+  retryNumber = 0,
+  maxDelayRatio = 0.2
+): number => {
+  const delay = Math.pow(2, retryNumber) * 100
+  const randomSum = delay * maxDelayRatio * secureRandom() // 0-(maxDelayRatio*100)% of the delay
+  return delay + randomSum
+}
 
 /**
  * Computes the average of the given data set.
@@ -234,3 +246,12 @@ export const once = (
     }
   }
 }
+
+/**
+ * Generate a cryptographically secure random number in the [0,1[ range
+ *
+ * @returns A number in the [0,1[ range
+ */
+const secureRandom = (): number => {
+  return crypto.getRandomValues(new Uint32Array(1))[0] / 0x100000000
+}
index 7f7c4dce14220256bd7003451a1414df39d242e0..933e1f944f863bdbcdb4110c1d75359e8f571b4c 100644 (file)
@@ -618,6 +618,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -710,6 +711,7 @@ describe('Abstract pool test suite', () => {
           executing: maxMultiplier,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -736,6 +738,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -776,6 +779,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -810,6 +814,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -977,6 +982,7 @@ describe('Abstract pool test suite', () => {
       maxQueuedTasks: expect.any(Number),
       queuedTasks: expect.any(Number),
       backPressure: true,
+      stolenTasks: expect.any(Number),
       failedTasks: expect.any(Number)
     })
     expect(pool.hasBackPressure.called).toBe(true)
@@ -1040,7 +1046,8 @@ describe('Abstract pool test suite', () => {
             executed: expect.any(Number),
             executing: expect.any(Number),
             failed: 0,
-            queued: 0
+            queued: 0,
+            stolen: 0
           },
           runTime: {
             history: expect.any(CircularArray)
index 0e47e18efade43e35b2894ba6824d666ddb13daa..77713b6f03f20f8dc883cad07820b292a16d47de 100644 (file)
@@ -217,6 +217,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -265,6 +266,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -464,6 +466,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -511,6 +514,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -639,6 +643,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -696,6 +701,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -834,6 +840,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -843,12 +850,12 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -887,6 +894,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -896,12 +904,12 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1021,21 +1029,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1089,21 +1098,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1162,21 +1172,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1385,6 +1396,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1452,6 +1464,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1524,6 +1537,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1755,6 +1769,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -1825,6 +1840,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {