X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=0d3d21832a715191e6efad9723c6a47454ae2215;hb=a5ed75b7c39de907a0047f4c30f2ea219ca4f917;hp=be1402a7eccf3e9a81d5e7736f61ed7751d1720a;hpb=dc02fc29818293c82727cb43ad5014e30aa6a64d;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index be1402a7..0d3d2183 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,6 +10,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + isAsyncFunction, isKillBehavior, isPlainObject, median, @@ -64,17 +65,15 @@ export abstract class AbstractPool< public readonly emitter?: PoolEmitter /** - * The execution response promise map. + * The task execution response promise map. * * - `key`: The message id of each submitted task. * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map< - string, - PromiseResponseWrapper - > = new Map>() + protected promiseResponseMap: Map> = + new Map>() /** * Worker choice strategy context referencing a worker choice algorithm implementation. @@ -116,6 +115,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -173,7 +173,15 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) @@ -286,7 +294,7 @@ export abstract class AbstractPool< 0 ) < this.numberOfWorkers ) { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } @@ -427,6 +435,9 @@ export abstract class AbstractPool< } } + /** + * The pool readiness boolean status. + */ private get ready (): boolean { return ( this.workerNodes.reduce( @@ -440,7 +451,7 @@ export abstract class AbstractPool< } /** - * Gets the approximate pool utilization. + * The approximate pool utilization. * * @returns The pool utilization. */ @@ -461,38 +472,27 @@ export abstract class AbstractPool< } /** - * Pool type. + * The pool type. * * If it is `'dynamic'`, it provides the `max` property. */ protected abstract get type (): PoolType /** - * Gets the worker type. + * The worker type. */ protected abstract get worker (): WorkerType /** - * Pool minimum size. + * The pool minimum size. */ protected abstract get minSize (): number /** - * Pool maximum size. + * The pool maximum size. */ protected abstract get maxSize (): number - /** - * Get the worker given its id. - * - * @param workerId - The worker id. - * @returns The worker if found in the pool worker nodes, `undefined` otherwise. - */ - private getWorkerById (workerId: number): Worker | undefined { - return this.workerNodes.find(workerNode => workerNode.info.id === workerId) - ?.worker - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -502,7 +502,7 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && - this.getWorkerById(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) == null ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -522,6 +522,20 @@ export abstract class AbstractPool< ) } + /** + * Gets the worker node key given its worker id. + * + * @param workerId - The worker id. + * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { + if (workerNode.info.id === workerId) { + return workerNodeKey + } + } + } + /** @inheritDoc */ public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy, @@ -535,9 +549,9 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.setWorkerStatistics(workerNode.worker) + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } } @@ -614,37 +628,35 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { - const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() - const submittedTask: Task = { - name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - data: data ?? ({} as Data), - timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() - } - const res = new Promise((resolve, reject) => { - this.promiseResponseMap.set(submittedTask.id as string, { + return await new Promise((resolve, reject) => { + const timestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() + const task: Task = { + name: name ?? DEFAULT_TASK_NAME, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + data: data ?? ({} as Data), + timestamp, + workerId: this.getWorkerInfo(workerNodeKey).id as number, + id: randomUUID() + } + this.promiseResponseMap.set(task.id as string, { resolve, reject, - worker: this.workerNodes[workerNodeKey].worker + workerNodeKey }) + if ( + this.opts.enableTasksQueue === true && + (this.busy || + this.workerNodes[workerNodeKey].usage.tasks.executing >= + ((this.opts.tasksQueueOptions as TasksQueueOptions) + .concurrency as number)) + ) { + this.enqueueTask(workerNodeKey, task) + } else { + this.executeTask(workerNodeKey, task) + } + this.checkAndEmitEvents() }) - if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) - ) { - this.enqueueTask(workerNodeKey, submittedTask) - } else { - this.executeTask(workerNodeKey, submittedTask) - } - this.checkAndEmitEvents() - // eslint-disable-next-line @typescript-eslint/return-await - return res } /** @inheritDoc */ @@ -658,18 +670,20 @@ export abstract class AbstractPool< resolve() }) }) - await this.destroyWorker(workerNode.worker) + await this.destroyWorkerNode(workerNodeKey) await workerExitPromise }) ) } /** - * Terminates the given worker. + * Terminates the worker node given its worker node key. * - * @param worker - A worker within `workerNodes`. + * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorker (worker: Worker): void | Promise + protected abstract destroyWorkerNode ( + workerNodeKey: number + ): void | Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -711,14 +725,13 @@ export abstract class AbstractPool< * Hook executed after the worker task execution. * Can be overridden. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param message - The received message. */ protected afterTaskExecutionHook ( - worker: Worker, + workerNodeKey: number, message: MessageValue ): void { - const workerNodeKey = this.getWorkerNodeKey(worker) const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) @@ -807,15 +820,15 @@ export abstract class AbstractPool< * * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * - * @returns The worker node key + * @returns The chosen worker node key */ private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - const worker = this.createAndSetupDynamicWorker() + const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker ) { - return this.getWorkerNodeKey(worker) + return workerNodeKey } } return this.workerChoiceStrategyContext.execute() @@ -831,13 +844,13 @@ export abstract class AbstractPool< } /** - * Sends a message to the given worker. + * Sends a message to worker given its worker node key. * - * @param worker - The worker which should receive the message. + * @param workerNodeKey - The worker node key. * @param message - The message. */ protected abstract sendToWorker ( - worker: Worker, + workerNodeKey: number, message: MessageValue ): void @@ -849,11 +862,11 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Creates a new worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up worker node. * - * @returns New, completely set up worker. + * @returns New, completely set up worker node key. */ - protected createAndSetupWorker (): Worker { + protected createAndSetupWorkerNode (): number { const worker = this.createWorker() worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) @@ -862,12 +875,13 @@ export abstract class AbstractPool< const workerNodeKey = this.getWorkerNodeKey(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false + this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) if (this.opts.restartWorkerOnError === true && !this.starting) { if (workerInfo.dynamic) { - this.createAndSetupDynamicWorker() + this.createAndSetupDynamicWorkerNode() } else { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } if (this.opts.enableTasksQueue === true) { @@ -877,91 +891,120 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { - const workerInfo = this.getWorkerInfoByWorker(worker) - if (workerInfo.messageChannel != null) { - workerInfo.messageChannel?.port1.close() - workerInfo.messageChannel?.port1.close() - } this.removeWorkerNode(worker) }) - this.addWorkerNode(worker) + const workerNodeKey = this.addWorkerNode(worker) - this.afterWorkerSetup(worker) + this.afterWorkerNodeSetup(workerNodeKey) - return worker + return workerNodeKey } /** - * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up dynamic worker node. * - * @returns New, completely set up dynamic worker. + * @returns New, completely set up dynamic worker node key. */ - protected createAndSetupDynamicWorker (): Worker { - const worker = this.createAndSetupWorker() - this.registerWorkerMessageListener(worker, message => { - const workerNodeKey = this.getWorkerNodeKey(worker) + protected createAndSetupDynamicWorkerNode (): number { + const workerNodeKey = this.createAndSetupWorkerNode() + this.registerWorkerMessageListener(workerNodeKey, message => { + const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + message.workerId + ) as number + const workerUsage = this.workerNodes[localWorkerNodeKey].usage if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || + workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0))) + workerUsage.tasks.executing === 0 && + this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorker(worker) as Promise) + const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this) + if (isAsyncFunction(destroyWorkerNodeBounded)) { + ( + destroyWorkerNodeBounded as (workerNodeKey: number) => Promise + )(localWorkerNodeKey).catch(EMPTY_FUNCTION) + } else { + (destroyWorkerNodeBounded as (workerNodeKey: number) => void)( + localWorkerNodeKey + ) + } } }) - const workerInfo = this.getWorkerInfoByWorker(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.dynamic = true if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { workerInfo.ready = true } - this.sendToWorker(worker, { + this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) - return worker + return workerNodeKey } /** - * Registers a listener callback on the given worker. + * Registers a listener callback on the worker given its worker node key. * - * @param worker - The worker which should register a listener. + * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ protected abstract registerWorkerMessageListener< Message extends Data | Response - >(worker: Worker, listener: (message: MessageValue) => void): void + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Method hooked up after a worker node has been newly created. * Can be overridden. * - * @param worker - The newly created worker. + * @param workerNodeKey - The newly created worker node key. */ - protected afterWorkerSetup (worker: Worker): void { + protected afterWorkerNodeSetup (workerNodeKey: number): void { // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) + this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. - this.sendStartupMessageToWorker(worker) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) + this.sendStartupMessageToWorker(workerNodeKey) + // Send the worker statistics message to worker. + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } /** - * Sends the startup message to the given worker. + * Sends the startup message to worker given its worker node key. * - * @param worker - The worker which should receive the startup message. + * @param workerNodeKey - The worker node key. */ - protected abstract sendStartupMessageToWorker (worker: Worker): void + protected abstract sendStartupMessageToWorker (workerNodeKey: number): void + + /** + * Sends the worker statistics message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { + statistics: { + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate + }, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -969,6 +1012,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -981,15 +1030,22 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } /** - * This function is the listener registered for each worker message. + * This method is the listener registered for each worker message. * * @returns The listener function to execute when a message is received from a worker. */ @@ -1007,8 +1063,8 @@ export abstract class AbstractPool< } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfoByWorker( - this.getWorkerById(message.workerId) as Worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) as number ).ready = message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) @@ -1024,9 +1080,9 @@ export abstract class AbstractPool< } else { promiseResponse.resolve(message.data as Response) } - this.afterTaskExecutionHook(promiseResponse.worker, message) + const workerNodeKey = promiseResponse.workerNodeKey + this.afterTaskExecutionHook(workerNodeKey, message) this.promiseResponseMap.delete(message.id as string) - const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 @@ -1052,24 +1108,12 @@ export abstract class AbstractPool< } /** - * Gets the worker information from the given worker node key. + * Gets the worker information given its worker node key. * * @param workerNodeKey - The worker node key. + * @returns The worker information. */ - private getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info - } - - /** - * Gets the worker information from the given worker. - * - * @param worker - The worker. - */ - protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { - const workerNodeKey = this.getWorkerNodeKey(worker) - if (workerNodeKey === -1) { - throw new Error('Worker not found') - } + protected getWorkerInfo (workerNodeKey: number): WorkerInfo { return this.workerNodes[workerNodeKey].info } @@ -1077,7 +1121,8 @@ export abstract class AbstractPool< * Adds the given worker in the pool worker nodes. * * @param worker - The worker. - * @returns The worker nodes length. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { const workerNode = new WorkerNode(worker, this.worker) @@ -1085,7 +1130,12 @@ export abstract class AbstractPool< if (this.starting) { workerNode.info.ready = true } - return this.workerNodes.push(workerNode) + this.workerNodes.push(workerNode) + const workerNodeKey = this.getWorkerNodeKey(worker) + if (workerNodeKey === -1) { + throw new Error('Worker node not found') + } + return workerNodeKey } /** @@ -1102,14 +1152,14 @@ export abstract class AbstractPool< } /** - * Executes the given task on the given worker. + * Executes the given task on the worker given its worker node key. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + this.sendToWorker(workerNodeKey, task) } private enqueueTask (workerNodeKey: number, task: Task): number { @@ -1139,17 +1189,4 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } - - private setWorkerStatistics (worker: Worker): void { - this.sendToWorker(worker, { - statistics: { - runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - }, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } }