perf: improve node eligibility branching on worker choice strategies
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 27 Aug 2023 19:35:22 +0000 (21:35 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 27 Aug 2023 19:35:22 +0000 (21:35 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
.c8rc.json
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/utils.ts
tests/utils.test.js

index ab1b20fa6e8670179e6a804576b2dcf7987f48d6..a6da2792307b6fea19313da25a353042c8d3d56c 100644 (file)
@@ -3,5 +3,5 @@
   "lines": 92,
   "statements": 92,
   "functions": 95,
-  "branches": 92
+  "branches": 90
 }
index 094e9b02f438d711de40431288864a55982cf9cb..fc4bd1fd4de4f281c8dbae20fcf3014ca0a5565f 100644 (file)
@@ -84,15 +84,15 @@ export class FairShareWorkerChoiceStrategy<
     let minWorkerVirtualTaskEndTimestamp = Infinity
     let chosenWorkerNodeKey: number | undefined
     for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
+      if (!this.isWorkerNodeEligible(workerNodeKey)) {
+        continue
+      }
       if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) {
         this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey)
       }
       const workerVirtualTaskEndTimestamp =
         this.workersVirtualTaskEndTimestamp[workerNodeKey]
-      if (
-        this.isWorkerNodeEligible(workerNodeKey) &&
-        workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
-      ) {
+      if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
         minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
         chosenWorkerNodeKey = workerNodeKey
       }
index 86a7f5ca8f801c6b5e98dc380e9ee5dea80d37e3..e37543820593a68b6860e7b597c40919c8866e9d 100644 (file)
@@ -74,12 +74,12 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
         workerNodeKey < this.pool.workerNodes.length;
         workerNodeKey++
       ) {
+        if (!this.isWorkerNodeEligible(workerNodeKey)) {
+          continue
+        }
         const workerWeight =
           this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
-        if (
-          this.isWorkerNodeEligible(workerNodeKey) &&
-          workerWeight >= this.roundWeights[roundIndex]
-        ) {
+        if (workerWeight >= this.roundWeights[roundIndex]) {
           workerNodeId = workerNodeKey
           break
         }
index 6fad8939e75aa0f2a3dca8e332c08ba529ba3a83..338ae379d2663e3777366353691bdc5ab8639592 100644 (file)
@@ -74,16 +74,16 @@ export class LeastBusyWorkerChoiceStrategy<
     let minTime = Infinity
     let chosenWorkerNodeKey: number | undefined
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+      if (!this.isWorkerNodeEligible(workerNodeKey)) {
+        continue
+      }
       const workerTime =
         (workerNode.usage.runTime?.aggregate ?? 0) +
         (workerNode.usage.waitTime?.aggregate ?? 0)
-      if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) {
+      if (workerTime === 0) {
         chosenWorkerNodeKey = workerNodeKey
         break
-      } else if (
-        this.isWorkerNodeEligible(workerNodeKey) &&
-        workerTime < minTime
-      ) {
+      } else if (workerTime < minTime) {
         minTime = workerTime
         chosenWorkerNodeKey = workerNodeKey
       }
index 00919f33f58f5878fd381692bfb2009f2c5b611c..938d2072e998eed7ac26deeb225662ad884217b8 100644 (file)
@@ -70,15 +70,15 @@ export class LeastEluWorkerChoiceStrategy<
     let minWorkerElu = Infinity
     let chosenWorkerNodeKey: number | undefined
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+      if (!this.isWorkerNodeEligible(workerNodeKey)) {
+        continue
+      }
       const workerUsage = workerNode.usage
       const workerElu = workerUsage.elu?.active?.aggregate ?? 0
-      if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) {
+      if (workerElu === 0) {
         chosenWorkerNodeKey = workerNodeKey
         break
-      } else if (
-        this.isWorkerNodeEligible(workerNodeKey) &&
-        workerElu < minWorkerElu
-      ) {
+      } else if (workerElu < minWorkerElu) {
         minWorkerElu = workerElu
         chosenWorkerNodeKey = workerNodeKey
       }
index 4387d792ef951c2e602308b48d4caffd95e67d53..d0adbcd7b77c4ef095f25fced318148498f42935 100644 (file)
@@ -55,18 +55,18 @@ export class LeastUsedWorkerChoiceStrategy<
     let minNumberOfTasks = Infinity
     let chosenWorkerNodeKey: number | undefined
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+      if (!this.isWorkerNodeEligible(workerNodeKey)) {
+        continue
+      }
       const workerTaskStatistics = workerNode.usage.tasks
       const workerTasks =
         workerTaskStatistics.executed +
         workerTaskStatistics.executing +
         workerTaskStatistics.queued
-      if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) {
+      if (workerTasks === 0) {
         chosenWorkerNodeKey = workerNodeKey
         break
-      } else if (
-        this.isWorkerNodeEligible(workerNodeKey) &&
-        workerTasks < minNumberOfTasks
-      ) {
+      } else if (workerTasks < minNumberOfTasks) {
         minNumberOfTasks = workerTasks
         chosenWorkerNodeKey = workerNodeKey
       }
index 21b2032b47086484e2b39475b06310950d95c08b..54d0d8d6d3c20b822f7138229a3e31d583f7ce3e 100644 (file)
@@ -67,6 +67,40 @@ export const availableParallelism = (): number => {
   return availableParallelism
 }
 
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = <Worker extends IWorker>(
+  worker: Worker
+): WorkerType | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return WorkerTypes.thread
+  } else if (worker instanceof ClusterWorker) {
+    return WorkerTypes.cluster
+  }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = <Worker extends IWorker>(
+  worker: Worker
+): number | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return worker.threadId
+  } else if (worker instanceof ClusterWorker) {
+    return worker.id
+  }
+}
+
 /**
  * Sleeps for the given amount of milliseconds.
  *
@@ -116,41 +150,6 @@ export const average = (dataSet: number[]): number => {
   )
 }
 
-/**
- * Returns the worker type of the given worker.
- *
- * @param worker - The worker to get the type of.
- * @returns The worker type of the given worker.
- * @internal
- */
-export const getWorkerType = <Worker extends IWorker>(
-  worker: Worker
-): WorkerType | undefined => {
-  if (worker instanceof ThreadWorker) {
-    return WorkerTypes.thread
-  }
-  if (worker instanceof ClusterWorker) {
-    return WorkerTypes.cluster
-  }
-}
-
-/**
- * Returns the worker id of the given worker.
- *
- * @param worker - The worker to get the id of.
- * @returns The worker id of the given worker.
- * @internal
- */
-export const getWorkerId = <Worker extends IWorker>(
-  worker: Worker
-): number | undefined => {
-  if (worker instanceof ThreadWorker) {
-    return worker.threadId
-  } else if (worker instanceof ClusterWorker) {
-    return worker.id
-  }
-}
-
 /**
  * Computes the median of the given data set.
  *
index 6cfe99ae2f1198592d940bd0b1414d4fa038bb36..ef7688d6f337514bea5a405b18c608819b8d66ac 100644 (file)
@@ -1,4 +1,6 @@
 const { randomInt } = require('crypto')
+const { Worker } = require('worker_threads')
+const cluster = require('cluster')
 const { expect } = require('expect')
 const {
   CircularArray,
@@ -8,6 +10,8 @@ const {
   availableParallelism,
   average,
   exponentialDelay,
+  getWorkerType,
+  getWorkerId,
   isAsyncFunction,
   isKillBehavior,
   isPlainObject,
@@ -17,13 +21,28 @@ const {
   sleep,
   updateMeasurementStatistics
 } = require('../lib/utils')
-const { KillBehaviors } = require('../lib/worker/worker-options')
+const { KillBehaviors, WorkerTypes } = require('../lib')
 
 describe('Utils test suite', () => {
   it('Verify availableParallelism() behavior', () => {
-    expect(typeof availableParallelism() === 'number').toBe(true)
-    expect(availableParallelism()).toBeGreaterThan(0)
-    expect(Number.isSafeInteger(availableParallelism())).toBe(true)
+    const parallelism = availableParallelism()
+    expect(typeof parallelism === 'number').toBe(true)
+    expect(parallelism).toBeGreaterThan(0)
+    expect(Number.isSafeInteger(parallelism)).toBe(true)
+  })
+
+  it('Verify getWorkerType() behavior', () => {
+    expect(
+      getWorkerType(new Worker('./tests/worker-files/thread/testWorker.js'))
+    ).toBe(WorkerTypes.thread)
+    expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster)
+  })
+
+  it('Verify getWorkerId() behavior', () => {
+    const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js')
+    const clusterWorker = cluster.fork()
+    expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId)
+    expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id)
   })
 
   it.skip('Verify sleep() behavior', async () => {