Optimize tasks usage lookup implementation.
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Oct 2022 07:43:11 +0000 (09:43 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Oct 2022 07:43:11 +0000 (09:43 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
benchmarks/versus-external-pools/BENCH-100000.md
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js

index a2fb8aebba817ab78c09d9f0eef2ae214e3c7f27..8bc2947aab564a95ed25e3eebd65cd7a37e87b67 100644 (file)
@@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
-- Improve benchmarks: add IO intensive task workload, add task size option, integrate into eslint.
+- Improve benchmarks: add IO intensive task workload, add task size option, integrate code into linter.
+- Optimize tasks usage lookup implementation.
 
 ## [2.3.4] - 2022-10-17
 
index 6c2aae9c7638eaaf95d59fa1902806da76a840e4..a8cc6e8da536938af8963e31ed18723632c247d5 100644 (file)
@@ -1,13 +1,13 @@
 | Command                                              |       Mean [s] | Min [s] | Max [s] |    Relative |
 | :--------------------------------------------------- | -------------: | ------: | ------: | ----------: |
-| `node dynamic-piscina.js`                            | 27.917 ± 3.550 |  23.332 |  32.602 | 1.24 ± 0.17 |
-| `node fixed-piscina.js`                              | 24.073 ± 0.603 |  23.245 |  24.980 | 1.07 ± 0.06 |
-| `node dynamic-poolifier.js`                          | 22.571 ± 1.049 |  21.715 |  25.146 |        1.00 |
-| `node fixed-poolifier.js`                            | 23.344 ± 0.508 |  22.659 |  24.247 | 1.03 ± 0.05 |
-| `node dynamic-suchmokuo-node-worker-threads-pool.js` | 33.512 ± 1.607 |  32.231 |  37.384 | 1.48 ± 0.10 |
-| `node static-suchmokuo-node-worker-threads-pool.js`  | 24.475 ± 0.578 |  23.780 |  25.335 | 1.08 ± 0.06 |
-| `node threadjs.js`                                   | 92.927 ± 5.314 |  84.503 | 102.706 | 4.12 ± 0.30 |
-| `node dynamic-workerpool.js`                         | 27.519 ± 0.717 |  26.510 |  29.063 | 1.22 ± 0.06 |
-| `node fixed-workerpool.js`                           | 25.787 ± 0.620 |  25.066 |  26.622 | 1.14 ± 0.06 |
-| `node fixed-threadwork.js`                           | 24.481 ± 0.505 |  23.711 |  25.274 | 1.08 ± 0.06 |
-| `node fixed-microjob.js`                             | 41.766 ± 1.275 |  40.694 |  45.133 | 1.85 ± 0.10 |
+| `node dynamic-poolifier.js`                          | 25.315 ± 2.256 |  23.169 |  30.705 |        1.00 |
+| `node fixed-poolifier.js`                            | 26.079 ± 2.875 |  23.087 |  32.154 | 1.03 ± 0.15 |
+| `node dynamic-piscina.js`                            | 25.897 ± 0.570 |  24.997 |  26.914 | 1.02 ± 0.09 |
+| `node fixed-piscina.js`                              | 25.946 ± 0.891 |  24.953 |  27.346 | 1.02 ± 0.10 |
+| `node dynamic-workerpool.js`                         | 29.314 ± 0.897 |  27.839 |  30.370 | 1.16 ± 0.11 |
+| `node fixed-workerpool.js`                           | 28.926 ± 0.964 |  27.866 |  30.583 | 1.14 ± 0.11 |
+| `node dynamic-suchmokuo-node-worker-threads-pool.js` | 84.852 ± 1.322 |  83.077 |  86.920 | 3.35 ± 0.30 |
+| `node static-suchmokuo-node-worker-threads-pool.js`  | 27.519 ± 0.963 |  26.248 |  29.518 | 1.09 ± 0.10 |
+| `node threadjs.js`                                   | 88.842 ± 3.197 |  85.012 |  94.289 | 3.51 ± 0.34 |
+| `node fixed-threadwork.js`                           | 27.345 ± 0.920 |  26.167 |  28.972 | 1.08 ± 0.10 |
+| `node fixed-microjob.js`                             | 44.998 ± 0.659 |  44.106 |  46.029 | 1.78 ± 0.16 |
index 346eb6d1fdb02cdf992f84b91502ae21f465800e..6393fe2eb599daa0971f5d66072a5c2f890737eb 100644 (file)
@@ -14,9 +14,6 @@ import {
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
-const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
-  'Worker could not be found in worker tasks usage map'
-
 /**
  * Base class that implements some shared logic for all poolifier pools.
  *
@@ -210,11 +207,13 @@ export abstract class AbstractPool<
   public execute (data: Data): Promise<Response> {
     // Configure worker to handle message with the specified task
     const worker = this.chooseWorker()
-    const messageId = ++this.nextMessageId
-    const res = this.internalExecute(worker, messageId)
+    const res = this.internalExecute(worker, this.nextMessageId)
     this.checkAndEmitBusy()
-    data = data ?? ({} as Data)
-    this.sendToWorker(worker, { data, id: messageId })
+    this.sendToWorker(worker, {
+      data: data ?? ({} as Data),
+      id: this.nextMessageId
+    })
+    ++this.nextMessageId
     return res
   }
 
@@ -370,9 +369,12 @@ export abstract class AbstractPool<
       if (message.id !== undefined) {
         const promise = this.promiseMap.get(message.id)
         if (promise !== undefined) {
+          if (message.error) {
+            promise.reject(message.error)
+          } else {
+            promise.resolve(message.data as Response)
+          }
           this.afterPromiseWorkerResponseHook(message, promise)
-          if (message.error) promise.reject(message.error)
-          else promise.resolve(message.data as Response)
           this.promiseMap.delete(message.id)
         }
       }
@@ -410,12 +412,10 @@ export abstract class AbstractPool<
    * @param step Number of running tasks step.
    */
   private stepWorkerRunningTasks (worker: Worker, step: number): void {
-    const tasksUsage = this.workersTasksUsage.get(worker)
-    if (tasksUsage !== undefined) {
+    if (this.checkWorkerTasksUsage(worker) === true) {
+      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
       tasksUsage.running = tasksUsage.running + step
       this.workersTasksUsage.set(worker, tasksUsage)
-    } else {
-      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
     }
   }
 
@@ -426,12 +426,10 @@ export abstract class AbstractPool<
    * @param step Number of run tasks step.
    */
   private stepWorkerRunTasks (worker: Worker, step: number): void {
-    const tasksUsage = this.workersTasksUsage.get(worker)
-    if (tasksUsage !== undefined) {
+    if (this.checkWorkerTasksUsage(worker) === true) {
+      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
       tasksUsage.run = tasksUsage.run + step
       this.workersTasksUsage.set(worker, tasksUsage)
-    } else {
-      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
     }
   }
 
@@ -447,19 +445,30 @@ export abstract class AbstractPool<
   ): void {
     if (
       this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime === true
+        .requiredStatistics.runTime === true &&
+      this.checkWorkerTasksUsage(worker) === true
     ) {
-      const tasksUsage = this.workersTasksUsage.get(worker)
-      if (tasksUsage !== undefined) {
-        tasksUsage.runTime += taskRunTime ?? 0
-        if (tasksUsage.run !== 0) {
-          tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
-        }
-        this.workersTasksUsage.set(worker, tasksUsage)
-      } else {
-        throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
+      tasksUsage.runTime += taskRunTime ?? 0
+      if (tasksUsage.run !== 0) {
+        tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
       }
+      this.workersTasksUsage.set(worker, tasksUsage)
+    }
+  }
+
+  /**
+   * Checks if the given worker is registered in the workers tasks usage map.
+   *
+   * @param worker Worker to check.
+   * @returns `true` if the worker is registered in the workers tasks usage map. `false` otherwise.
+   */
+  private checkWorkerTasksUsage (worker: Worker): boolean {
+    const hasTasksUsage = this.workersTasksUsage.has(worker)
+    if (hasTasksUsage === false) {
+      throw new Error('Worker could not be found in workers tasks usage map')
     }
+    return hasTasksUsage
   }
 
   /**
index a98cd647eeb59d925f7bd8c3ed6685abf80c1eaf..33d7f1c91fa5f057bfcb1ad70e7725a6a6910102 100644 (file)
@@ -91,7 +91,7 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   protected afterWorkerSetup (worker: Worker): void {
-    // Listen worker messages.
+    // Listen to worker messages.
     this.registerWorkerMessageListener(worker, super.workerListener())
   }
 
index 5b06ca4971d3ed8b7f01dd917ba5befed36a410d..cd414502cfd8a706e8d30836b23a90ce64c35a91 100644 (file)
@@ -82,7 +82,7 @@ export class FixedThreadPool<
     worker.postMessage({ parent: port1 }, [port1])
     worker.port1 = port1
     worker.port2 = port2
-    // Listen worker messages.
+    // Listen to worker messages.
     this.registerWorkerMessageListener(worker, super.workerListener())
   }
 
index 3090b6f534f29597e7dca3dabddee8935f46ddbe..c3c5d1bda82894265565f83bf662cc1618013edd 100644 (file)
@@ -139,7 +139,7 @@ export abstract class AbstractWorker<
   protected abstract sendToMainWorker (message: MessageValue<Response>): void
 
   /**
-   * Check to see if the worker should be terminated, because its living too long.
+   * Checks if the worker should be terminated, because its living too long.
    */
   protected checkAlive (): void {
     if (
index 4224f4ffe7a9e87bdc86169ef66a7b1437352c53..bc7bdd1eb7932a90257bcb730f1e78f6dab4de43 100644 (file)
@@ -9,7 +9,7 @@ const {
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 1
   const workerNotFoundInTasksUsageMapError = new Error(
-    'Worker could not be found in worker tasks usage map'
+    'Worker could not be found in workers tasks usage map'
   )
   class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool {
     removeAllWorker () {