X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=91cb5facd01f736d43703d4d0530ec7c01a3bf41;hb=3d76750ac8dacd95003ca89cf9542c3b2a014b8c;hp=10a2d514246748a7d21187f22b6b8400b1697864;hpb=5e972953bf3193a557a14fa794d559b2acc1d6de;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 10a2d514..91cb5fac 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -30,7 +30,6 @@ import { import type { IWorker, IWorkerNode, - MessageHandler, WorkerInfo, WorkerType, WorkerUsage @@ -65,17 +64,15 @@ export abstract class AbstractPool< public readonly emitter?: PoolEmitter /** - * The execution response promise map. + * The task execution response promise map. * * - `key`: The message id of each submitted task. * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map< - string, - PromiseResponseWrapper - > = new Map>() + protected promiseResponseMap: Map> = + new Map>() /** * Worker choice strategy context referencing a worker choice algorithm implementation. @@ -117,6 +114,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -174,13 +172,21 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) } else if (max === 0) { throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) } else if (min === max) { throw new RangeError( @@ -287,7 +293,7 @@ export abstract class AbstractPool< 0 ) < this.numberOfWorkers ) { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } @@ -328,16 +334,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -428,6 +438,9 @@ export abstract class AbstractPool< } } + /** + * The pool readiness boolean status. + */ private get ready (): boolean { return ( this.workerNodes.reduce( @@ -441,7 +454,7 @@ export abstract class AbstractPool< } /** - * Gets the approximate pool utilization. + * The approximate pool utilization. * * @returns The pool utilization. */ @@ -462,38 +475,27 @@ export abstract class AbstractPool< } /** - * Pool type. + * The pool type. * * If it is `'dynamic'`, it provides the `max` property. */ protected abstract get type (): PoolType /** - * Gets the worker type. + * The worker type. */ protected abstract get worker (): WorkerType /** - * Pool minimum size. + * The pool minimum size. */ protected abstract get minSize (): number /** - * Pool maximum size. + * The pool maximum size. */ protected abstract get maxSize (): number - /** - * Get the worker given its id. - * - * @param workerId - The worker id. - * @returns The worker if found in the pool worker nodes, `undefined` otherwise. - */ - private getWorkerById (workerId: number): Worker | undefined { - return this.workerNodes.find(workerNode => workerNode.info.id === workerId) - ?.worker - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -503,7 +505,7 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && - this.getWorkerById(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -517,12 +519,24 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( workerNode => workerNode.worker === worker ) } + /** + * Gets the worker node key given its worker id. + * + * @param workerId - The worker id. + * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. + */ + private getWorkerNodeKeyByWorkerId (workerId: number): number { + return this.workerNodes.findIndex( + workerNode => workerNode.info.id === workerId + ) + } + /** @inheritDoc */ public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy, @@ -536,9 +550,9 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.setWorkerStatistics(workerNode.worker) + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } } @@ -601,76 +615,77 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 - ) + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } else { + return ( + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 + ) + } } /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { - const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() - const submittedTask: Task = { - name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - data: data ?? ({} as Data), - timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() - } - const res = new Promise((resolve, reject) => { - this.promiseResponseMap.set(submittedTask.id as string, { + return await new Promise((resolve, reject) => { + const timestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() + const task: Task = { + name: name ?? DEFAULT_TASK_NAME, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + data: data ?? ({} as Data), + timestamp, + workerId: this.getWorkerInfo(workerNodeKey).id as number, + taskId: randomUUID() + } + this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - worker: this.workerNodes[workerNodeKey].worker + workerNodeKey }) + if ( + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number)) + ) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + this.checkAndEmitEvents() }) - if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) - ) { - this.enqueueTask(workerNodeKey, submittedTask) - } else { - this.executeTask(workerNodeKey, submittedTask) - } - this.checkAndEmitEvents() - // eslint-disable-next-line @typescript-eslint/return-await - return res } /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) - await this.destroyWorker(workerNode.worker) - await workerExitPromise + this.workerNodes.map(async (_, workerNodeKey) => { + await this.destroyWorkerNode(workerNodeKey) }) ) } /** - * Terminates the given worker. + * Terminates the worker node given its worker node key. * - * @param worker - A worker within `workerNodes`. + * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorker (worker: Worker): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -712,14 +727,13 @@ export abstract class AbstractPool< * Hook executed after the worker task execution. * Can be overridden. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param message - The received message. */ protected afterTaskExecutionHook ( - worker: Worker, + workerNodeKey: number, message: MessageValue ): void { - const workerNodeKey = this.getWorkerNodeKey(worker) const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) @@ -808,15 +822,15 @@ export abstract class AbstractPool< * * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * - * @returns The worker node key + * @returns The chosen worker node key */ private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - const worker = this.createAndSetupDynamicWorker() + const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker ) { - return this.getWorkerNodeKey(worker) + return workerNodeKey } } return this.workerChoiceStrategyContext.execute() @@ -832,13 +846,13 @@ export abstract class AbstractPool< } /** - * Sends a message to the given worker. + * Sends a message to worker given its worker node key. * - * @param worker - The worker which should receive the message. + * @param workerNodeKey - The worker node key. * @param message - The message. */ protected abstract sendToWorker ( - worker: Worker, + workerNodeKey: number, message: MessageValue ): void @@ -850,25 +864,26 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Creates a new worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up worker node. * - * @returns New, completely set up worker. + * @returns New, completely set up worker node key. */ - protected createAndSetupWorker (): Worker { + protected createAndSetupWorkerNode (): number { const worker = this.createWorker() worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false + this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) if (this.opts.restartWorkerOnError === true && !this.starting) { if (workerInfo.dynamic) { - this.createAndSetupDynamicWorker() + this.createAndSetupDynamicWorkerNode() } else { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } if (this.opts.enableTasksQueue === true) { @@ -881,79 +896,100 @@ export abstract class AbstractPool< this.removeWorkerNode(worker) }) - this.addWorkerNode(worker) + const workerNodeKey = this.addWorkerNode(worker) - this.afterWorkerSetup(worker) + this.afterWorkerNodeSetup(workerNodeKey) - return worker + return workerNodeKey } /** - * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up dynamic worker node. * - * @returns New, completely set up dynamic worker. + * @returns New, completely set up dynamic worker node key. */ - protected createAndSetupDynamicWorker (): Worker { - const worker = this.createAndSetupWorker() - this.registerWorkerMessageListener(worker, message => { - const workerNodeKey = this.getWorkerNodeKey(worker) + protected createAndSetupDynamicWorkerNode (): number { + const workerNodeKey = this.createAndSetupWorkerNode() + this.registerWorkerMessageListener(workerNodeKey, message => { + const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + message.workerId + ) + const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || + workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0))) + workerUsage.tasks.executing === 0 && + this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorker(worker) as Promise) + this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) } }) - const workerInfo = this.getWorkerInfoByWorker(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) + this.sendToWorker(workerNodeKey, { + checkActive: true, + workerId: workerInfo.id as number + }) workerInfo.dynamic = true if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { workerInfo.ready = true } - this.sendToWorker(worker, { - checkActive: true, - workerId: workerInfo.id as number - }) - return worker + return workerNodeKey } /** - * Registers a listener callback on the given worker. + * Registers a listener callback on the worker given its worker node key. * - * @param worker - The worker which should register a listener. + * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ - private registerWorkerMessageListener( - worker: Worker, + protected abstract registerWorkerMessageListener< + Message extends Data | Response + >( + workerNodeKey: number, listener: (message: MessageValue) => void - ): void { - worker.on('message', listener as MessageHandler) - } + ): void /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Method hooked up after a worker node has been newly created. * Can be overridden. * - * @param worker - The newly created worker. + * @param workerNodeKey - The newly created worker node key. */ - protected afterWorkerSetup (worker: Worker): void { + protected afterWorkerNodeSetup (workerNodeKey: number): void { // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) - // Send startup message to worker. - this.sendWorkerStartupMessage(worker) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) + this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) + // Send the startup message to worker. + this.sendStartupMessageToWorker(workerNodeKey) + // Send the worker statistics message to worker. + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } - private sendWorkerStartupMessage (worker: Worker): void { - this.sendToWorker(worker, { - ready: false, - workerId: this.getWorkerInfoByWorker(worker).id as number + /** + * Sends the startup message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + protected abstract sendStartupMessageToWorker (workerNodeKey: number): void + + /** + * Sends the worker statistics message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { + statistics: { + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate + }, + workerId: this.getWorkerInfo(workerNodeKey).id as number }) } @@ -961,6 +997,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -968,6 +1005,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -980,15 +1023,22 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } /** - * This function is the listener registered for each worker message. + * This method is the listener registered for each worker message. * * @returns The listener function to execute when a message is received from a worker. */ @@ -996,18 +1046,18 @@ export abstract class AbstractPool< return message => { this.checkMessageWorkerId(message) if (message.ready != null) { - // Worker ready response received + // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.id != null) { - // Task execution response received + } else if (message.taskId != null) { + // Task execution response received from worker this.handleTaskExecutionResponse(message) } } } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfoByWorker( - this.getWorkerById(message.workerId) as Worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) ).ready = message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) @@ -1015,7 +1065,9 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) + const promiseResponse = this.promiseResponseMap.get( + message.taskId as string + ) if (promiseResponse != null) { if (message.taskError != null) { this.emitter?.emit(PoolEvents.taskError, message.taskError) @@ -1023,12 +1075,14 @@ export abstract class AbstractPool< } else { promiseResponse.resolve(message.data as Response) } - this.afterTaskExecutionHook(promiseResponse.worker, message) - this.promiseResponseMap.delete(message.id as string) - const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker) + const workerNodeKey = promiseResponse.workerNodeKey + this.afterTaskExecutionHook(workerNodeKey, message) + this.promiseResponseMap.delete(message.taskId as string) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + this.tasksQueueSize(workerNodeKey) > 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask( workerNodeKey, @@ -1051,28 +1105,21 @@ export abstract class AbstractPool< } /** - * Gets the worker information from the given worker node key. + * Gets the worker information given its worker node key. * * @param workerNodeKey - The worker node key. + * @returns The worker information. */ - private getWorkerInfo (workerNodeKey: number): WorkerInfo { + protected getWorkerInfo (workerNodeKey: number): WorkerInfo { return this.workerNodes[workerNodeKey].info } - /** - * Gets the worker information from the given worker. - * - * @param worker - The worker. - */ - private getWorkerInfoByWorker (worker: Worker): WorkerInfo { - return this.workerNodes[this.getWorkerNodeKey(worker)].info - } - /** * Adds the given worker in the pool worker nodes. * * @param worker - The worker. - * @returns The worker nodes length. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { const workerNode = new WorkerNode(worker, this.worker) @@ -1080,7 +1127,12 @@ export abstract class AbstractPool< if (this.starting) { workerNode.info.ready = true } - return this.workerNodes.push(workerNode) + this.workerNodes.push(workerNode) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + if (workerNodeKey === -1) { + throw new Error('Worker node not found') + } + return workerNodeKey } /** @@ -1089,7 +1141,7 @@ export abstract class AbstractPool< * @param worker - The worker. */ private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1097,14 +1149,14 @@ export abstract class AbstractPool< } /** - * Executes the given task on the given worker. + * Executes the given task on the worker given its worker node key. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + this.sendToWorker(workerNodeKey, task) } private enqueueTask (workerNodeKey: number, task: Task): number { @@ -1119,7 +1171,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, @@ -1134,17 +1186,4 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } - - private setWorkerStatistics (worker: Worker): void { - this.sendToWorker(worker, { - statistics: { - runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - }, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } }