From 5bb5be17a795c3061bdb0a7748b42b2ba2060db3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 18 Aug 2023 20:30:39 +0200 Subject: [PATCH] fix: fix race condition at counting executing tasks on worker node MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + package.json | 2 +- pnpm-lock.yaml | 32 +++++++++++----------- src/pools/abstract-pool.ts | 14 +++++++++- tests/pools/abstract/abstract-pool.test.js | 31 +++++++++++++++++++++ tests/pools/cluster/fixed.test.js | 1 + tests/pools/thread/fixed.test.js | 1 + 7 files changed, 64 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f8bd4ff..1c61fc49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix race condition between ready and task functions worker message handling at startup. - Fix duplicate task function worker usage statistics computation per task function. - Update task function worker usage statistics if and only if there's at least two different task functions. +- Fix race condition at task function worker usage executing task computation leading to negative value. ### Added diff --git a/package.json b/package.json index ea4783ec..c402a1bc 100644 --- a/package.json +++ b/package.json @@ -112,7 +112,7 @@ "@release-it/keep-a-changelog": "^4.0.0", "@rollup/plugin-terser": "^0.4.3", "@rollup/plugin-typescript": "^11.1.2", - "@types/node": "^20.5.0", + "@types/node": "^20.5.1", "@typescript-eslint/eslint-plugin": "^5.62.0", "@typescript-eslint/parser": "^5.62.0", "benny": "^3.7.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dcddb5e2..c807e95b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -27,8 +27,8 @@ devDependencies: specifier: ^11.1.2 version: 11.1.2(rollup@3.28.0)(typescript@5.1.6) '@types/node': - specifier: ^20.5.0 - version: 20.5.0 + specifier: ^20.5.1 + version: 20.5.1 '@typescript-eslint/eslint-plugin': specifier: ^5.62.0 version: 5.62.0(@typescript-eslint/parser@5.62.0)(eslint@8.47.0)(typescript@5.1.6) @@ -285,7 +285,7 @@ packages: lodash.merge: 4.6.2 lodash.uniq: 4.5.0 resolve-from: 5.0.0 - ts-node: 10.9.1(@types/node@20.5.0)(typescript@5.1.6) + ts-node: 10.9.1(@types/node@20.5.1)(typescript@5.1.6) typescript: 5.1.6 transitivePeerDependencies: - '@swc/core' @@ -462,7 +462,7 @@ packages: '@jest/schemas': 29.6.0 '@types/istanbul-lib-coverage': 2.0.4 '@types/istanbul-reports': 3.0.1 - '@types/node': 20.5.0 + '@types/node': 20.5.1 '@types/yargs': 17.0.24 chalk: 4.1.2 dev: true @@ -893,7 +893,7 @@ packages: resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==} dependencies: '@types/minimatch': 5.1.2 - '@types/node': 20.5.0 + '@types/node': 20.5.1 dev: true /@types/http-cache-semantics@4.0.1: @@ -936,8 +936,8 @@ packages: resolution: {integrity: sha512-bUBrPjEry2QUTsnuEjzjbS7voGWCc30W0qzgMf90GPeDGFRakvrz47ju+oqDAKCXLUCe39u57/ORMl/O/04/9g==} dev: true - /@types/node@20.5.0: - resolution: {integrity: sha512-Mgq7eCtoTjT89FqNoTzzXg2XvCi5VMhRV6+I2aYanc6kQCBImeNaAYRs/DyoVqk1YEUJK5gN9VO7HRIdz4Wo3Q==} + /@types/node@20.5.1: + resolution: {integrity: sha512-4tT2UrL5LBqDwoed9wZ6N3umC4Yhz3W3FloMmiiG4JwmUJWpie0c7lcnUNd4gtMKuDEO4wRVS8B6Xa0uMRsMKg==} dev: true /@types/normalize-package-data@2.4.1: @@ -1338,7 +1338,7 @@ packages: resolution: {integrity: sha512-x1FCFnFifvYDDzTaLII71vG5uvDwgtmDTEVWAxrgeiR8VjMONcCXJx7E+USjDtHlwFmt9MysbqgF9b9Vjr6w+w==} engines: {node: '>=4'} dependencies: - tslib: 2.6.1 + tslib: 2.6.2 dev: true /astral-regex@2.0.0: @@ -1810,7 +1810,7 @@ packages: dependencies: '@types/node': 20.4.7 cosmiconfig: 8.2.0 - ts-node: 10.9.1(@types/node@20.5.0)(typescript@5.1.6) + ts-node: 10.9.1(@types/node@20.5.1)(typescript@5.1.6) typescript: 5.1.6 dev: true @@ -2733,7 +2733,7 @@ packages: engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} dependencies: '@jest/expect-utils': 29.6.2 - '@types/node': 20.5.0 + '@types/node': 20.5.1 jest-get-type: 29.4.3 jest-matcher-utils: 29.6.2 jest-message-util: 29.6.2 @@ -3830,7 +3830,7 @@ packages: engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} dependencies: '@jest/types': 29.6.1 - '@types/node': 20.5.0 + '@types/node': 20.5.1 chalk: 4.1.2 ci-info: 3.8.0 graceful-fs: 4.2.11 @@ -5323,7 +5323,7 @@ packages: /rxjs@7.8.1: resolution: {integrity: sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==} dependencies: - tslib: 2.6.1 + tslib: 2.6.2 dev: true /safe-array-concat@1.0.0: @@ -5793,7 +5793,7 @@ packages: engines: {node: '>=8'} dev: true - /ts-node@10.9.1(@types/node@20.5.0)(typescript@5.1.6): + /ts-node@10.9.1(@types/node@20.5.1)(typescript@5.1.6): resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==} hasBin: true peerDependencies: @@ -5812,7 +5812,7 @@ packages: '@tsconfig/node12': 1.0.11 '@tsconfig/node14': 1.0.3 '@tsconfig/node16': 1.0.4 - '@types/node': 20.5.0 + '@types/node': 20.5.1 acorn: 8.10.0 acorn-walk: 8.2.0 arg: 4.1.3 @@ -5863,8 +5863,8 @@ packages: resolution: {integrity: sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==} dev: true - /tslib@2.6.1: - resolution: {integrity: sha512-t0hLfiEKfMUoqhG+U1oid7Pva4bbDPHYfJNiB7BiIjRkj1pyC++4N3huJfqY6aRH6VTB0rvtzQwjM4K6qpfOig==} + /tslib@2.6.2: + resolution: {integrity: sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==} dev: true /tsutils@3.21.0(typescript@5.1.6): diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 53aad1fd..bb5f02d4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -854,7 +854,19 @@ export abstract class AbstractPool< message: MessageValue ): void { const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } else if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing < 0 + ) { + throw new Error( + 'Worker usage statistics for tasks executing cannot be negative' + ) + } if (message.taskError == null) { ++workerTaskStatistics.executed } else { diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index a395782c..58d221ab 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -913,5 +913,36 @@ describe('Abstract pool test suite', () => { expect(result2).toBe(3628800) const result3 = await pool.execute(data, 'fibonacci') expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const name of pool.listTaskFunctions()) { + for (const workerNode of pool.workerNodes) { + expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: expect.any(Number), + failed: 0, + queued: 0 + }, + runTime: { + history: expect.any(CircularArray) + }, + waitTime: { + history: expect.any(CircularArray) + }, + elu: { + idle: { + history: expect.any(CircularArray) + }, + active: { + history: expect.any(CircularArray) + } + } + }) + expect( + workerNode.getTaskFunctionWorkerUsage(name).tasks.executing + ).toBeGreaterThanOrEqual(0) + } + } }) }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 749cacaa..e563b0ba 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -111,6 +111,7 @@ describe('Fixed cluster pool test suite', () => { } expect(promises.size).toBe(numberOfWorkers * maxMultiplier) for (const workerNode of queuePool.workerNodes) { + expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( queuePool.opts.tasksQueueOptions.concurrency ) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 9ccbf67c..1a372a26 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -111,6 +111,7 @@ describe('Fixed thread pool test suite', () => { } expect(promises.size).toBe(numberOfThreads * maxMultiplier) for (const workerNode of queuePool.workerNodes) { + expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( queuePool.opts.tasksQueueOptions.concurrency ) -- 2.34.1