fix: fix race condition at counting executing tasks on worker node
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 18:30:39 +0000 (20:30 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 18:30:39 +0000 (20:30 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
package.json
pnpm-lock.yaml
src/pools/abstract-pool.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 9f8bd4ff8e5b375facca9307f6b47b5830f228fc..1c61fc49de631077b2b075f052dbce1944d02c25 100644 (file)
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Fix race condition between ready and task functions worker message handling at startup.
 - Fix duplicate task function worker usage statistics computation per task function.
 - Update task function worker usage statistics if and only if there's at least two different task functions.
+- Fix race condition at task function worker usage executing task computation leading to negative value.
 
 ### Added
 
index ea4783ec2fdfe2f49793b3c89d0e3f6d53a101a1..c402a1bcd8a26a602ef2761d52c76f526ef6f136 100644 (file)
     "@release-it/keep-a-changelog": "^4.0.0",
     "@rollup/plugin-terser": "^0.4.3",
     "@rollup/plugin-typescript": "^11.1.2",
-    "@types/node": "^20.5.0",
+    "@types/node": "^20.5.1",
     "@typescript-eslint/eslint-plugin": "^5.62.0",
     "@typescript-eslint/parser": "^5.62.0",
     "benny": "^3.7.1",
index dcddb5e2da8261756f66c15f1162c3e193c7409b..c807e95b33d0886a461c25f2abf52acb02719204 100644 (file)
@@ -27,8 +27,8 @@ devDependencies:
     specifier: ^11.1.2
     version: 11.1.2(rollup@3.28.0)(typescript@5.1.6)
   '@types/node':
-    specifier: ^20.5.0
-    version: 20.5.0
+    specifier: ^20.5.1
+    version: 20.5.1
   '@typescript-eslint/eslint-plugin':
     specifier: ^5.62.0
     version: 5.62.0(@typescript-eslint/parser@5.62.0)(eslint@8.47.0)(typescript@5.1.6)
@@ -285,7 +285,7 @@ packages:
       lodash.merge: 4.6.2
       lodash.uniq: 4.5.0
       resolve-from: 5.0.0
-      ts-node: 10.9.1(@types/node@20.5.0)(typescript@5.1.6)
+      ts-node: 10.9.1(@types/node@20.5.1)(typescript@5.1.6)
       typescript: 5.1.6
     transitivePeerDependencies:
       - '@swc/core'
@@ -462,7 +462,7 @@ packages:
       '@jest/schemas': 29.6.0
       '@types/istanbul-lib-coverage': 2.0.4
       '@types/istanbul-reports': 3.0.1
-      '@types/node': 20.5.0
+      '@types/node': 20.5.1
       '@types/yargs': 17.0.24
       chalk: 4.1.2
     dev: true
@@ -893,7 +893,7 @@ packages:
     resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
     dependencies:
       '@types/minimatch': 5.1.2
-      '@types/node': 20.5.0
+      '@types/node': 20.5.1
     dev: true
 
   /@types/http-cache-semantics@4.0.1:
@@ -936,8 +936,8 @@ packages:
     resolution: {integrity: sha512-bUBrPjEry2QUTsnuEjzjbS7voGWCc30W0qzgMf90GPeDGFRakvrz47ju+oqDAKCXLUCe39u57/ORMl/O/04/9g==}
     dev: true
 
-  /@types/node@20.5.0:
-    resolution: {integrity: sha512-Mgq7eCtoTjT89FqNoTzzXg2XvCi5VMhRV6+I2aYanc6kQCBImeNaAYRs/DyoVqk1YEUJK5gN9VO7HRIdz4Wo3Q==}
+  /@types/node@20.5.1:
+    resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==}
     dev: true
 
   /@types/normalize-package-data@2.4.1:
@@ -1338,7 +1338,7 @@ packages:
     resolution: {integrity: sha512-x1FCFnFifvYDDzTaLII71vG5uvDwgtmDTEVWAxrgeiR8VjMONcCXJx7E+USjDtHlwFmt9MysbqgF9b9Vjr6w+w==}
     engines: {node: '>=4'}
     dependencies:
-      tslib: 2.6.1
+      tslib: 2.6.2
     dev: true
 
   /astral-regex@2.0.0:
@@ -1810,7 +1810,7 @@ packages:
     dependencies:
       '@types/node': 20.4.7
       cosmiconfig: 8.2.0
-      ts-node: 10.9.1(@types/node@20.5.0)(typescript@5.1.6)
+      ts-node: 10.9.1(@types/node@20.5.1)(typescript@5.1.6)
       typescript: 5.1.6
     dev: true
 
@@ -2733,7 +2733,7 @@ packages:
     engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
     dependencies:
       '@jest/expect-utils': 29.6.2
-      '@types/node': 20.5.0
+      '@types/node': 20.5.1
       jest-get-type: 29.4.3
       jest-matcher-utils: 29.6.2
       jest-message-util: 29.6.2
@@ -3830,7 +3830,7 @@ packages:
     engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
     dependencies:
       '@jest/types': 29.6.1
-      '@types/node': 20.5.0
+      '@types/node': 20.5.1
       chalk: 4.1.2
       ci-info: 3.8.0
       graceful-fs: 4.2.11
@@ -5323,7 +5323,7 @@ packages:
   /rxjs@7.8.1:
     resolution: {integrity: sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==}
     dependencies:
-      tslib: 2.6.1
+      tslib: 2.6.2
     dev: true
 
   /safe-array-concat@1.0.0:
@@ -5793,7 +5793,7 @@ packages:
     engines: {node: '>=8'}
     dev: true
 
-  /ts-node@10.9.1(@types/node@20.5.0)(typescript@5.1.6):
+  /ts-node@10.9.1(@types/node@20.5.1)(typescript@5.1.6):
     resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==}
     hasBin: true
     peerDependencies:
@@ -5812,7 +5812,7 @@ packages:
       '@tsconfig/node12': 1.0.11
       '@tsconfig/node14': 1.0.3
       '@tsconfig/node16': 1.0.4
-      '@types/node': 20.5.0
+      '@types/node': 20.5.1
       acorn: 8.10.0
       acorn-walk: 8.2.0
       arg: 4.1.3
@@ -5863,8 +5863,8 @@ packages:
     resolution: {integrity: sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==}
     dev: true
 
-  /tslib@2.6.1:
-    resolution: {integrity: sha512-t0hLfiEKfMUoqhG+U1oid7Pva4bbDPHYfJNiB7BiIjRkj1pyC++4N3huJfqY6aRH6VTB0rvtzQwjM4K6qpfOig==}
+  /tslib@2.6.2:
+    resolution: {integrity: sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==}
     dev: true
 
   /tsutils@3.21.0(typescript@5.1.6):
index 53aad1fdfcb548f56c9b1e87e93e86799b11271d..bb5f02d49923c99c4b21dd068b431524e5b8ce38 100644 (file)
@@ -854,7 +854,19 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     const workerTaskStatistics = workerUsage.tasks
-    --workerTaskStatistics.executing
+    if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing > 0
+    ) {
+      --workerTaskStatistics.executing
+    } else if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing < 0
+    ) {
+      throw new Error(
+        'Worker usage statistics for tasks executing cannot be negative'
+      )
+    }
     if (message.taskError == null) {
       ++workerTaskStatistics.executed
     } else {
index a395782c32a3e184ee784c78a5577127fd4adeb5..58d221ab04a996f7dc8b98272b54c7d607f2d5b0 100644 (file)
@@ -913,5 +913,36 @@ describe('Abstract pool test suite', () => {
     expect(result2).toBe(3628800)
     const result3 = await pool.execute(data, 'fibonacci')
     expect(result3).toBe(55)
+    expect(pool.info.executingTasks).toBe(0)
+    expect(pool.info.executedTasks).toBe(4)
+    for (const name of pool.listTaskFunctions()) {
+      for (const workerNode of pool.workerNodes) {
+        expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+          tasks: {
+            executed: expect.any(Number),
+            executing: expect.any(Number),
+            failed: 0,
+            queued: 0
+          },
+          runTime: {
+            history: expect.any(CircularArray)
+          },
+          waitTime: {
+            history: expect.any(CircularArray)
+          },
+          elu: {
+            idle: {
+              history: expect.any(CircularArray)
+            },
+            active: {
+              history: expect.any(CircularArray)
+            }
+          }
+        })
+        expect(
+          workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
+        ).toBeGreaterThanOrEqual(0)
+      }
+    }
   })
 })
index 749cacaa4d4ade34727242e21caa1071be504b4c..e563b0ba520c67f2f11b44221e61b88b098c6496 100644 (file)
@@ -111,6 +111,7 @@ describe('Fixed cluster pool test suite', () => {
     }
     expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
     for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
         queuePool.opts.tasksQueueOptions.concurrency
       )
index 9ccbf67cfb38ab49a36a06bcc3974208925ca0e5..1a372a26f4e66238f59f64a009a69ddc21982ad8 100644 (file)
@@ -111,6 +111,7 @@ describe('Fixed thread pool test suite', () => {
     }
     expect(promises.size).toBe(numberOfThreads * maxMultiplier)
     for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
         queuePool.opts.tasksQueueOptions.concurrency
       )