feat: worker node readiness aware worker choice strategies
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 07:04:06 +0000 (09:04 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 07:04:06 +0000 (09:04 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
tests/pools/selection-strategies/selection-strategies.test.js

index fdda81a5885f18f9e7825090695159a27d277693..774ec5f2179143213b6b03ad0d42781bf5fe1f95 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Take into account worker node readiness in worker choice strategies.
+
 ## [2.6.14] - 2023-07-10
 
 ### Fixed
index c200d5317da6ba47c1154cdeb15a9b5ca5c9ee30..812a2087419de6df8d457bd5d875c2d2b5f9f179 100644 (file)
@@ -950,9 +950,6 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
-      if (this.opts.enableTasksQueue === true) {
-        this.redistributeQueuedTasks(worker)
-      }
       if (this.opts.restartWorkerOnError === true && !this.starting) {
         if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
           this.createAndSetupDynamicWorker()
@@ -960,6 +957,9 @@ export abstract class AbstractPool<
           this.createAndSetupWorker()
         }
       }
+      if (this.opts.enableTasksQueue === true) {
+        this.redistributeQueuedTasks(worker)
+      }
     })
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
index 7f26706d6d2a7d4b8780bb086a07a413279c23bd..a6589d09836dc5b48e4a1d7e9a52eb93327bdfb9 100644 (file)
@@ -128,6 +128,10 @@ export abstract class AbstractWorkerChoiceStrategy<
     this.setTaskStatisticsRequirements(this.opts)
   }
 
+  protected workerNodeReady (workerNodeKey: number): boolean {
+    return this.pool.workerNodes[workerNodeKey].info.ready
+  }
+
   // /**
   //  * Finds a free worker node key.
   //  *
index f678f83b9719f93c3c3defd2013dec42fea31a8d..351f3d91e10c1a05a60382b63af16de5d5a70d43 100644 (file)
@@ -77,7 +77,10 @@ export class FairShareWorkerChoiceStrategy<
       }
       const workerVirtualTaskEndTimestamp =
         this.workersVirtualTaskEndTimestamp[workerNodeKey]
-      if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
+      if (
+        this.workerNodeReady(workerNodeKey) &&
+        workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
+      ) {
         minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
         this.nextWorkerNodeId = workerNodeKey
       }
index 928b1b2a932488132e3dcbb5a3c8b3d2be4b9892..b61bca1c8bdf450083474d8e2d8e78fef48ea2bc 100644 (file)
@@ -66,10 +66,10 @@ export class LeastBusyWorkerChoiceStrategy<
       const workerTime =
         (workerNode.usage.runTime?.aggregate ?? 0) +
         (workerNode.usage.waitTime?.aggregate ?? 0)
-      if (workerTime === 0) {
+      if (this.workerNodeReady(workerNodeKey) && workerTime === 0) {
         this.nextWorkerNodeId = workerNodeKey
         break
-      } else if (workerTime < minTime) {
+      } else if (this.workerNodeReady(workerNodeKey) && workerTime < minTime) {
         minTime = workerTime
         this.nextWorkerNodeId = workerNodeKey
       }
index 54d0e7cb054de064d8f9dfbfe074df5f1ba5bbf6..57e805e18b5b5fb9f25e3b9170fb3baf9e84fa49 100644 (file)
@@ -61,10 +61,13 @@ export class LeastEluWorkerChoiceStrategy<
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
       const workerUsage = workerNode.usage
       const workerElu = workerUsage.elu?.active?.aggregate ?? 0
-      if (workerElu === 0) {
+      if (this.workerNodeReady(workerNodeKey) && workerElu === 0) {
         this.nextWorkerNodeId = workerNodeKey
         break
-      } else if (workerElu < minWorkerElu) {
+      } else if (
+        this.workerNodeReady(workerNodeKey) &&
+        workerElu < minWorkerElu
+      ) {
         minWorkerElu = workerElu
         this.nextWorkerNodeId = workerNodeKey
       }
index 53aa05eed641d557631e56e964b7d57739a4a7f8..8c1384701de0c2d37b5c34a474394f8baa92141d 100644 (file)
@@ -49,10 +49,13 @@ export class LeastUsedWorkerChoiceStrategy<
         workerTaskStatistics.executed +
         workerTaskStatistics.executing +
         workerTaskStatistics.queued
-      if (workerTasks === 0) {
+      if (this.workerNodeReady(workerNodeKey) && workerTasks === 0) {
         this.nextWorkerNodeId = workerNodeKey
         break
-      } else if (workerTasks < minNumberOfTasks) {
+      } else if (
+        this.workerNodeReady(workerNodeKey) &&
+        workerTasks < minNumberOfTasks
+      ) {
         minNumberOfTasks = workerTasks
         this.nextWorkerNodeId = workerNodeKey
       }
index 167736edb9f11c24da068a0b43da56c48db14344..2e1a49b67e4db6f3a89347b1e81329151c4e4180 100644 (file)
@@ -623,7 +623,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toStrictEqual({
+      expect(workerNode.usage).toMatchObject({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -632,15 +632,9 @@ describe('Selection strategies test suite', () => {
           failed: 0
         },
         runTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
           history: expect.any(CircularArray)
         },
         waitTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
           history: expect.any(CircularArray)
         },
         elu: {
@@ -656,8 +650,16 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
+      if (workerNode.usage.runTime.aggregate == null) {
+        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      }
+      if (workerNode.usage.waitTime.aggregate == null) {
+        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+      }
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -678,7 +680,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toStrictEqual({
+      expect(workerNode.usage).toMatchObject({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -687,15 +689,9 @@ describe('Selection strategies test suite', () => {
           failed: 0
         },
         runTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
           history: expect.any(CircularArray)
         },
         waitTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
           history: expect.any(CircularArray)
         },
         elu: {
@@ -711,8 +707,16 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
+      if (workerNode.usage.runTime.aggregate == null) {
+        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      }
+      if (workerNode.usage.waitTime.aggregate == null) {
+        expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+      }
     }
     // We need to clean up the resources after our test
     await pool.destroy()