X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4;hb=e81c38f2f8046e9a482e94040b8f6d31dcda160c;hp=90cede3a8227f7e21cd7a0f32ec84e4f8e6bdbf6;hpb=6703b9f4492e347500111c42ffddbd8341c8f262;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 90cede3a..c1c0a58c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -723,44 +723,77 @@ export abstract class AbstractPool< } } - private sendToWorkers (message: Omit, 'workerId'>): number { - let messagesCount = 0 - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.sendToWorker(workerNodeKey, { - ...message, - workerId: this.getWorkerInfo(workerNodeKey).id as number - }) - ++messagesCount - } - return messagesCount + private async sendTaskFunctionOperationToWorker ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, { + ...message, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } + }) } /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - this.sendToWorkers({ - taskFunctionOperation: 'has', - taskFunctionName: name - }) - return true + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false } /** @inheritDoc */ - public addTaskFunction (name: string, taskFunction: TaskFunction): boolean { - this.sendToWorkers({ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'add', taskFunctionName: name, taskFunction: taskFunction.toString() }) - return true } /** @inheritDoc */ - public removeTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async removeTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'remove', taskFunctionName: name }) - return true } /** @inheritDoc */ @@ -777,12 +810,11 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public setDefaultTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'default', taskFunctionName: name }) - return true } private shallExecuteTask (workerNodeKey: number): boolean {