Merge dependabot/npm_and_yarn/examples/typescript/http-client-pool/types/node-20...
authorgithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Wed, 23 Aug 2023 23:00:21 +0000 (23:00 +0000)
committerGitHub <noreply@github.com>
Wed, 23 Aug 2023 23:00:21 +0000 (23:00 +0000)
21 files changed:
CHANGELOG.md
README.md
examples/typescript/http-client-pool/package.json
examples/typescript/http-client-pool/pnpm-lock.yaml
examples/typescript/http-server-pool/fastify-cluster/package.json
examples/typescript/http-server-pool/fastify-cluster/pnpm-lock.yaml
examples/typescript/http-server-pool/fastify-worker_threads/package.json
examples/typescript/http-server-pool/fastify-worker_threads/pnpm-lock.yaml
examples/typescript/smtp-client-pool/package.json
examples/typescript/smtp-client-pool/pnpm-lock.yaml
examples/typescript/websocket-server-pool/ws-cluster/package.json
examples/typescript/websocket-server-pool/ws-cluster/pnpm-lock.yaml
package.json
pnpm-lock.yaml
sonar-project.properties
src/pools/abstract-pool.ts
src/pools/version.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 89b1918650b9c7e49de75fa9dcefa1aa42637674..ab21a07781f819151919de0ce36a53bf9e7e638f 100644 (file)
@@ -7,10 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [2.6.33] - 2023-08-24
+
+### Fixed
+
+- Fix queued tasks rescheduling.
+
 ### Changed
 
 - Rename tasks queue options `queueMaxSize` to `size`.
 
+### Added
+
+- Task stealing scheduling algorithm if tasks queueing is enabled.
+
 ## [2.6.32] - 2023-08-23
 
 ### Fixed
index eae0023f7b9f9923971601d425aedd89cf8a0a3d..47a680d039609442639275395e62b5b699a540cb 100644 (file)
--- a/README.md
+++ b/README.md
@@ -47,8 +47,9 @@ Please consult our [general guidelines](#general-guidelines).
 - Tasks distribution strategies :white_check_mark:
 - Lockless tasks queueing :white_check_mark:
 - Queued tasks rescheduling:
-  - Tasks redistribution on worker error :white_check_mark:
+  - Task stealing :white_check_mark:
   - Tasks stealing under back pressure :white_check_mark:
+  - Tasks redistribution on worker error :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
 - Error handling out of the box :white_check_mark:
 - Widely tested :white_check_mark:
index 2bfc6bbfae8bbd1d9c35db49446770a28b3850fa..76e1364ad137bd2635425f66c253dca5e0092432 100644 (file)
@@ -22,7 +22,7 @@
   "dependencies": {
     "axios": "^1.4.0",
     "node-fetch": "^3.3.2",
-    "poolifier": "^2.6.28"
+    "poolifier": "^2.6.32"
   },
   "devDependencies": {
     "@types/node": "^20.5.4",
index d96f419276d529a3139cbbb9b6fece149f2fb3b6..a304e371c45c7fff7b21eb7890a08dc275cb8f01 100644 (file)
@@ -12,8 +12,8 @@ dependencies:
     specifier: ^3.3.2
     version: 3.3.2
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
 
 devDependencies:
   '@types/node':
@@ -120,8 +120,8 @@ packages:
       formdata-polyfill: 4.0.10
     dev: false
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index 0d11646e2a15a0ce1b923a92cf1fcaf7377e1dc2..8d2121ee77703f8dacc429c6f9ec79e5566d326f 100644 (file)
   "license": "ISC",
   "dependencies": {
     "fastify": "^4.21.0",
-    "poolifier": "^2.6.28"
+    "poolifier": "^2.6.32"
   },
   "devDependencies": {
     "@rollup/plugin-typescript": "^11.1.2",
-    "@types/node": "^20.5.1",
+    "@types/node": "^20.5.4",
     "rollup": "^3.28.0",
     "rollup-plugin-delete": "^2.0.0",
     "tslib": "^2.6.2",
index 9749aedcbe58632ae1234da348de0af1917ccca4..d55cfe1e22036966afb1314a80e6be99a4dbfac5 100644 (file)
@@ -9,16 +9,16 @@ dependencies:
     specifier: ^4.21.0
     version: 4.21.0
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
 
 devDependencies:
   '@rollup/plugin-typescript':
     specifier: ^11.1.2
     version: 11.1.2(rollup@3.28.0)(tslib@2.6.2)(typescript@5.1.6)
   '@types/node':
-    specifier: ^20.5.1
-    version: 20.5.1
+    specifier: ^20.5.4
+    version: 20.5.4
   rollup:
     specifier: ^3.28.0
     version: 3.28.0
@@ -120,15 +120,15 @@ packages:
     resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
     dependencies:
       '@types/minimatch': 5.1.2
-      '@types/node': 20.5.1
+      '@types/node': 20.5.4
     dev: true
 
   /@types/minimatch@5.1.2:
     resolution: {integrity: sha512-K0VQKziLUWkVKiRVrx4a40iPaxTUefQmjtkQofBkYRcoaaL/8rhwDWww9qWbrgicNOgnpIsMxyNIUM4+n6dUIA==}
     dev: true
 
-  /@types/node@20.5.1:
-    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /abort-controller@3.0.0:
@@ -610,8 +610,8 @@ packages:
       thread-stream: 2.4.0
     dev: false
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index ea95de2713cf3aafe5fa664e3a456cef6b2f4666..0ccdbfa4ffb6e1873568ee1a6aaaaf469661445b 100644 (file)
   "dependencies": {
     "fastify": "^4.21.0",
     "fastify-plugin": "^4.5.1",
-    "poolifier": "^2.6.28"
+    "poolifier": "^2.6.32"
   },
   "devDependencies": {
-    "@types/node": "^20.5.1",
+    "@types/node": "^20.5.4",
     "typescript": "^5.1.6"
   }
 }
index 4af3c13cadc86f08087b0775aedc322089fe237a..7c7bb4e409d305755da97e09020c68124161c9c3 100644 (file)
@@ -12,13 +12,13 @@ dependencies:
     specifier: ^4.5.1
     version: 4.5.1
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
 
 devDependencies:
   '@types/node':
-    specifier: ^20.5.1
-    version: 20.5.1
+    specifier: ^20.5.4
+    version: 20.5.4
   typescript:
     specifier: ^5.1.6
     version: 5.1.6
@@ -47,8 +47,8 @@ packages:
       fast-json-stringify: 5.8.0
     dev: false
 
-  /@types/node@20.5.1:
-    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /abort-controller@3.0.0:
@@ -288,8 +288,8 @@ packages:
       thread-stream: 2.4.0
     dev: false
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index c23fa670ba14779416e6f6d6928271bd610b4e06..42eb8093b4f59579ac5f10d1b3351e904228d46d 100644 (file)
   "license": "ISC",
   "dependencies": {
     "nodemailer": "^6.9.4",
-    "poolifier": "^2.6.28"
+    "poolifier": "^2.6.32"
   },
   "devDependencies": {
-    "@types/node": "^20.5.1",
+    "@types/node": "^20.5.4",
     "@types/nodemailer": "^6.4.9",
     "typescript": "^5.1.6"
   }
index 7638824f9dda3ed21445a2cc0d1ae697638d32fc..818444079a50b6ba0d70a14116fd1c7c9d37f518 100644 (file)
@@ -9,13 +9,13 @@ dependencies:
     specifier: ^6.9.4
     version: 6.9.4
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
 
 devDependencies:
   '@types/node':
-    specifier: ^20.5.1
-    version: 20.5.1
+    specifier: ^20.5.4
+    version: 20.5.4
   '@types/nodemailer':
     specifier: ^6.4.9
     version: 6.4.9
@@ -25,14 +25,14 @@ devDependencies:
 
 packages:
 
-  /@types/node@20.5.1:
-    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /@types/nodemailer@6.4.9:
     resolution: {integrity: sha512-XYG8Gv+sHjaOtUpiuytahMy2mM3rectgroNbs6R3djZEKmPNiIJwe9KqOJBGzKKnNZNKvnuvmugBgpq3w/S0ig==}
     dependencies:
-      '@types/node': 20.5.1
+      '@types/node': 20.5.4
     dev: true
 
   /nodemailer@6.9.4:
@@ -40,8 +40,8 @@ packages:
     engines: {node: '>=6.0.0'}
     dev: false
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index d6d16d4e6211cea6f2d4c841f07d5446eb09ae3b..fcd74193c95f81b11ebd10e6dcbb7d7d7821e214 100644 (file)
   "author": "",
   "license": "ISC",
   "dependencies": {
-    "poolifier": "^2.6.28",
+    "poolifier": "^2.6.32",
     "ws": "^8.13.0"
   },
   "devDependencies": {
     "@rollup/plugin-typescript": "^11.1.2",
-    "@types/node": "^20.5.1",
+    "@types/node": "^20.5.4",
     "@types/ws": "^8.5.5",
     "rollup": "^3.28.0",
     "rollup-plugin-delete": "^2.0.0",
index 09085128687427f4de7334b202295af473e6c0da..d940a19eb3ce8ffcc6d0c9b6aeb63b883798b285 100644 (file)
@@ -6,8 +6,8 @@ settings:
 
 dependencies:
   poolifier:
-    specifier: ^2.6.28
-    version: 2.6.28
+    specifier: ^2.6.32
+    version: 2.6.32
   ws:
     specifier: ^8.13.0
     version: 8.13.0(bufferutil@4.0.7)(utf-8-validate@6.0.3)
@@ -25,8 +25,8 @@ devDependencies:
     specifier: ^11.1.2
     version: 11.1.2(rollup@3.28.0)(tslib@2.6.2)(typescript@5.1.6)
   '@types/node':
-    specifier: ^20.5.1
-    version: 20.5.1
+    specifier: ^20.5.4
+    version: 20.5.4
   '@types/ws':
     specifier: ^8.5.5
     version: 8.5.5
@@ -109,21 +109,21 @@ packages:
     resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
     dependencies:
       '@types/minimatch': 5.1.2
-      '@types/node': 20.5.1
+      '@types/node': 20.5.4
     dev: true
 
   /@types/minimatch@5.1.2:
     resolution: {integrity: sha512-K0VQKziLUWkVKiRVrx4a40iPaxTUefQmjtkQofBkYRcoaaL/8rhwDWww9qWbrgicNOgnpIsMxyNIUM4+n6dUIA==}
     dev: true
 
-  /@types/node@20.5.1:
-    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /@types/ws@8.5.5:
     resolution: {integrity: sha512-lwhs8hktwxSjf9UaZ9tG5M03PGogvFaH8gUgLNbN9HKIg0dvv6q+gkSuJ8HN4/VbyxkuLzCjlN7GquQ0gUJfIg==}
     dependencies:
-      '@types/node': 20.5.1
+      '@types/node': 20.5.4
     dev: true
 
   /aggregate-error@3.1.0:
@@ -393,8 +393,8 @@ packages:
     engines: {node: '>=8.6'}
     dev: true
 
-  /poolifier@2.6.28:
-    resolution: {integrity: sha512-dm3N/BcJ6n1m9aDidXYx53mI7nQc1QvTGdIL0rA29hOjLsfWq6+7pz977vnxvfGxKG5+pCG+VIT7PcJENQSzng==}
+  /poolifier@2.6.32:
+    resolution: {integrity: sha512-2bXB7C5Uazckw8Li7ZAoSHfBaMHA2h0I5VlsprubPcFrr/KRVi1EUg/j6wPFlZs4tcHouD4ZJOcJjySXpV5EgQ==}
     engines: {node: '>=16.14.0', pnpm: '>=8.6.0'}
     requiresBuild: true
     dev: false
index f29841f9b937ffb54ec3c31c0ac25b96cb4383bc..fb788b80359a99523c8b5206b74cc88875a7e42e 100644 (file)
@@ -1,7 +1,7 @@
 {
   "$schema": "https://json.schemastore.org/package",
   "name": "poolifier",
-  "version": "2.6.32",
+  "version": "2.6.33",
   "description": "Fast and small Node.js Worker_Threads and Cluster Worker Pool",
   "license": "MIT",
   "main": "./lib/index.js",
     "@release-it/keep-a-changelog": "^4.0.0",
     "@rollup/plugin-terser": "^0.4.3",
     "@rollup/plugin-typescript": "^11.1.2",
-    "@types/node": "^20.5.3",
+    "@types/node": "^20.5.4",
     "@typescript-eslint/eslint-plugin": "^6.4.1",
     "@typescript-eslint/parser": "^6.4.1",
     "benny": "^3.7.1",
index 2eafe64aad732196518e376347ad1c9e8a4db195..86aaa52f71db1d10908414620fb8b80b6feb206c 100644 (file)
@@ -27,8 +27,8 @@ devDependencies:
     specifier: ^11.1.2
     version: 11.1.2(rollup@3.28.1)(typescript@5.1.6)
   '@types/node':
-    specifier: ^20.5.3
-    version: 20.5.3
+    specifier: ^20.5.4
+    version: 20.5.4
   '@typescript-eslint/eslint-plugin':
     specifier: ^6.4.1
     version: 6.4.1(@typescript-eslint/parser@6.4.1)(eslint@8.47.0)(typescript@5.1.6)
@@ -285,7 +285,7 @@ packages:
       lodash.merge: 4.6.2
       lodash.uniq: 4.5.0
       resolve-from: 5.0.0
-      ts-node: 10.9.1(@types/node@20.5.3)(typescript@5.1.6)
+      ts-node: 10.9.1(@types/node@20.5.4)(typescript@5.1.6)
       typescript: 5.1.6
     transitivePeerDependencies:
       - '@swc/core'
@@ -462,7 +462,7 @@ packages:
       '@jest/schemas': 29.6.3
       '@types/istanbul-lib-coverage': 2.0.4
       '@types/istanbul-reports': 3.0.1
-      '@types/node': 20.5.3
+      '@types/node': 20.5.4
       '@types/yargs': 17.0.24
       chalk: 4.1.2
     dev: true
@@ -893,7 +893,7 @@ packages:
     resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
     dependencies:
       '@types/minimatch': 5.1.2
-      '@types/node': 20.5.3
+      '@types/node': 20.5.4
     dev: true
 
   /@types/http-cache-semantics@4.0.1:
@@ -936,8 +936,8 @@ packages:
     resolution: {integrity: sha512-bUBrPjEry2QUTsnuEjzjbS7voGWCc30W0qzgMf90GPeDGFRakvrz47ju+oqDAKCXLUCe39u57/ORMl/O/04/9g==}
     dev: true
 
-  /@types/node@20.5.3:
-    resolution: {integrity: sha512-ITI7rbWczR8a/S6qjAW7DMqxqFMjjTo61qZVWJ1ubPvbIQsL5D/TvwjYEalM8Kthpe3hTzOGrF2TGbAu2uyqeA==}
+  /@types/node@20.5.4:
+    resolution: {integrity: sha512-Y9vbIAoM31djQZrPYjpTLo0XlaSwOIsrlfE3LpulZeRblttsLQRFRlBAppW0LOxyT3ALj2M5vU1ucQQayQH3jA==}
     dev: true
 
   /@types/normalize-package-data@2.4.1:
@@ -1941,7 +1941,7 @@ packages:
     dependencies:
       '@types/node': 20.4.7
       cosmiconfig: 8.2.0
-      ts-node: 10.9.1(@types/node@20.5.3)(typescript@5.1.6)
+      ts-node: 10.9.1(@types/node@20.5.4)(typescript@5.1.6)
       typescript: 5.1.6
     dev: true
 
@@ -4024,7 +4024,7 @@ packages:
     engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
     dependencies:
       '@jest/types': 29.6.3
-      '@types/node': 20.5.3
+      '@types/node': 20.5.4
       chalk: 4.1.2
       ci-info: 3.8.0
       graceful-fs: 4.2.11
@@ -5996,7 +5996,7 @@ packages:
       typescript: 5.1.6
     dev: true
 
-  /ts-node@10.9.1(@types/node@20.5.3)(typescript@5.1.6):
+  /ts-node@10.9.1(@types/node@20.5.4)(typescript@5.1.6):
     resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==}
     hasBin: true
     peerDependencies:
@@ -6015,7 +6015,7 @@ packages:
       '@tsconfig/node12': 1.0.11
       '@tsconfig/node14': 1.0.3
       '@tsconfig/node16': 1.0.4
-      '@types/node': 20.5.3
+      '@types/node': 20.5.4
       acorn: 8.10.0
       acorn-walk: 8.2.0
       arg: 4.1.3
index 757cdd54b92adf0f2f93d08050fe29f77eb6b4d0..f367be262f51bcd1d477698ef9f5b2a3f870c67e 100644 (file)
@@ -2,7 +2,7 @@ sonar.projectKey=pioardi_poolifier
 sonar.organization=pioardi
 sonar.javascript.lcov.reportPaths=coverage/lcov.info
 sonar.projectName=poolifier
-sonar.projectVersion=2.6.32
+sonar.projectVersion=2.6.33
 sonar.host.url=https://sonarcloud.io
 sonar.sources=src
 sonar.tests=tests
index 3a1a87807cc065df871bb1ca6d2195619672a64d..c6712d198bc28a9d77c08ae817e5c96c3b9227c6 100644 (file)
@@ -1156,6 +1156,8 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
       this.workerNodes[workerNodeKey].onBackPressure =
         this.tasksStealingOnBackPressure.bind(this)
     }
@@ -1187,42 +1189,77 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
-    const workerNodes = this.workerNodes.filter(
-      (_, workerNodeId) => workerNodeId !== workerNodeKey
-    )
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      let targetWorkerNodeKey: number = workerNodeKey
+      let destinationWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
       let executeTask = false
-      for (const [workerNodeId, workerNode] of workerNodes.entries()) {
+      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           executeTask = true
         }
-        if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) {
-          targetWorkerNodeKey = workerNodeId
+        if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued === 0
+        ) {
+          destinationWorkerNodeKey = workerNodeId
           break
         }
         if (
           workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
           workerNode.usage.tasks.queued < minQueuedTasks
         ) {
           minQueuedTasks = workerNode.usage.tasks.queued
-          targetWorkerNodeKey = workerNodeId
+          destinationWorkerNodeKey = workerNodeId
         }
       }
+      const task = {
+        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+        workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
+          .id as number
+      }
       if (executeTask) {
-        this.executeTask(
-          targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+        this.executeTask(destinationWorkerNodeKey, task)
       } else {
-        this.enqueueTask(
-          targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+        this.enqueueTask(destinationWorkerNodeKey, task)
+      }
+    }
+  }
+
+  private taskStealingOnEmptyQueue (workerId: number): void {
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
+    const workerNodes = this.workerNodes
+      .slice()
+      .sort(
+        (workerNodeA, workerNodeB) =>
+          workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
+      )
+    for (const sourceWorkerNode of workerNodes) {
+      if (
+        sourceWorkerNode.info.ready &&
+        sourceWorkerNode.info.id !== workerId &&
+        sourceWorkerNode.usage.tasks.queued > 0
+      ) {
+        const task = {
+          ...(sourceWorkerNode.popTask() as Task<Data>),
+          workerId: destinationWorkerNode.info.id as number
+        }
+        if (
+          destinationWorkerNode.usage.tasks.executing <
+          (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
+          this.executeTask(destinationWorkerNodeKey, task)
+        } else {
+          this.enqueueTask(destinationWorkerNodeKey, task)
+        }
+        break
       }
     }
   }
@@ -1231,7 +1268,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
-      .filter((workerNode) => workerNode.info.id !== workerId)
+      .slice()
       .sort(
         (workerNodeA, workerNodeB) =>
           workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
@@ -1239,22 +1276,21 @@ export abstract class AbstractPool<
     for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
       if (
         workerNode.info.ready &&
+        workerNode.info.id !== workerId &&
         sourceWorkerNode.usage.tasks.queued > 0 &&
         !workerNode.hasBackPressure()
       ) {
+        const task = {
+          ...(sourceWorkerNode.popTask() as Task<Data>),
+          workerId: workerNode.info.id as number
+        }
         if (
           workerNode.usage.tasks.executing <
           (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
-          this.executeTask(
-            workerNodeKey,
-            sourceWorkerNode.popTask() as Task<Data>
-          )
+          this.executeTask(workerNodeKey, task)
         } else {
-          this.enqueueTask(
-            workerNodeKey,
-            sourceWorkerNode.popTask() as Task<Data>
-          )
+          this.enqueueTask(workerNodeKey, task)
         }
       }
     }
index bbfd33bd43585b12e7aee5f778bcae78c98c46d3..bc7a7cd6f54b2acf9baf9df7a9c386c564ccd04c 100644 (file)
@@ -1 +1 @@
-export const version = '2.6.32'
+export const version = '2.6.33'
index d8044032014d02d36e6cbe70ffd10026f6a085ea..116fb8448d8435ecb98c5573bb781f258026e589 100644 (file)
@@ -32,6 +32,8 @@ implements IWorkerNode<Worker, Data> {
   public tasksQueueBackPressureSize: number
   /** @inheritdoc */
   public onBackPressure?: (workerId: number) => void
+  /** @inheritdoc */
+  public onEmptyQueue?: (workerId: number) => void
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Deque<Task<Data>>
 
@@ -81,15 +83,6 @@ implements IWorkerNode<Worker, Data> {
     return this.tasksQueue.size
   }
 
-  /**
-   * Tasks queue maximum size.
-   *
-   * @returns The tasks queue maximum size.
-   */
-  private tasksQueueMaxSize (): number {
-    return this.tasksQueue.maxSize
-  }
-
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.push(task)
@@ -110,12 +103,20 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
-    return this.tasksQueue.shift()
+    const task = this.tasksQueue.shift()
+    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+      once(this.onEmptyQueue, this)(this.info.id as number)
+    }
+    return task
   }
 
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
-    return this.tasksQueue.pop()
+    const task = this.tasksQueue.pop()
+    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+      once(this.onEmptyQueue, this)(this.info.id as number)
+    }
+    return task
   }
 
   /** @inheritdoc */
@@ -180,10 +181,10 @@ implements IWorkerNode<Worker, Data> {
 
   private initWorkerUsage (): WorkerUsage {
     const getTasksQueueSize = (): number => {
-      return this.tasksQueueSize()
+      return this.tasksQueue.size
     }
     const getTasksQueueMaxSize = (): number => {
-      return this.tasksQueueMaxSize()
+      return this.tasksQueue.maxSize
     }
     return {
       tasks: {
index 6b387a96088d430d455ed0a4dbb3759c1cbbe86e..14c8dbe1e1d444f5404e8562c1c59ecf1585d869 100644 (file)
@@ -230,6 +230,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @param workerId - The worker id.
    */
   onBackPressure?: (workerId: number) => void
+  /**
+   * Callback invoked when worker node tasks queue is empty.
+   *
+   * @param workerId - The worker id.
+   */
+  onEmptyQueue?: (workerId: number) => void
   /**
    * Tasks queue size.
    *
index a5790db075f400b3ff4ff93dd6c0a01a3beb62fd..900f0edc9d139a5289e4b54509f80629e60802dc 100644 (file)
@@ -139,7 +139,10 @@ describe('Fixed cluster pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(
index 4c2c6952944902647177af13756cec63e6f04b0f..8cd337ee6fa1ceb7109e42a7e55255f76690392c 100644 (file)
@@ -139,7 +139,10 @@ describe('Fixed thread pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfThreads * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(