feat: add ratio of worker nodes in a pool allowed to perform concurrent tasks stealing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 18 Jul 2024 17:53:08 +0000 (19:53 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 18 Jul 2024 17:53:33 +0000 (19:53 +0200)
closes #2284

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
18 files changed:
examples/typescript/http-client-pool/package.json
examples/typescript/http-server-pool/express-cluster/package.json
examples/typescript/http-server-pool/express-hybrid/package.json
examples/typescript/http-server-pool/express-worker_threads/package.json
examples/typescript/http-server-pool/fastify-cluster/package.json
examples/typescript/http-server-pool/fastify-hybrid/package.json
examples/typescript/http-server-pool/fastify-worker_threads/package.json
examples/typescript/smtp-client-pool/package.json
examples/typescript/websocket-server-pool/ws-cluster/package.json
examples/typescript/websocket-server-pool/ws-hybrid/package.json
examples/typescript/websocket-server-pool/ws-worker_threads/package.json
package.json
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/utils.ts
tests/pools/abstract-pool.test.mjs
tests/pools/utils.test.mjs
typedoc.mjs

index 7e15591360171ed876b0a1d1184344c7e7c893ec..176841357edf6c848b6e40bc2f5c1054c47d34c6 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index e5c0bf96eb4e1554edf8f57ba057f4f29257801d..7c7d79a49dc637c09db0f50c67754030244d8f08 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 7425b13c5c7c516ac21787009a4f75bf4576b7f0..c2de18c33cd4e30899ba9bc09e8adf048c0fcf6f 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index f960c001fb917062a49ad0ea1d99f584a1769e25..a37d7b49bf8504674ab467a1bdd0bb471cd005d9 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index b25045376ae407f40cb1b12326d68e2819b98756..05328c50312edc1dbd06f87135f5e09568b0c3e5 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 8690f376a82dfe2bd6e4dba2abe807a529c44a59..fa98ba54e823232f195311b2259514257e3dbaff 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 3b27174532f9f72fed35fe117afcf91a7f3c824d..d8dd863df4c903b1a35f74a7e5572c10e1f9d1c4 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 66564b6114bcd7b6ec149c6736435342831605b1..10df87dc2056b2471faa9afc06ad2e95074a3587 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 1e5252dfdb13a5159d90c83ef7093cf6f0f8d7d0..29b27b5c0d8b1f2c996ebed9b0763e4676e8cda8 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 336d80a16e1b626072d6fd5aac1d023c5fbb7c56..adac296b4fc27554ec9e3df22960dbf38abcc430 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 67e32a1268cc53d2f0cf9cf77a511a221fe72566..6d49166d3c79b2a8a384686789f21a721396dbbe 100644 (file)
@@ -6,7 +6,7 @@
   "main": "dist/main.js",
   "type": "module",
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index 176f23152c0051ecd3a608e48d67602c1d198f28..dac0b9e4b73391efb2948f193ce7625fd99d47a4 100644 (file)
@@ -40,7 +40,7 @@
     "pnpm": ">=9.0.0"
   },
   "volta": {
-    "node": "22.4.1",
+    "node": "22.5.0",
     "pnpm": "9.5.0"
   },
   "packageManager": "pnpm@9.5.0",
index defc41a0f9fda00fda7af57eeb7aa484d9913811..08c5d0a7718f82185dd9ad9fdb6b1c2cc54cf544 100644 (file)
@@ -713,7 +713,10 @@ export abstract class AbstractPool<
   ): boolean {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     if (workerChoiceStrategyOptions != null) {
-      this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+      this.opts.workerChoiceStrategyOptions = {
+        ...this.opts.workerChoiceStrategyOptions,
+        ...workerChoiceStrategyOptions,
+      }
       this.workerChoiceStrategiesContext?.setOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -777,6 +780,7 @@ export abstract class AbstractPool<
       ...getDefaultTasksQueueOptions(
         this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
       ),
+      ...this.opts.tasksQueueOptions,
       ...tasksQueueOptions,
     }
   }
@@ -1402,7 +1406,7 @@ export abstract class AbstractPool<
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       ].getTaskFunctionWorkerUsage(task.name!)!
       ++taskFunctionWorkerUsage.tasks.executing
       updateWaitTimeWorkerUsage(
@@ -1902,7 +1906,11 @@ export abstract class AbstractPool<
     if (
       this.cannotStealTask() ||
       (this.info.stealingWorkerNodes ?? 0) >
-        Math.floor(this.workerNodes.length / 2)
+        Math.round(
+          this.workerNodes.length *
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            this.opts.tasksQueueOptions!.tasksStealingRatio!
+        )
     ) {
       if (previousStolenTask != null) {
         workerInfo.stealing = false
@@ -1981,7 +1989,11 @@ export abstract class AbstractPool<
       this.cannotStealTask() ||
       this.hasBackPressure() ||
       (this.info.stealingWorkerNodes ?? 0) >
-        Math.floor(this.workerNodes.length / 2)
+        Math.round(
+          this.workerNodes.length *
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            this.opts.tasksQueueOptions!.tasksStealingRatio!
+        )
     ) {
       return
     }
index 0df7549ace8ba8589e69fae51b3e9fd4d237736f..226c2fb4bbe33cdaa1b6ba195181ceb90db92779 100644 (file)
@@ -157,6 +157,11 @@ export interface TasksQueueOptions {
    * @defaultValue false
    */
   readonly tasksStealingOnBackPressure?: boolean
+  /**
+   * Ratio of worker nodes that can steal tasks from another worker node.
+   * @defaultValue 0.6
+   */
+  readonly tasksStealingRatio?: number
   /**
    * Queued tasks finished timeout in milliseconds at worker node termination.
    * @defaultValue 2000
index 24f2cfb2a6d075b2133473bef04aa9a650607c5b..b652488ad57521af21badd8ead0af04cc2b699c8 100644 (file)
@@ -44,6 +44,7 @@ export const getDefaultTasksQueueOptions = (
     concurrency: 1,
     taskStealing: true,
     tasksStealingOnBackPressure: false,
+    tasksStealingRatio: 0.6,
     tasksFinishedTimeout: 2000,
   }
 }
@@ -146,6 +147,23 @@ export const checkValidTasksQueueOptions = (
       `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
     )
   }
+  if (
+    tasksQueueOptions?.tasksStealingRatio != null &&
+    typeof tasksQueueOptions.tasksStealingRatio !== 'number'
+  ) {
+    throw new TypeError(
+      'Invalid worker node tasks stealing ratio: must be a number'
+    )
+  }
+  if (
+    tasksQueueOptions?.tasksStealingRatio != null &&
+    (tasksQueueOptions.tasksStealingRatio < 0 ||
+      tasksQueueOptions.tasksStealingRatio > 1)
+  ) {
+    throw new RangeError(
+      'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+    )
+  }
 }
 
 export const checkWorkerNodeArguments = (
index 1f9213f3a57ae515bd314cf2d6ecf0c877b98a09..421a24982acad994ab1792f18c7fbd9da8e5a11b 100644 (file)
@@ -277,6 +277,7 @@ describe('Abstract pool test suite', () => {
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
         tasksStealingOnBackPressure: false,
+        tasksStealingRatio: 0.6,
         tasksFinishedTimeout: 2000,
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
@@ -439,6 +440,36 @@ describe('Abstract pool test suite', () => {
     ).toThrow(
       new TypeError('Invalid worker node tasks queue size: must be an integer')
     )
+    expect(
+      () =>
+        new FixedThreadPool(
+          numberOfWorkers,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            enableTasksQueue: true,
+            tasksQueueOptions: { tasksStealingRatio: '' },
+          }
+        )
+    ).toThrow(
+      new TypeError(
+        'Invalid worker node tasks stealing ratio: must be a number'
+      )
+    )
+    expect(
+      () =>
+        new FixedThreadPool(
+          numberOfWorkers,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            enableTasksQueue: true,
+            tasksQueueOptions: { tasksStealingRatio: 1.1 },
+          }
+        )
+    ).toThrow(
+      new RangeError(
+        'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+      )
+    )
   })
 
   it('Verify that pool worker choice strategy options can be set', async () => {
@@ -593,6 +624,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.6,
       tasksFinishedTimeout: 2000,
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
@@ -602,6 +634,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.6,
       tasksFinishedTimeout: 2000,
     })
     pool.enableTasksQueue(false)
@@ -621,6 +654,7 @@ describe('Abstract pool test suite', () => {
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.6,
       tasksFinishedTimeout: 2000,
     })
     for (const workerNode of pool.workerNodes) {
@@ -633,6 +667,7 @@ describe('Abstract pool test suite', () => {
       size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.5,
       tasksFinishedTimeout: 3000,
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
@@ -640,6 +675,7 @@ describe('Abstract pool test suite', () => {
       size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.5,
       tasksFinishedTimeout: 3000,
     })
     for (const workerNode of pool.workerNodes) {
@@ -654,10 +690,11 @@ describe('Abstract pool test suite', () => {
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: Math.pow(numberOfWorkers, 2),
+      size: 2,
       taskStealing: true,
       tasksStealingOnBackPressure: true,
-      tasksFinishedTimeout: 2000,
+      tasksStealingRatio: 0.5,
+      tasksFinishedTimeout: 3000,
     })
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
@@ -693,6 +730,18 @@ describe('Abstract pool test suite', () => {
     expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
       new TypeError('Invalid worker node tasks queue size: must be an integer')
     )
+    expect(() => pool.setTasksQueueOptions({ tasksStealingRatio: '' })).toThrow(
+      new TypeError(
+        'Invalid worker node tasks stealing ratio: must be a number'
+      )
+    )
+    expect(() =>
+      pool.setTasksQueueOptions({ tasksStealingRatio: 1.1 })
+    ).toThrow(
+      new TypeError(
+        'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+      )
+    )
     await pool.destroy()
   })
 
index 21ddaa777b151af60ffb4be74c46130fb4e6c3c7..e79294a288ebd1a49be845a55646ce689d0bbf3d 100644 (file)
@@ -31,6 +31,7 @@ describe('Pool utils test suite', () => {
       size: Math.pow(poolMaxSize, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: false,
+      tasksStealingRatio: 0.6,
       tasksFinishedTimeout: 2000,
     })
   })
index 0b0c7b4605a9a526e7d97a7257e1f05509bcf17d..d139d4e8102203dca02e0c06d47c499e54470fa7 100644 (file)
@@ -16,7 +16,7 @@ try {
       join(dirname(fileURLToPath(import.meta.url)), 'tmp', markdownFile)
     )
   }
-  execSync('pnpm dlx typedoc', { stdio: 'inherit' })
+  execSync('pnpm exec typedoc', { stdio: 'inherit' })
   for (const markdownFile of markdownFiles) {
     copyFileSync(
       join(dirname(fileURLToPath(import.meta.url)), 'tmp', markdownFile),