From aa9eede876376ffbb6d8585192f3865849252f5c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Jul 2023 13:48:12 +0200 Subject: [PATCH] perf: drastically reduce worker nodes array lookups MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 + src/pools/abstract-pool.ts | 216 +++++++++--------- src/pools/cluster/fixed.ts | 22 +- src/pools/pool.ts | 4 +- .../worker-choice-strategy-context.ts | 2 +- src/pools/thread/fixed.ts | 24 +- src/worker/abstract-worker.ts | 2 +- .../worker-choice-strategy-context.test.js | 4 +- 8 files changed, 151 insertions(+), 127 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ed0c9f6..91530cd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Drastically reduce lookup by worker in the worker nodes. + ## [2.6.19] - 2023-07-20 ### Added diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 00bfaf8e..086d7ee8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -284,7 +284,7 @@ export abstract class AbstractPool< 0 ) < this.numberOfWorkers ) { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } @@ -425,6 +425,9 @@ export abstract class AbstractPool< } } + /** + * The pool readiness boolean status. + */ private get ready (): boolean { return ( this.workerNodes.reduce( @@ -438,7 +441,7 @@ export abstract class AbstractPool< } /** - * Gets the approximate pool utilization. + * The approximate pool utilization. * * @returns The pool utilization. */ @@ -459,38 +462,27 @@ export abstract class AbstractPool< } /** - * Pool type. + * The pool type. * * If it is `'dynamic'`, it provides the `max` property. */ protected abstract get type (): PoolType /** - * Gets the worker type. + * The worker type. */ protected abstract get worker (): WorkerType /** - * Pool minimum size. + * The pool minimum size. */ protected abstract get minSize (): number /** - * Pool maximum size. + * The pool maximum size. */ protected abstract get maxSize (): number - /** - * Get the worker given its id. - * - * @param workerId - The worker id. - * @returns The worker if found in the pool worker nodes, `undefined` otherwise. - */ - private getWorkerById (workerId: number): Worker | undefined { - return this.workerNodes.find(workerNode => workerNode.info.id === workerId) - ?.worker - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -500,7 +492,7 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && - this.getWorkerById(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) == null ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -514,12 +506,26 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - protected getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKey (worker: Worker): number { return this.workerNodes.findIndex( workerNode => workerNode.worker === worker ) } + /** + * Gets the worker node key given its worker id. + * + * @param workerId - The worker id. + * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { + if (workerNode.info.id === workerId) { + return workerNodeKey + } + } + } + /** @inheritDoc */ public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy, @@ -533,9 +539,9 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.setWorkerStatistics(workerNode.worker) + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } } @@ -654,18 +660,20 @@ export abstract class AbstractPool< resolve() }) }) - await this.destroyWorker(workerNode.worker) + await this.destroyWorkerNode(workerNodeKey) await workerExitPromise }) ) } /** - * Terminates the given worker. + * Terminates the worker node given its worker node key. * - * @param worker - A worker within `workerNodes`. + * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorker (worker: Worker): void | Promise + protected abstract destroyWorkerNode ( + workerNodeKey: number + ): void | Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -802,15 +810,15 @@ export abstract class AbstractPool< * * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * - * @returns The worker node key + * @returns The chosen worker node key */ private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - const worker = this.createAndSetupDynamicWorker() + const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker ) { - return this.getWorkerNodeKey(worker) + return workerNodeKey } } return this.workerChoiceStrategyContext.execute() @@ -826,13 +834,13 @@ export abstract class AbstractPool< } /** - * Sends a message to the given worker. + * Sends a message to worker given its worker node key. * - * @param worker - The worker which should receive the message. + * @param workerNodeKey - The worker node key. * @param message - The message. */ protected abstract sendToWorker ( - worker: Worker, + workerNodeKey: number, message: MessageValue ): void @@ -844,11 +852,11 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Creates a new worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up worker node. * - * @returns New, completely set up worker. + * @returns New, completely set up worker node key. */ - protected createAndSetupWorker (): Worker { + protected createAndSetupWorkerNode (): number { const worker = this.createWorker() worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) @@ -861,9 +869,9 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.error, error) if (this.opts.restartWorkerOnError === true && !this.starting) { if (workerInfo.dynamic) { - this.createAndSetupDynamicWorker() + this.createAndSetupDynamicWorkerNode() } else { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } if (this.opts.enableTasksQueue === true) { @@ -876,78 +884,102 @@ export abstract class AbstractPool< this.removeWorkerNode(worker) }) - this.addWorkerNode(worker) + const workerNodeKey = this.addWorkerNode(worker) - this.afterWorkerSetup(worker) + this.afterWorkerNodeSetup(workerNodeKey) - return worker + return workerNodeKey } /** - * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up dynamic worker node. * - * @returns New, completely set up dynamic worker. + * @returns New, completely set up dynamic worker node key. */ - protected createAndSetupDynamicWorker (): Worker { - const worker = this.createAndSetupWorker() - this.registerWorkerMessageListener(worker, message => { - const workerNodeKey = this.getWorkerNodeKey(worker) + protected createAndSetupDynamicWorkerNode (): number { + const workerNodeKey = this.createAndSetupWorkerNode() + this.registerWorkerMessageListener(workerNodeKey, message => { + const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + message.workerId + ) as number + const workerUsage = this.workerNodes[localWorkerNodeKey].usage if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || + workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0))) + workerUsage.tasks.executing === 0 && + this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorker(worker) as Promise) + void (this.destroyWorkerNode(localWorkerNodeKey) as Promise) } }) - const workerInfo = this.getWorkerInfoByWorker(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.dynamic = true if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { workerInfo.ready = true } - this.sendToWorker(worker, { + this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) - return worker + return workerNodeKey } /** - * Registers a listener callback on the given worker. + * Registers a listener callback on the worker given its worker node key. * - * @param worker - The worker which should register a listener. + * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ protected abstract registerWorkerMessageListener< Message extends Data | Response - >(worker: Worker, listener: (message: MessageValue) => void): void + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Method hooked up after a worker node has been newly created. * Can be overridden. * - * @param worker - The newly created worker. + * @param workerNodeKey - The newly created worker node key. */ - protected afterWorkerSetup (worker: Worker): void { + protected afterWorkerNodeSetup (workerNodeKey: number): void { // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) + this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. - this.sendStartupMessageToWorker(worker) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) + this.sendStartupMessageToWorker(workerNodeKey) + // Send the worker statistics message to worker. + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } /** - * Sends the startup message to the given worker. + * Sends the startup message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + protected abstract sendStartupMessageToWorker (workerNodeKey: number): void + + /** + * Sends the worker statistics message to worker given its worker node key. * - * @param worker - The worker which should receive the startup message. + * @param workerNodeKey - The worker node key. */ - protected abstract sendStartupMessageToWorker (worker: Worker): void + private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { + statistics: { + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate + }, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -980,7 +1012,7 @@ export abstract class AbstractPool< } /** - * This function is the listener registered for each worker message. + * This method is the listener registered for each worker message. * * @returns The listener function to execute when a message is received from a worker. */ @@ -998,8 +1030,8 @@ export abstract class AbstractPool< } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfoByWorker( - this.getWorkerById(message.workerId) as Worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) as number ).ready = message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) @@ -1043,27 +1075,12 @@ export abstract class AbstractPool< } /** - * Gets the worker information from the given worker node key. + * Gets the worker information given its worker node key. * * @param workerNodeKey - The worker node key. * @returns The worker information. */ - private getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info - } - - /** - * Gets the worker information from the given worker. - * - * @param worker - The worker. - * @returns The worker information. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found. - */ - protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { - const workerNodeKey = this.getWorkerNodeKey(worker) - if (workerNodeKey === -1) { - throw new Error('Worker not found') - } + protected getWorkerInfo (workerNodeKey: number): WorkerInfo { return this.workerNodes[workerNodeKey].info } @@ -1071,7 +1088,8 @@ export abstract class AbstractPool< * Adds the given worker in the pool worker nodes. * * @param worker - The worker. - * @returns The worker nodes length. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { const workerNode = new WorkerNode(worker, this.worker) @@ -1079,7 +1097,12 @@ export abstract class AbstractPool< if (this.starting) { workerNode.info.ready = true } - return this.workerNodes.push(workerNode) + this.workerNodes.push(workerNode) + const workerNodeKey = this.getWorkerNodeKey(worker) + if (workerNodeKey === -1) { + throw new Error('Worker node not found') + } + return workerNodeKey } /** @@ -1096,14 +1119,14 @@ export abstract class AbstractPool< } /** - * Executes the given task on the given worker. + * Executes the given task on the worker given its worker node key. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + this.sendToWorker(workerNodeKey, task) } private enqueueTask (workerNodeKey: number, task: Task): number { @@ -1133,17 +1156,4 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } - - private setWorkerStatistics (worker: Worker): void { - this.sendToWorker(worker, { - statistics: { - runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - }, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index dbf1d470..eadbfcfe 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -60,8 +60,9 @@ export class FixedClusterPool< } /** @inheritDoc */ - protected destroyWorker (worker: Worker): void { - this.sendToWorker(worker, { kill: true, workerId: worker.id }) + protected destroyWorkerNode (workerNodeKey: number): void { + const worker = this.workerNodes[workerNodeKey].worker + this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id }) worker.on('disconnect', () => { worker.kill() }) @@ -69,24 +70,27 @@ export class FixedClusterPool< } /** @inheritDoc */ - protected sendToWorker (worker: Worker, message: MessageValue): void { - worker.send(message) + protected sendToWorker ( + workerNodeKey: number, + message: MessageValue + ): void { + this.workerNodes[workerNodeKey].worker.send(message) } /** @inheritDoc */ - protected sendStartupMessageToWorker (worker: Worker): void { - this.sendToWorker(worker, { + protected sendStartupMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { ready: false, - workerId: worker.id + workerId: this.workerNodes[workerNodeKey].worker.id }) } /** @inheritDoc */ protected registerWorkerMessageListener( - worker: Worker, + workerNodeKey: number, listener: (message: MessageValue) => void ): void { - worker.on('message', listener) + this.workerNodes[workerNodeKey].worker.on('message', listener) } /** @inheritDoc */ diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 4a0ab725..207a8598 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -64,7 +64,7 @@ export interface PoolInfo { readonly strategy: WorkerChoiceStrategy readonly minSize: number readonly maxSize: number - /** Pool utilization ratio. */ + /** Pool utilization. */ readonly utilization?: number /** Pool total worker nodes. */ readonly workerNodes: number @@ -198,7 +198,7 @@ export interface IPool< */ readonly execute: (data?: Data, name?: string) => Promise /** - * Terminates every current worker in this pool. + * Terminates all workers in this pool. */ readonly destroy: () => Promise /** diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 28e3af08..122774dd 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -171,7 +171,7 @@ export class WorkerChoiceStrategyContext< ) as IWorkerChoiceStrategy ).choose() if (workerNodeKey == null) { - throw new Error('Worker node key chosen is null or undefined') + throw new TypeError('Worker node key chosen is null or undefined') } return workerNodeKey } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index bef82826..f4f712df 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -56,23 +56,29 @@ export class FixedThreadPool< } /** @inheritDoc */ - protected async destroyWorker (worker: Worker): Promise { - this.sendToWorker(worker, { kill: true, workerId: worker.threadId }) - this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel() + protected async destroyWorkerNode (workerNodeKey: number): Promise { + const workerNode = this.workerNodes[workerNodeKey] + const worker = workerNode.worker + this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.threadId }) + workerNode.closeChannel() await worker.terminate() } /** @inheritDoc */ - protected sendToWorker (worker: Worker, message: MessageValue): void { + protected sendToWorker ( + workerNodeKey: number, + message: MessageValue + ): void { ( - this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel ).port1.postMessage(message) } /** @inheritDoc */ - protected sendStartupMessageToWorker (worker: Worker): void { + protected sendStartupMessageToWorker (workerNodeKey: number): void { + const worker = this.workerNodes[workerNodeKey].worker const port2: MessagePort = ( - this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel ).port2 worker.postMessage( { @@ -86,11 +92,11 @@ export class FixedThreadPool< /** @inheritDoc */ protected registerWorkerMessageListener( - worker: Worker, + workerNodeKey: number, listener: (message: MessageValue) => void ): void { ( - this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel ).port1.on('message', listener) } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 0f1d442a..0f0e0a39 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -373,7 +373,7 @@ export abstract class AbstractWorker< } /** - * Sends a message to the main worker. + * Sends a message to main worker. * * @param message - The response message. */ diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js index 2070f8e0..808a44cb 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js @@ -115,14 +115,14 @@ describe('Worker choice strategy context test suite', () => { WorkerChoiceStrategyUndefinedStub ) expect(() => workerChoiceStrategyContext.execute()).toThrowError( - new Error('Worker node key chosen is null or undefined') + new TypeError('Worker node key chosen is null or undefined') ) workerChoiceStrategyContext.workerChoiceStrategies.set( workerChoiceStrategyContext.workerChoiceStrategy, WorkerChoiceStrategyNullStub ) expect(() => workerChoiceStrategyContext.execute()).toThrowError( - new Error('Worker node key chosen is null or undefined') + new TypeError('Worker node key chosen is null or undefined') ) }) -- 2.34.1