From a05c10de598321c3ad8005f32b1cd082ec1500f5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Oct 2022 09:43:11 +0200 Subject: [PATCH] Optimize tasks usage lookup implementation. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 3 +- .../versus-external-pools/BENCH-100000.md | 22 +++---- src/pools/abstract-pool.ts | 63 +++++++++++-------- src/pools/cluster/fixed.ts | 2 +- src/pools/thread/fixed.ts | 2 +- src/worker/abstract-worker.ts | 2 +- tests/pools/abstract/abstract-pool.test.js | 2 +- 7 files changed, 53 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2fb8aeb..8bc2947a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Improve benchmarks: add IO intensive task workload, add task size option, integrate into eslint. +- Improve benchmarks: add IO intensive task workload, add task size option, integrate code into linter. +- Optimize tasks usage lookup implementation. ## [2.3.4] - 2022-10-17 diff --git a/benchmarks/versus-external-pools/BENCH-100000.md b/benchmarks/versus-external-pools/BENCH-100000.md index 6c2aae9c..a8cc6e8d 100644 --- a/benchmarks/versus-external-pools/BENCH-100000.md +++ b/benchmarks/versus-external-pools/BENCH-100000.md @@ -1,13 +1,13 @@ | Command | Mean [s] | Min [s] | Max [s] | Relative | | :--------------------------------------------------- | -------------: | ------: | ------: | ----------: | -| `node dynamic-piscina.js` | 27.917 ± 3.550 | 23.332 | 32.602 | 1.24 ± 0.17 | -| `node fixed-piscina.js` | 24.073 ± 0.603 | 23.245 | 24.980 | 1.07 ± 0.06 | -| `node dynamic-poolifier.js` | 22.571 ± 1.049 | 21.715 | 25.146 | 1.00 | -| `node fixed-poolifier.js` | 23.344 ± 0.508 | 22.659 | 24.247 | 1.03 ± 0.05 | -| `node dynamic-suchmokuo-node-worker-threads-pool.js` | 33.512 ± 1.607 | 32.231 | 37.384 | 1.48 ± 0.10 | -| `node static-suchmokuo-node-worker-threads-pool.js` | 24.475 ± 0.578 | 23.780 | 25.335 | 1.08 ± 0.06 | -| `node threadjs.js` | 92.927 ± 5.314 | 84.503 | 102.706 | 4.12 ± 0.30 | -| `node dynamic-workerpool.js` | 27.519 ± 0.717 | 26.510 | 29.063 | 1.22 ± 0.06 | -| `node fixed-workerpool.js` | 25.787 ± 0.620 | 25.066 | 26.622 | 1.14 ± 0.06 | -| `node fixed-threadwork.js` | 24.481 ± 0.505 | 23.711 | 25.274 | 1.08 ± 0.06 | -| `node fixed-microjob.js` | 41.766 ± 1.275 | 40.694 | 45.133 | 1.85 ± 0.10 | +| `node dynamic-poolifier.js` | 25.315 ± 2.256 | 23.169 | 30.705 | 1.00 | +| `node fixed-poolifier.js` | 26.079 ± 2.875 | 23.087 | 32.154 | 1.03 ± 0.15 | +| `node dynamic-piscina.js` | 25.897 ± 0.570 | 24.997 | 26.914 | 1.02 ± 0.09 | +| `node fixed-piscina.js` | 25.946 ± 0.891 | 24.953 | 27.346 | 1.02 ± 0.10 | +| `node dynamic-workerpool.js` | 29.314 ± 0.897 | 27.839 | 30.370 | 1.16 ± 0.11 | +| `node fixed-workerpool.js` | 28.926 ± 0.964 | 27.866 | 30.583 | 1.14 ± 0.11 | +| `node dynamic-suchmokuo-node-worker-threads-pool.js` | 84.852 ± 1.322 | 83.077 | 86.920 | 3.35 ± 0.30 | +| `node static-suchmokuo-node-worker-threads-pool.js` | 27.519 ± 0.963 | 26.248 | 29.518 | 1.09 ± 0.10 | +| `node threadjs.js` | 88.842 ± 3.197 | 85.012 | 94.289 | 3.51 ± 0.34 | +| `node fixed-threadwork.js` | 27.345 ± 0.920 | 26.167 | 28.972 | 1.08 ± 0.10 | +| `node fixed-microjob.js` | 44.998 ± 0.659 | 44.106 | 46.029 | 1.78 ± 0.16 | diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 346eb6d1..6393fe2e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -14,9 +14,6 @@ import { } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' -const WORKER_NOT_FOUND_TASKS_USAGE_MAP = - 'Worker could not be found in worker tasks usage map' - /** * Base class that implements some shared logic for all poolifier pools. * @@ -210,11 +207,13 @@ export abstract class AbstractPool< public execute (data: Data): Promise { // Configure worker to handle message with the specified task const worker = this.chooseWorker() - const messageId = ++this.nextMessageId - const res = this.internalExecute(worker, messageId) + const res = this.internalExecute(worker, this.nextMessageId) this.checkAndEmitBusy() - data = data ?? ({} as Data) - this.sendToWorker(worker, { data, id: messageId }) + this.sendToWorker(worker, { + data: data ?? ({} as Data), + id: this.nextMessageId + }) + ++this.nextMessageId return res } @@ -370,9 +369,12 @@ export abstract class AbstractPool< if (message.id !== undefined) { const promise = this.promiseMap.get(message.id) if (promise !== undefined) { + if (message.error) { + promise.reject(message.error) + } else { + promise.resolve(message.data as Response) + } this.afterPromiseWorkerResponseHook(message, promise) - if (message.error) promise.reject(message.error) - else promise.resolve(message.data as Response) this.promiseMap.delete(message.id) } } @@ -410,12 +412,10 @@ export abstract class AbstractPool< * @param step Number of running tasks step. */ private stepWorkerRunningTasks (worker: Worker, step: number): void { - const tasksUsage = this.workersTasksUsage.get(worker) - if (tasksUsage !== undefined) { + if (this.checkWorkerTasksUsage(worker) === true) { + const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage tasksUsage.running = tasksUsage.running + step this.workersTasksUsage.set(worker, tasksUsage) - } else { - throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) } } @@ -426,12 +426,10 @@ export abstract class AbstractPool< * @param step Number of run tasks step. */ private stepWorkerRunTasks (worker: Worker, step: number): void { - const tasksUsage = this.workersTasksUsage.get(worker) - if (tasksUsage !== undefined) { + if (this.checkWorkerTasksUsage(worker) === true) { + const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage tasksUsage.run = tasksUsage.run + step this.workersTasksUsage.set(worker, tasksUsage) - } else { - throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) } } @@ -447,19 +445,30 @@ export abstract class AbstractPool< ): void { if ( this.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime === true + .requiredStatistics.runTime === true && + this.checkWorkerTasksUsage(worker) === true ) { - const tasksUsage = this.workersTasksUsage.get(worker) - if (tasksUsage !== undefined) { - tasksUsage.runTime += taskRunTime ?? 0 - if (tasksUsage.run !== 0) { - tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run - } - this.workersTasksUsage.set(worker, tasksUsage) - } else { - throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage + tasksUsage.runTime += taskRunTime ?? 0 + if (tasksUsage.run !== 0) { + tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run } + this.workersTasksUsage.set(worker, tasksUsage) + } + } + + /** + * Checks if the given worker is registered in the workers tasks usage map. + * + * @param worker Worker to check. + * @returns `true` if the worker is registered in the workers tasks usage map. `false` otherwise. + */ + private checkWorkerTasksUsage (worker: Worker): boolean { + const hasTasksUsage = this.workersTasksUsage.has(worker) + if (hasTasksUsage === false) { + throw new Error('Worker could not be found in workers tasks usage map') } + return hasTasksUsage } /** diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index a98cd647..33d7f1c9 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -91,7 +91,7 @@ export class FixedClusterPool< /** @inheritDoc */ protected afterWorkerSetup (worker: Worker): void { - // Listen worker messages. + // Listen to worker messages. this.registerWorkerMessageListener(worker, super.workerListener()) } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 5b06ca49..cd414502 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -82,7 +82,7 @@ export class FixedThreadPool< worker.postMessage({ parent: port1 }, [port1]) worker.port1 = port1 worker.port2 = port2 - // Listen worker messages. + // Listen to worker messages. this.registerWorkerMessageListener(worker, super.workerListener()) } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 3090b6f5..c3c5d1bd 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -139,7 +139,7 @@ export abstract class AbstractWorker< protected abstract sendToMainWorker (message: MessageValue): void /** - * Check to see if the worker should be terminated, because its living too long. + * Checks if the worker should be terminated, because its living too long. */ protected checkAlive (): void { if ( diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 4224f4ff..bc7bdd1e 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -9,7 +9,7 @@ const { describe('Abstract pool test suite', () => { const numberOfWorkers = 1 const workerNotFoundInTasksUsageMapError = new Error( - 'Worker could not be found in worker tasks usage map' + 'Worker could not be found in workers tasks usage map' ) class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool { removeAllWorker () { -- 2.34.1