X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8fb25b645fb55f2e0f2e48192aae20a1ec5363c4;hb=c6ad8edf55abf4e21cd84f0c707c565663a3d8e8;hp=1d27428e05f3328427977bbda2947fcda9c873a4;hpb=1e3214b63e262557aadebf3c57e8388de6a4bbe4;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1d27428e..8fb25b64 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,6 +92,10 @@ export abstract class AbstractPool< * The start timestamp of the pool. */ private readonly startTimestamp + /** + * The task function names. + */ + private taskFunctions!: string[] /** * Constructs a new poolifier pool. @@ -555,7 +559,7 @@ export abstract class AbstractPool< } for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -642,6 +646,15 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public listTaskFunctions (): string[] { + if (this.taskFunctions != null) { + return this.taskFunctions + } else { + return [] + } + } + /** @inheritDoc */ public async execute ( data?: Data, @@ -652,6 +665,22 @@ export abstract class AbstractPool< if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + } + if ( + name != null && + this.taskFunctions != null && + !this.taskFunctions.includes(name) + ) { + reject( + new Error(`Task function '${name}' is not registered in the pool`) + ) + } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) } @@ -692,23 +721,23 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) + this.emitter?.emit(PoolEvents.destroy) } protected async sendKillMessageToWorker ( workerNodeKey: number, workerId: number ): Promise { - const waitForKillResponse = new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, (message) => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error('Worker kill message handling failed')) + reject(new Error(`Worker ${workerId} kill message handling failed`)) } }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) - await waitForKillResponse } /** @@ -958,7 +987,9 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) + this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -997,8 +1028,8 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) - // Send the worker statistics message to worker. - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) } /** @@ -1009,11 +1040,11 @@ export abstract class AbstractPool< protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** - * Sends the worker statistics message to worker given its worker node key. + * Sends the statistics message to worker given its worker node key. * * @param workerNodeKey - The worker node key. */ - private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: @@ -1084,6 +1115,9 @@ export abstract class AbstractPool< } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) + } else if (message.taskFunctions != null) { + // Task functions message received from worker + this.taskFunctions = message.taskFunctions } } }