X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=d36e8a4737dbe5514aa936de561780baee5ba528;hb=cea399c8027c4420ae869f177d9d518d4b51775f;hp=eb47bc2f24f67b983b4829a763705c6fb8b45828;hpb=6b83e544b1827a8a04898c3032dab73fe19687e6;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb47bc2f..d36e8a47 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -28,13 +28,12 @@ import { PoolTypes, type TasksQueueOptions } from './pool' -import { - type IWorker, - type IWorkerNode, - type WorkerInfo, - type WorkerType, - WorkerTypes, - type WorkerUsage +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerType, + WorkerUsage } from './worker' import { type MeasurementStatisticsRequirements, @@ -93,6 +92,10 @@ export abstract class AbstractPool< * The start timestamp of the pool. */ private readonly startTimestamp + /** + * The task function names. + */ + private taskFunctions!: string[] /** * Constructs a new poolifier pool. @@ -107,7 +110,9 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } this.checkNumberOfWorkers(this.numberOfWorkers) this.checkFilePath(this.filePath) @@ -361,14 +366,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -389,7 +394,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + (workerNode) => workerNode.usage.runTime?.median ?? 0 ) ) ) @@ -402,14 +407,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -430,7 +435,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + (workerNode) => workerNode.usage.waitTime?.median ?? 0 ) ) ) @@ -523,7 +528,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + (workerNode) => workerNode.worker === worker ) } @@ -535,7 +540,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - workerNode => workerNode.info.id === workerId + (workerNode) => workerNode.info.id === workerId ) } @@ -554,7 +559,7 @@ export abstract class AbstractPool< } for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -625,7 +630,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes.findIndex( - workerNode => + (workerNode) => workerNode.info.ready && workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) @@ -634,13 +639,22 @@ export abstract class AbstractPool< } else { return ( this.workerNodes.findIndex( - workerNode => + (workerNode) => workerNode.info.ready && workerNode.usage.tasks.executing === 0 ) === -1 ) } } + /** @inheritDoc */ + public listTaskFunctions (): string[] { + if (this.taskFunctions != null) { + return this.taskFunctions + } else { + return [] + } + } + /** @inheritDoc */ public async execute ( data?: Data, @@ -651,6 +665,22 @@ export abstract class AbstractPool< if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + } + if ( + name != null && + this.taskFunctions != null && + !this.taskFunctions.includes(name) + ) { + reject( + new Error(`Task function '${name}' is not registered in the pool`) + ) + } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) } @@ -693,6 +723,22 @@ export abstract class AbstractPool< ) } + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error(`Worker ${workerId} kill message handling failed`)) + } + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + }) + } + /** * Terminates the worker node given its worker node key. * @@ -888,7 +934,7 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { + worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false @@ -925,7 +971,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, message => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -933,14 +979,16 @@ export abstract class AbstractPool< // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) + this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -979,8 +1027,8 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) - // Send the worker statistics message to worker. - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) } /** @@ -991,11 +1039,11 @@ export abstract class AbstractPool< protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** - * Sends the worker statistics message to worker given its worker node key. + * Sends the statistics message to worker given its worker node key. * * @param workerNodeKey - The worker node key. */ - private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: @@ -1058,7 +1106,7 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return message => { + return (message) => { this.checkMessageWorkerId(message) if (message.ready != null) { // Worker ready response received from worker @@ -1066,6 +1114,9 @@ export abstract class AbstractPool< } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) + } else if (message.taskFunctions != null) { + // Task functions message received from worker + this.taskFunctions = message.taskFunctions } } } @@ -1171,13 +1222,7 @@ export abstract class AbstractPool< */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker( - workerNodeKey, - task, - this.worker === WorkerTypes.thread && task.transferList != null - ? task.transferList - : undefined - ) + this.sendToWorker(workerNodeKey, task, task.transferList) } private enqueueTask (workerNodeKey: number, task: Task): number {