Merge dependabot/npm_and_yarn/examples/typescript/smtp-client-pool/types/node-20...
authorgithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Wed, 23 Aug 2023 23:00:16 +0000 (23:00 +0000)
committerGitHub <noreply@github.com>
Wed, 23 Aug 2023 23:00:16 +0000 (23:00 +0000)
12 files changed:
CHANGELOG.md
README.md
examples/typescript/http-server-pool/fastify-worker_threads/package.json
examples/typescript/http-server-pool/fastify-worker_threads/pnpm-lock.yaml
package.json
sonar-project.properties
src/pools/abstract-pool.ts
src/pools/version.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 89b1918650b9c7e49de75fa9dcefa1aa42637674..ab21a07781f819151919de0ce36a53bf9e7e638f 100644 (file)
@@ -7,10 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [2.6.33] - 2023-08-24
+
+### Fixed
+
+- Fix queued tasks rescheduling.
+
 ### Changed
 
 - Rename tasks queue options `queueMaxSize` to `size`.
 
+### Added
+
+- Task stealing scheduling algorithm if tasks queueing is enabled.
+
 ## [2.6.32] - 2023-08-23
 
 ### Fixed
index eae0023f7b9f9923971601d425aedd89cf8a0a3d..47a680d039609442639275395e62b5b699a540cb 100644 (file)
--- a/README.md
+++ b/README.md
@@ -47,8 +47,9 @@ Please consult our [general guidelines](#general-guidelines).
 - Tasks distribution strategies :white_check_mark:
 - Lockless tasks queueing :white_check_mark:
 - Queued tasks rescheduling:
-  - Tasks redistribution on worker error :white_check_mark:
+  - Task stealing :white_check_mark:
   - Tasks stealing under back pressure :white_check_mark:
+  - Tasks redistribution on worker error :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
 - Error handling out of the box :white_check_mark:
 - Widely tested :white_check_mark:
index ea95de2713cf3aafe5fa664e3a456cef6b2f4666..0ccdbfa4ffb6e1873568ee1a6aaaaf469661445b 100644 (file)
   "dependencies": {
     "fastify": "^4.21.0",
     "fastify-plugin": "^4.5.1",
-    "poolifier": "^2.6.28"
+    "poolifier": "^2.6.32"
   },
   "devDependencies": {
-    "@types/node": "^20.5.1",
+    "@types/node": "^20.5.4",
     "typescript": "^5.1.6"
   }
 }
index 4af3c13cadc86f08087b0775aedc322089fe237a..7c7bb4e409d305755da97e09020c68124161c9c3 100644 (file)
@@ -12,13 +12,13 @@ dependencies:
     specifier: ^4.5.1
     version: 4.5.1
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
 
 devDependencies:
   '@types/node':
-    specifier: ^20.5.1
-    version: 20.5.1
+    specifier: ^20.5.4
+    version: 20.5.4
   typescript:
     specifier: ^5.1.6
     version: 5.1.6
@@ -47,8 +47,8 @@ packages:
       fast-json-stringify: 5.8.0
     dev: false
 
-  /@types/node@20.5.1:
-    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /abort-controller@3.0.0:
@@ -288,8 +288,8 @@ packages:
       thread-stream: 2.4.0
     dev: false
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index f29841f9b937ffb54ec3c31c0ac25b96cb4383bc..ebc8909e980024df565c4c8703559598d6959484 100644 (file)
@@ -1,7 +1,7 @@
 {
   "$schema": "https://json.schemastore.org/package",
   "name": "poolifier",
-  "version": "2.6.32",
+  "version": "2.6.33",
   "description": "Fast and small Node.js Worker_Threads and Cluster Worker Pool",
   "license": "MIT",
   "main": "./lib/index.js",
index 757cdd54b92adf0f2f93d08050fe29f77eb6b4d0..f367be262f51bcd1d477698ef9f5b2a3f870c67e 100644 (file)
@@ -2,7 +2,7 @@ sonar.projectKey=pioardi_poolifier
 sonar.organization=pioardi
 sonar.javascript.lcov.reportPaths=coverage/lcov.info
 sonar.projectName=poolifier
-sonar.projectVersion=2.6.32
+sonar.projectVersion=2.6.33
 sonar.host.url=https://sonarcloud.io
 sonar.sources=src
 sonar.tests=tests
index 3a1a87807cc065df871bb1ca6d2195619672a64d..c6712d198bc28a9d77c08ae817e5c96c3b9227c6 100644 (file)
@@ -1156,6 +1156,8 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
       this.workerNodes[workerNodeKey].onBackPressure =
         this.tasksStealingOnBackPressure.bind(this)
     }
@@ -1187,42 +1189,77 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
-    const workerNodes = this.workerNodes.filter(
-      (_, workerNodeId) => workerNodeId !== workerNodeKey
-    )
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      let targetWorkerNodeKey: number = workerNodeKey
+      let destinationWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
       let executeTask = false
-      for (const [workerNodeId, workerNode] of workerNodes.entries()) {
+      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           executeTask = true
         }
-        if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) {
-          targetWorkerNodeKey = workerNodeId
+        if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued === 0
+        ) {
+          destinationWorkerNodeKey = workerNodeId
           break
         }
         if (
           workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
           workerNode.usage.tasks.queued < minQueuedTasks
         ) {
           minQueuedTasks = workerNode.usage.tasks.queued
-          targetWorkerNodeKey = workerNodeId
+          destinationWorkerNodeKey = workerNodeId
         }
       }
+      const task = {
+        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+        workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
+          .id as number
+      }
       if (executeTask) {
-        this.executeTask(
-          targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+        this.executeTask(destinationWorkerNodeKey, task)
       } else {
-        this.enqueueTask(
-          targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+        this.enqueueTask(destinationWorkerNodeKey, task)
+      }
+    }
+  }
+
+  private taskStealingOnEmptyQueue (workerId: number): void {
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
+    const workerNodes = this.workerNodes
+      .slice()
+      .sort(
+        (workerNodeA, workerNodeB) =>
+          workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
+      )
+    for (const sourceWorkerNode of workerNodes) {
+      if (
+        sourceWorkerNode.info.ready &&
+        sourceWorkerNode.info.id !== workerId &&
+        sourceWorkerNode.usage.tasks.queued > 0
+      ) {
+        const task = {
+          ...(sourceWorkerNode.popTask() as Task<Data>),
+          workerId: destinationWorkerNode.info.id as number
+        }
+        if (
+          destinationWorkerNode.usage.tasks.executing <
+          (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
+          this.executeTask(destinationWorkerNodeKey, task)
+        } else {
+          this.enqueueTask(destinationWorkerNodeKey, task)
+        }
+        break
       }
     }
   }
@@ -1231,7 +1268,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
-      .filter((workerNode) => workerNode.info.id !== workerId)
+      .slice()
       .sort(
         (workerNodeA, workerNodeB) =>
           workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
@@ -1239,22 +1276,21 @@ export abstract class AbstractPool<
     for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
       if (
         workerNode.info.ready &&
+        workerNode.info.id !== workerId &&
         sourceWorkerNode.usage.tasks.queued > 0 &&
         !workerNode.hasBackPressure()
       ) {
+        const task = {
+          ...(sourceWorkerNode.popTask() as Task<Data>),
+          workerId: workerNode.info.id as number
+        }
         if (
           workerNode.usage.tasks.executing <
           (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
-          this.executeTask(
-            workerNodeKey,
-            sourceWorkerNode.popTask() as Task<Data>
-          )
+          this.executeTask(workerNodeKey, task)
         } else {
-          this.enqueueTask(
-            workerNodeKey,
-            sourceWorkerNode.popTask() as Task<Data>
-          )
+          this.enqueueTask(workerNodeKey, task)
         }
       }
     }
index bbfd33bd43585b12e7aee5f778bcae78c98c46d3..bc7a7cd6f54b2acf9baf9df7a9c386c564ccd04c 100644 (file)
@@ -1 +1 @@
-export const version = '2.6.32'
+export const version = '2.6.33'
index d8044032014d02d36e6cbe70ffd10026f6a085ea..116fb8448d8435ecb98c5573bb781f258026e589 100644 (file)
@@ -32,6 +32,8 @@ implements IWorkerNode<Worker, Data> {
   public tasksQueueBackPressureSize: number
   /** @inheritdoc */
   public onBackPressure?: (workerId: number) => void
+  /** @inheritdoc */
+  public onEmptyQueue?: (workerId: number) => void
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Deque<Task<Data>>
 
@@ -81,15 +83,6 @@ implements IWorkerNode<Worker, Data> {
     return this.tasksQueue.size
   }
 
-  /**
-   * Tasks queue maximum size.
-   *
-   * @returns The tasks queue maximum size.
-   */
-  private tasksQueueMaxSize (): number {
-    return this.tasksQueue.maxSize
-  }
-
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.push(task)
@@ -110,12 +103,20 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
-    return this.tasksQueue.shift()
+    const task = this.tasksQueue.shift()
+    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+      once(this.onEmptyQueue, this)(this.info.id as number)
+    }
+    return task
   }
 
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
-    return this.tasksQueue.pop()
+    const task = this.tasksQueue.pop()
+    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+      once(this.onEmptyQueue, this)(this.info.id as number)
+    }
+    return task
   }
 
   /** @inheritdoc */
@@ -180,10 +181,10 @@ implements IWorkerNode<Worker, Data> {
 
   private initWorkerUsage (): WorkerUsage {
     const getTasksQueueSize = (): number => {
-      return this.tasksQueueSize()
+      return this.tasksQueue.size
     }
     const getTasksQueueMaxSize = (): number => {
-      return this.tasksQueueMaxSize()
+      return this.tasksQueue.maxSize
     }
     return {
       tasks: {
index 6b387a96088d430d455ed0a4dbb3759c1cbbe86e..14c8dbe1e1d444f5404e8562c1c59ecf1585d869 100644 (file)
@@ -230,6 +230,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @param workerId - The worker id.
    */
   onBackPressure?: (workerId: number) => void
+  /**
+   * Callback invoked when worker node tasks queue is empty.
+   *
+   * @param workerId - The worker id.
+   */
+  onEmptyQueue?: (workerId: number) => void
   /**
    * Tasks queue size.
    *
index a5790db075f400b3ff4ff93dd6c0a01a3beb62fd..900f0edc9d139a5289e4b54509f80629e60802dc 100644 (file)
@@ -139,7 +139,10 @@ describe('Fixed cluster pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(
index 4c2c6952944902647177af13756cec63e6f04b0f..8cd337ee6fa1ceb7109e42a7e55255f76690392c 100644 (file)
@@ -139,7 +139,10 @@ describe('Fixed thread pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfThreads * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(