X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=9aeeebed772b37234c2a8f0e88c5cb45c65f0dce;hb=b0b55f57cb5e2bc363bc75d84b483c9c29a5d22f;hp=779d2b59017c4493d3e4dba12f71ae347f6b1ba6;hpb=f789bd30490f446ba95b414b30f5e271cdfee83b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 779d2b59..9aeeebed 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -768,8 +768,8 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): Promise { - const workerId = this.getWorkerInfo(workerNodeKey).id as number return await new Promise((resolve, reject) => { + const workerId = this.getWorkerInfo(workerNodeKey).id as number this.registerWorkerMessageListener(workerNodeKey, message => { if ( message.workerId === workerId && @@ -782,9 +782,11 @@ export abstract class AbstractPool< ) { reject( new Error( - `Task function operation ${ + `Task function operation '${ message.taskFunctionOperation as string - } failed on worker ${message.workerId}` + }' failed on worker ${message.workerId} with error: '${ + message.workerError?.message as string + }'` ) ) } @@ -794,7 +796,7 @@ export abstract class AbstractPool< } private async sendTaskFunctionOperationToWorkers ( - message: Omit, 'workerId'> + message: MessageValue ): Promise { return await new Promise((resolve, reject) => { const responsesReceived = new Array>() @@ -815,11 +817,18 @@ export abstract class AbstractPool< message => message.taskFunctionOperationStatus === false ) ) { + const errorResponse = responsesReceived.find( + response => response.taskFunctionOperationStatus === false + ) reject( new Error( - `Task function operation ${ + `Task function operation '${ message.taskFunctionOperation as string - } failed on worker ${message.workerId as number}` + }' failed on worker ${ + errorResponse?.workerId as number + } with error: '${ + errorResponse?.workerError?.message as string + }'` ) ) } @@ -846,14 +855,24 @@ export abstract class AbstractPool< /** @inheritDoc */ public async addTaskFunction ( name: string, - taskFunction: TaskFunction + fn: TaskFunction ): Promise { - this.taskFunctions.set(name, taskFunction) - return await this.sendTaskFunctionOperationToWorkers({ + if (typeof name !== 'string') { + throw new TypeError('name argument must be a string') + } + if (typeof name === 'string' && name.trim().length === 0) { + throw new TypeError('name argument must not be an empty string') + } + if (typeof fn !== 'function') { + throw new TypeError('fn argument must be a function') + } + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionName: name, - taskFunction: taskFunction.toString() + taskFunction: fn.toString() }) + this.taskFunctions.set(name, fn) + return opResult } /** @inheritDoc */ @@ -863,11 +882,13 @@ export abstract class AbstractPool< 'Cannot remove a task function not handled on the pool side' ) } - this.taskFunctions.delete(name) - return await this.sendTaskFunctionOperationToWorkers({ + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', taskFunctionName: name }) + this.deleteTaskFunctionWorkerUsages(name) + this.taskFunctions.delete(name) + return opResult } /** @inheritDoc */ @@ -891,6 +912,12 @@ export abstract class AbstractPool< }) } + private deleteTaskFunctionWorkerUsages (name: string): void { + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 &&