+ private async sendTaskFunctionOperationToWorker (
+ workerNodeKey: number,
+ message: MessageValue<Data>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const taskFunctionOperationListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ if (
+ message.taskFunctionOperationStatus != null &&
+ message.workerId === workerId
+ ) {
+ if (message.taskFunctionOperationStatus) {
+ resolve(true)
+ } else if (!message.taskFunctionOperationStatus) {
+ reject(
+ new Error(
+ `Task function operation '${
+ message.taskFunctionOperation as string
+ }' failed on worker ${message.workerId} with error: '${
+ message.workerError?.message as string
+ }'`
+ )
+ )
+ }
+ this.deregisterWorkerMessageListener(
+ this.getWorkerNodeKeyByWorkerId(message.workerId),
+ taskFunctionOperationListener
+ )
+ }
+ }
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ taskFunctionOperationListener
+ )
+ this.sendToWorker(workerNodeKey, message)
+ })
+ }
+
+ private async sendTaskFunctionOperationToWorkers (
+ message: MessageValue<Data>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Response>>()
+ const taskFunctionOperationsListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (responsesReceived.length === this.workerNodes.length) {
+ if (
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ const errorResponse = responsesReceived.find(
+ response => response.taskFunctionOperationStatus === false
+ )
+ reject(
+ new Error(
+ `Task function operation '${
+ message.taskFunctionOperation as string
+ }' failed on worker ${
+ errorResponse?.workerId as number
+ } with error: '${
+ errorResponse?.workerError?.message as string
+ }'`
+ )
+ )
+ }
+ this.deregisterWorkerMessageListener(
+ this.getWorkerNodeKeyByWorkerId(message.workerId),
+ taskFunctionOperationsListener
+ )
+ }
+ }
+ }
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ taskFunctionOperationsListener
+ )
+ this.sendToWorker(workerNodeKey, message)
+ }
+ })
+ }
+