X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=eb2c2f7818a687ac8f8016eb27489895cadfaaee;hb=671d515455c745dc74f4c385fe23683975bfc3df;hp=086d7ee82550bc40c46781b83616c1639b32155b;hpb=aa9eede876376ffbb6d8585192f3865849252f5c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 086d7ee8..eb2c2f78 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import { existsSync } from 'node:fs' +import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, @@ -105,7 +106,9 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } this.checkNumberOfWorkers(this.numberOfWorkers) this.checkFilePath(this.filePath) @@ -114,6 +117,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -171,13 +175,21 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) } else if (max === 0) { throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) } else if (min === max) { throw new RangeError( @@ -325,16 +337,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -346,14 +362,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -374,7 +390,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + (workerNode) => workerNode.usage.runTime?.median ?? 0 ) ) ) @@ -387,14 +403,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -415,7 +431,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + (workerNode) => workerNode.usage.waitTime?.median ?? 0 ) ) ) @@ -490,9 +506,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ private checkMessageWorkerId (message: MessageValue): void { - if ( + if (message.workerId == null) { + throw new Error('Worker message received without worker id') + } else if ( message.workerId != null && - this.getWorkerNodeKeyByWorkerId(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -506,9 +524,9 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + (workerNode) => workerNode.worker === worker ) } @@ -516,14 +534,12 @@ export abstract class AbstractPool< * Gets the worker node key given its worker id. * * @param workerId - The worker id. - * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise. + * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined { - for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { - if (workerNode.info.id === workerId) { - return workerNodeKey - } - } + private getWorkerNodeKeyByWorkerId (workerId: number): number { + return this.workerNodes.findIndex( + (workerNode) => workerNode.info.id === workerId + ) } /** @inheritDoc */ @@ -541,7 +557,7 @@ export abstract class AbstractPool< } for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -604,46 +620,98 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 - ) + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } else { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 + ) + } + } + + /** @inheritDoc */ + public listTaskFunctions (): string[] { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } + } + return [] } /** @inheritDoc */ - public async execute (data?: Data, name?: string): Promise { + public async execute ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ): Promise { return await new Promise((resolve, reject) => { + if (name != null && typeof name !== 'string') { + reject(new TypeError('name argument must be a string')) + } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) + if ( + name != null && + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) + ) { + reject( + new Error(`Task function '${name}' is not registered in the pool`) + ) + } const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() + workerId: workerInfo.id as number, + taskId: randomUUID() } - this.promiseResponseMap.set(task.id as string, { + this.promiseResponseMap.set(task.taskId as string, { resolve, reject, workerNodeKey }) if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number)) ) { - this.enqueueTask(workerNodeKey, task) - } else { this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) } this.checkAndEmitEvents() }) @@ -652,18 +720,27 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) - await workerExitPromise }) ) + this.emitter?.emit(PoolEvents.destroy) + } + + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error(`Worker ${workerId} kill message handling failed`)) + } + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + }) } /** @@ -671,9 +748,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode ( - workerNodeKey: number - ): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -704,11 +779,13 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - task.name as string - ) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage(task.name as string) as WorkerUsage + ++taskWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + } } /** @@ -726,12 +803,24 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskWorkerUsage, message) + this.updateEluWorkerUsage(taskWorkerUsage, message) + } + } + + private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) + return ( + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 1 + ) } private updateTaskStatisticsWorkerUsage ( @@ -838,10 +927,12 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. * @param message - The message. + * @param transferList - The optional array of transferable objects. */ protected abstract sendToWorker ( workerNodeKey: number, - message: MessageValue + message: MessageValue, + transferList?: TransferListItem[] ): void /** @@ -859,10 +950,11 @@ export abstract class AbstractPool< protected createAndSetupWorkerNode (): number { const worker = this.createWorker() + worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKey(worker) + worker.on('error', (error) => { + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() @@ -878,7 +970,6 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(workerNodeKey) } }) - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { this.removeWorkerNode(worker) @@ -898,33 +989,35 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, message => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId - ) as number + ) const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorkerNode(localWorkerNodeKey) as Promise) + this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) - workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { - workerInfo.ready = true - } this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) + workerInfo.dynamic = true + if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + workerInfo.ready = true + } return workerNodeKey } @@ -952,8 +1045,8 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) - // Send the worker statistics message to worker. - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) } /** @@ -964,11 +1057,11 @@ export abstract class AbstractPool< protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** - * Sends the worker statistics message to worker given its worker node key. + * Sends the statistics message to worker given its worker node key. * * @param workerNodeKey - The worker node key. */ - private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: @@ -985,6 +1078,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -992,6 +1086,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -1004,10 +1104,17 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } @@ -1017,42 +1124,55 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return message => { + return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { - // Worker ready response received + if (message.ready != null && message.taskFunctions != null) { + // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.id != null) { - // Task execution response received + } else if (message.taskId != null) { + // Task execution response received from worker this.handleTaskExecutionResponse(message) + } else if (message.taskFunctions != null) { + // Task functions message received from worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ).taskFunctions = message.taskFunctions } } } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) as number - ).ready = message.ready as boolean + if (message.ready === false) { + throw new Error(`Worker ${message.workerId} failed to initialize`) + } + const workerInfo = this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) + const { taskId, taskError, data } = message + const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (message.taskError != null) { - this.emitter?.emit(PoolEvents.taskError, message.taskError) - promiseResponse.reject(message.taskError.message) + if (taskError != null) { + this.emitter?.emit(PoolEvents.taskError, taskError) + promiseResponse.reject(taskError.message) } else { - promiseResponse.resolve(message.data as Response) + promiseResponse.resolve(data as Response) } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.id as string) + this.promiseResponseMap.delete(taskId as string) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + this.tasksQueueSize(workerNodeKey) > 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask( workerNodeKey, @@ -1092,15 +1212,19 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1111,7 +1235,7 @@ export abstract class AbstractPool< * @param worker - The worker. */ private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1126,10 +1250,19 @@ export abstract class AbstractPool< */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(workerNodeKey, task) + this.sendToWorker(workerNodeKey, task, task.transferList) } private enqueueTask (workerNodeKey: number, task: Task): number { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } return this.workerNodes[workerNodeKey].enqueueTask(task) } @@ -1141,7 +1274,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey,