feat: add continuous task stealing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 20:53:02 +0000 (22:53 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 20:53:02 +0000 (22:53 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
tests/pools/abstract/abstract-pool.test.js

index 0e8866ffd0b5096aec3d5ce5ab1900cd3f4af8ea..3435faeb07a473b20c5f7047eab75e47308854bf 100644 (file)
@@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Update simple moving average implementation to use a circular buffer.
 - Update simple moving median implementation to use a circular buffer.
 
+### Added
+
+- Support for continuous tasks stealing.
+
 ## [2.6.34] - 2023-08-24
 
 ### Fixes
index 5ffe92c08b2e0052fc4dcead9622bbdf13a1fc99..9769c3c7d43eab28b128161d83953b82584c86ce 100644 (file)
@@ -76,9 +76,9 @@ An object with these properties:
 
   - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
   - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
-  - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
-  - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
-  - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
+  - `runTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
+  - `waitTime` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
+  - `elu` (optional) - Use the tasks [simple moving median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
   - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
 
   Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
index 8c202696cd03d01b4e1c02163bfd32930c2ea141..a6ffcbbb3959caa7ac2480656a25798fa5223856 100644 (file)
@@ -785,6 +785,7 @@ export abstract class AbstractPool<
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           this.workerNodes[workerNodeKey].usage.tasks.executing <
             (this.opts.tasksQueueOptions?.concurrency as number))
       ) {
@@ -836,7 +837,7 @@ export abstract class AbstractPool<
    * @virtual
    */
   protected setupHook (): void {
-    // Intentionally empty
+    /** Intentionally empty */
   }
 
   /**
@@ -1199,15 +1200,8 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let destinationWorkerNodeKey!: number
       let minQueuedTasks = Infinity
-      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
-          if (
-            workerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-          ) {
-            executeTask = true
-          }
           if (workerNode.usage.tasks.queued === 0) {
             destinationWorkerNodeKey = workerNodeId
             break
@@ -1219,12 +1213,16 @@ export abstract class AbstractPool<
         }
       }
       if (destinationWorkerNodeKey != null) {
+        const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
         const task = {
           ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-          workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
-            .id as number
+          workerId: destinationWorkerNode.info.id as number
         }
-        if (executeTask) {
+        if (
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+          destinationWorkerNode.usage.tasks.executing <
+            (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1255,13 +1253,18 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
+        // Enqueue task for continuous task stealing
+        this.enqueueTask(destinationWorkerNodeKey, task)
+        // Avoid starvation
         if (
+          this.tasksQueueSize(destinationWorkerNodeKey) > 0 &&
           destinationWorkerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
-          this.executeTask(destinationWorkerNodeKey, task)
-        } else {
-          this.enqueueTask(destinationWorkerNodeKey, task)
+          this.executeTask(
+            destinationWorkerNodeKey,
+            this.dequeueTask(destinationWorkerNodeKey) as Task<Data>
+          )
         }
         break
       }
@@ -1290,8 +1293,9 @@ export abstract class AbstractPool<
           workerId: workerNode.info.id as number
         }
         if (
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           this.executeTask(workerNodeKey, task)
         } else {
index 7b5498096315988a93c247223ef582af6a5f99f0..cc028fecf45d7e3106b09cd0028a55e977e284b9 100644 (file)
@@ -209,7 +209,7 @@ export interface IPool<
    * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready.
    * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing at least one task.
    * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
-   * - '`destroy`': Emitted when the pool is destroyed.
+   * - `'destroy'`: Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
    * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
index a5976d99d88d16d381d2e82cd56889ec91539d91..7f7c4dce14220256bd7003451a1414df39d242e0 100644 (file)
@@ -956,11 +956,11 @@ describe('Abstract pool test suite', () => {
       ++poolBackPressure
       poolInfo = info
     })
-    for (let i = 0; i < numberOfWorkers * 2; i++) {
+    for (let i = 0; i < numberOfWorkers + 1; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
-    expect(poolBackPressure).toBe(2)
+    expect(poolBackPressure).toBe(1)
     expect(poolInfo).toStrictEqual({
       version,
       type: PoolTypes.fixed,