From 2740a743fcbf64e0ee674530e3cbcf6df710c1ef Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 3 Apr 2023 13:42:14 +0200 Subject: [PATCH] perf: use worker key instead of worker instance MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 68 ++++++++++++++++++-------------------- src/pools/pool-internal.ts | 1 + src/utility-types.ts | 15 +++------ 3 files changed, 39 insertions(+), 45 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5ed353f6..62def61e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,8 +1,5 @@ import crypto from 'node:crypto' -import type { - MessageValue, - PromiseWorkerResponseWrapper -} from '../utility-types' +import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { EMPTY_FUNCTION } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' import type { PoolOptions } from './pool' @@ -35,17 +32,15 @@ export abstract class AbstractPool< public readonly emitter?: PoolEmitter /** - * The promise map. + * The promise response map. * - * - `key`: This is the message id of each submitted task. - * - `value`: An object that contains the worker, the resolve function and the reject function. + * - `key`: The message id of each submitted task. + * - `value`: An object that contains the worker key, the promise resolve and reject callbacks. * - * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message. + * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message. */ - protected promiseMap: Map< - string, - PromiseWorkerResponseWrapper - > = new Map>() + protected promiseResponseMap: Map> = + new Map>() /** * Worker choice strategy instance implementing the worker choice algorithm. @@ -142,7 +137,7 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public get numberOfRunningTasks (): number { - return this.promiseMap.size + return this.promiseResponseMap.size } /** @@ -165,7 +160,8 @@ export abstract class AbstractPool< run: 0, running: 0, runTime: 0, - avgRunTime: 0 + avgRunTime: 0, + error: 0 }) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( @@ -198,7 +194,7 @@ export abstract class AbstractPool< public async execute (data: Data): Promise { const worker = this.chooseWorker() const messageId = crypto.randomUUID() - const res = this.internalExecute(worker, messageId) + const res = this.internalExecute(this.getWorkerKey(worker), messageId) this.checkAndEmitBusy() this.sendToWorker(worker, { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -242,28 +238,29 @@ export abstract class AbstractPool< * Hook executed before the worker task promise resolution. * Can be overridden. * - * @param worker - The worker. + * @param workerKey - The worker key. */ - protected beforePromiseWorkerResponseHook (worker: Worker): void { - ++(this.getWorkerTasksUsage(worker) as TasksUsage).running + protected beforePromiseResponseHook (workerKey: number): void { + ++this.workers[workerKey].tasksUsage.running } /** * Hook executed after the worker task promise resolution. * Can be overridden. * + * @param workerKey - The worker key. * @param message - The received message. - * @param promise - The Promise response. */ - protected afterPromiseWorkerResponseHook ( - message: MessageValue, - promise: PromiseWorkerResponseWrapper + protected afterPromiseResponseHook ( + workerKey: number, + message: MessageValue ): void { - const workerTasksUsage = this.getWorkerTasksUsage( - promise.worker - ) as TasksUsage + const workerTasksUsage = this.workers[workerKey].tasksUsage --workerTasksUsage.running ++workerTasksUsage.run + if (message.error != null) { + ++workerTasksUsage.error + } if ( this.workerChoiceStrategyContext.getWorkerChoiceStrategy() .requiredStatistics.runTime @@ -351,7 +348,8 @@ export abstract class AbstractPool< run: 0, running: 0, runTime: 0, - avgRunTime: 0 + avgRunTime: 0, + error: 0 }) this.afterWorkerSetup(worker) @@ -367,27 +365,27 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { if (message.id !== undefined) { - const promise = this.promiseMap.get(message.id) - if (promise !== undefined) { + const promiseResponse = this.promiseResponseMap.get(message.id) + if (promiseResponse !== undefined) { if (message.error != null) { - promise.reject(message.error) + promiseResponse.reject(message.error) } else { - promise.resolve(message.data as Response) + promiseResponse.resolve(message.data as Response) } - this.afterPromiseWorkerResponseHook(message, promise) - this.promiseMap.delete(message.id) + this.afterPromiseResponseHook(promiseResponse.workerKey, message) + this.promiseResponseMap.delete(message.id) } } } } private async internalExecute ( - worker: Worker, + workerKey: number, messageId: string ): Promise { - this.beforePromiseWorkerResponseHook(worker) + this.beforePromiseResponseHook(workerKey) return await new Promise((resolve, reject) => { - this.promiseMap.set(messageId, { resolve, reject, worker }) + this.promiseResponseMap.set(messageId, { resolve, reject, workerKey }) }) } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index 9489d96f..bd75cf4f 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -17,6 +17,7 @@ export interface TasksUsage { running: number runTime: number avgRunTime: number + error: number } /** diff --git a/src/utility-types.ts b/src/utility-types.ts index 1a660434..54bb7461 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,6 +1,5 @@ import type { Worker as ClusterWorker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' -import type { IPoolWorker } from './pools/pool-worker' import type { KillBehavior } from './worker/worker-options' /** @@ -44,15 +43,11 @@ export interface MessageValue< } /** - * An object holding the worker that will be used to resolve/rejects the promise later on. + * An object holding the execution response promise resolve/reject callbacks. * - * @typeParam Worker - Type of worker. - * @typeParam Response - Type of response of execution. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. */ -export interface PromiseWorkerResponseWrapper< - Worker extends IPoolWorker, - Response = unknown -> { +export interface PromiseResponseWrapper { /** * Resolve callback to fulfill the promise. */ @@ -62,7 +57,7 @@ export interface PromiseWorkerResponseWrapper< */ readonly reject: (reason?: string) => void /** - * The worker that has the assigned task. + * The worker handling the promise key . */ - readonly worker: Worker + readonly workerKey: number } -- 2.34.1