X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6d71e87e34378abe5c75ad1c1806eacaad979a3b;hb=2115798751b31f025acae1ff8c4f5cbcec8ad3af;hp=2a52d5486984133e620552877e90921d50d96190;hpb=5af9e4679f4c53948b332cc775564e5a464ed822;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2a52d548..6d71e87e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,6 +92,10 @@ export abstract class AbstractPool< * The start timestamp of the pool. */ private readonly startTimestamp + /** + * The task function names. + */ + private taskFunctions!: string[] /** * Constructs a new poolifier pool. @@ -506,7 +510,9 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ private checkMessageWorkerId (message: MessageValue): void { - if ( + if (message.workerId == null) { + throw new Error('Worker message received without worker id') + } else if ( message.workerId != null && this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { @@ -555,7 +561,7 @@ export abstract class AbstractPool< } for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -642,6 +648,15 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public listTaskFunctions (): string[] { + if (this.taskFunctions != null) { + return this.taskFunctions + } else { + return [] + } + } + /** @inheritDoc */ public async execute ( data?: Data, @@ -652,6 +667,22 @@ export abstract class AbstractPool< if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + } + if ( + name != null && + this.taskFunctions != null && + !this.taskFunctions.includes(name) + ) { + reject( + new Error(`Task function '${name}' is not registered in the pool`) + ) + } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) } @@ -692,13 +723,14 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) + this.emitter?.emit(PoolEvents.destroy) } protected async sendKillMessageToWorker ( workerNodeKey: number, workerId: number ): Promise { - const waitForKillResponse = new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, (message) => { if (message.kill === 'success') { resolve() @@ -706,9 +738,8 @@ export abstract class AbstractPool< reject(new Error(`Worker ${workerId} kill message handling failed`)) } }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) - await waitForKillResponse } /** @@ -904,6 +935,7 @@ export abstract class AbstractPool< protected createAndSetupWorkerNode (): number { const worker = this.createWorker() + worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', (error) => { @@ -923,7 +955,6 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(workerNodeKey) } }) - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { this.removeWorkerNode(worker) @@ -999,8 +1030,8 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) - // Send the worker statistics message to worker. - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) } /** @@ -1011,11 +1042,11 @@ export abstract class AbstractPool< protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** - * Sends the worker statistics message to worker given its worker node key. + * Sends the statistics message to worker given its worker node key. * * @param workerNodeKey - The worker node key. */ - private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: @@ -1086,11 +1117,17 @@ export abstract class AbstractPool< } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) + } else if (message.taskFunctions != null) { + // Task functions message received from worker + this.taskFunctions = message.taskFunctions } } } private handleWorkerReadyResponse (message: MessageValue): void { + if (message.ready === false) { + throw new Error(`Worker ${message.workerId} failed to initialize`) + } this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ).ready = message.ready as boolean