Fixes to worker selection strategies
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Oct 2022 21:17:43 +0000 (23:17 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Oct 2022 21:17:43 +0000 (23:17 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
package-lock.json
package.json
sonar-project.properties
src/pools/abstract-pool.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js

index 2f81e0fe3907f67d9587a8b0b17856698a879d48..ed579f973bb543176ba724b765b91ce6c550a040 100644 (file)
@@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.3.2] - 2022-14-10
+
+### Changed
+
+- Optimize fair share worker selection strategy implementation.
+
+### Fixed
+
+- Fix WRR worker selection strategy: ensure the condition triggering the round robin can be fulfilled.
+
 ## [2.3.1] - 2022-13-10
 
 ### Added
index cdfb34768d5d7aadde5515892afc15ce7830a872..b297e2c0de8a2b5f5b17e7cc2acb027160787abc 100644 (file)
@@ -1,12 +1,12 @@
 {
   "name": "poolifier",
-  "version": "2.3.1",
+  "version": "2.3.2",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "poolifier",
-      "version": "2.3.1",
+      "version": "2.3.2",
       "license": "MIT",
       "devDependencies": {
         "@types/node": "^18.8.5",
index 298948de5bafc43357fb02a9fa87b17a5ec1ac09..85007e997171c6d0e6ad6ac7f700ba8bab400ede 100644 (file)
@@ -1,6 +1,6 @@
 {
   "name": "poolifier",
-  "version": "2.3.1",
+  "version": "2.3.2",
   "description": "A fast, easy to use Node.js Worker Thread Pool and Cluster Pool implementation",
   "main": "lib/index.js",
   "scripts": {
index fb053f5e1c8bb2d724272249ec59ae22aa54751e..893d989424a8e54c3f37e49770b56bcd9592fda8 100644 (file)
@@ -3,7 +3,7 @@ sonar.organization=pioardi
 sonar.javascript.lcov.reportPaths=coverage/lcov.info
 sonar.eslint.reportPaths=reports/eslint.json
 sonar.projectName=poolifier
-sonar.projectVersion=2.3.1
+sonar.projectVersion=2.3.2
 sonar.host.url=https://sonarcloud.io
 sonar.sources=src
 sonar.tests=tests
index c4181e76dfb62bc08cb2f7fbcf86e3d131038c1e..40e06b158fe3af59ec49527a6cc170822520657f 100644 (file)
@@ -272,7 +272,7 @@ export abstract class AbstractPool<
   /**
    * Removes the given worker from the pool.
    *
-   * @param worker Worker that will be removed.
+   * @param worker The worker that will be removed.
    */
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
@@ -305,8 +305,8 @@ export abstract class AbstractPool<
   /**
    * Registers a listener callback on a given worker.
    *
-   * @param worker A worker.
-   * @param listener A message listener callback.
+   * @param worker The worker which should register a listener.
+   * @param listener The message listener callback.
    */
   protected abstract registerWorkerMessageListener<
     Message extends Data | Response
index 3735f9722319c34656d3498389962859ffd2f060..48f213ffebb1a4577debff4e701ef45f53e2ebed 100644 (file)
@@ -44,10 +44,10 @@ export class FairShareWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): Worker {
-    this.computeWorkerLastVirtualTaskTimestamp()
     let minWorkerVirtualTaskEndTimestamp = Infinity
     let chosenWorker!: Worker
     for (const worker of this.pool.workers) {
+      this.computeWorkerLastVirtualTaskTimestamp(worker)
       const workerLastVirtualTaskEndTimestamp =
         this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
       if (
@@ -61,21 +61,21 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   /**
-   * Computes workers last virtual task timestamp.
+   * Computes worker last virtual task timestamp.
+   *
+   * @param worker The worker.
    */
-  private computeWorkerLastVirtualTaskTimestamp () {
-    for (const worker of this.pool.workers) {
-      const workerVirtualTaskStartTimestamp = Math.max(
-        Date.now(),
-        this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? -Infinity
-      )
-      const workerVirtualTaskEndTimestamp =
-        workerVirtualTaskStartTimestamp +
-        (this.pool.getWorkerAverageTasksRunTime(worker) ?? 0)
-      this.workerLastVirtualTaskTimestamp.set(worker, {
-        start: workerVirtualTaskStartTimestamp,
-        end: workerVirtualTaskEndTimestamp
-      })
-    }
+  private computeWorkerLastVirtualTaskTimestamp (worker: Worker): void {
+    const workerVirtualTaskStartTimestamp = Math.max(
+      Date.now(),
+      this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? -Infinity
+    )
+    const workerVirtualTaskEndTimestamp =
+      workerVirtualTaskStartTimestamp +
+      (this.pool.getWorkerAverageTasksRunTime(worker) ?? 0)
+    this.workerLastVirtualTaskTimestamp.set(worker, {
+      start: workerVirtualTaskStartTimestamp,
+      end: workerVirtualTaskEndTimestamp
+    })
   }
 }
index c2f85145c44dfd4d3eee48fba3fd9d6c64db94d0..2b9cfc74536851ee39f0616d92932323c730031c 100644 (file)
@@ -30,10 +30,6 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     runTime: true
   }
 
-  /**
-   * Worker index where the previous task was submitted.
-   */
-  private previousWorkerIndex: number = 0
   /**
    * Worker index where the current task will be submitted.
    */
@@ -63,7 +59,6 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public reset (): boolean {
-    this.previousWorkerIndex = 0
     this.currentWorkerIndex = 0
     this.workersTaskRunTime.clear()
     this.initWorkersTaskRunTime()
@@ -72,40 +67,35 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): Worker {
-    const currentWorker = this.pool.workers[this.currentWorkerIndex]
+    let chosenWorker = this.pool.workers[this.currentWorkerIndex]
     if (
       this.isDynamicPool === true &&
-      this.workersTaskRunTime.has(currentWorker) === false
+      this.workersTaskRunTime.has(chosenWorker) === false
     ) {
-      this.initWorkerTaskRunTime(currentWorker)
+      this.initWorkerTaskRunTime(chosenWorker)
     }
-    const workerVirtualTaskRunTime =
-      this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
     const workerTaskWeight =
-      this.workersTaskRunTime.get(currentWorker)?.weight ??
+      this.workersTaskRunTime.get(chosenWorker)?.weight ??
       this.defaultWorkerWeight
-    if (this.currentWorkerIndex === this.previousWorkerIndex) {
-      const workerTaskRunTime =
-        (this.workersTaskRunTime.get(currentWorker)?.runTime ?? 0) +
-        workerVirtualTaskRunTime
+    if (
+      (this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0) <
+      workerTaskWeight
+    ) {
       this.setWorkerTaskRunTime(
-        currentWorker,
+        chosenWorker,
         workerTaskWeight,
-        workerTaskRunTime
+        (this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0) +
+          (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
       )
     } else {
-      this.setWorkerTaskRunTime(currentWorker, workerTaskWeight, 0)
-    }
-    if (workerVirtualTaskRunTime < workerTaskWeight) {
-      this.previousWorkerIndex = this.currentWorkerIndex
-    } else {
-      this.previousWorkerIndex = this.currentWorkerIndex
       this.currentWorkerIndex =
         this.pool.workers.length - 1 === this.currentWorkerIndex
           ? 0
           : this.currentWorkerIndex + 1
+      chosenWorker = this.pool.workers[this.currentWorkerIndex]
+      this.setWorkerTaskRunTime(chosenWorker, workerTaskWeight, 0)
     }
-    return this.pool.workers[this.currentWorkerIndex]
+    return chosenWorker
   }
 
   private initWorkersTaskRunTime (): void {
index 64b26cd54c15426a61f857fe068f7466d11c814d..b8341f09e9093566b28c672fd55a5cf107a966d3 100644 (file)
@@ -402,10 +402,6 @@ describe('Selection strategies test suite', () => {
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     )
-    expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .previousWorkerIndex
-    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .currentWorkerIndex
@@ -508,10 +504,6 @@ describe('Selection strategies test suite', () => {
       max,
       './tests/worker-files/thread/testWorker.js'
     )
-    expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .previousWorkerIndex
-    ).toBeUndefined()
     expect(
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .currentWorkerIndex
@@ -525,10 +517,6 @@ describe('Selection strategies test suite', () => {
         .workersTaskRunTime
     ).toBeUndefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-    expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .previousWorkerIndex
-    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .currentWorkerIndex
@@ -552,10 +540,6 @@ describe('Selection strategies test suite', () => {
       max,
       './tests/worker-files/thread/testWorker.js'
     )
-    expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .workerChoiceStrategy.previousWorkerIndex
-    ).toBeUndefined()
     expect(
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .workerChoiceStrategy.currentWorkerIndex
@@ -569,10 +553,6 @@ describe('Selection strategies test suite', () => {
         .workerChoiceStrategy.workersTaskRunTime
     ).toBeUndefined()
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-    expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .workerChoiceStrategy.previousWorkerIndex
-    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
         .workerChoiceStrategy.currentWorkerIndex
index 9d87c69970e6f371a8987483d1c06c090bb85a5f..d8ae8299cfd12874303c4c24e5875afb631585be 100644 (file)
@@ -25,7 +25,6 @@ describe('Weighted round robin strategy worker choice strategy test suite', () =
 
   it('Verify that reset() resets internals', () => {
     const strategy = new WeightedRoundRobinWorkerChoiceStrategy(pool)
-    strategy.previousWorkerIndex = TestUtils.generateRandomInteger()
     strategy.currentWorkerIndex = TestUtils.generateRandomInteger()
     const workersTaskRunTimeClearStub = sinon
       .stub(strategy.workersTaskRunTime, 'clear')
@@ -35,7 +34,6 @@ describe('Weighted round robin strategy worker choice strategy test suite', () =
       .returns()
     const resetResult = strategy.reset()
     expect(resetResult).toBe(true)
-    expect(strategy.previousWorkerIndex).toBe(0)
     expect(strategy.currentWorkerIndex).toBe(0)
     expect(workersTaskRunTimeClearStub.calledOnce).toBe(true)
     expect(initWorkersTaskRunTimeStub.calledOnce).toBe(true)