}
}
- private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
- let messagesCount = 0
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.sendToWorker(workerNodeKey, {
- ...message,
- workerId: this.getWorkerInfo(workerNodeKey).id as number
- })
- ++messagesCount
- }
- return messagesCount
+ private async sendTaskFunctionOperationToWorker (
+ message: Omit<MessageValue<Data>, 'workerId'>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Data | Response>>()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ }
+ })
+ this.sendToWorker(workerNodeKey, {
+ ...message,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number
+ })
+ }
+ })
}
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
- this.sendToWorkers({
- taskFunctionOperation: 'has',
- taskFunctionName: name
- })
- return true
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.includes(name)
+ ) {
+ return true
+ }
+ }
+ return false
}
/** @inheritDoc */
- public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
- this.sendToWorkers({
+ public async addTaskFunction (
+ name: string,
+ taskFunction: TaskFunction
+ ): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'add',
taskFunctionName: name,
taskFunction: taskFunction.toString()
})
- return true
}
/** @inheritDoc */
- public removeTaskFunction (name: string): boolean {
- this.sendToWorkers({
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
- return true
}
/** @inheritDoc */
}
/** @inheritDoc */
- public setDefaultTaskFunction (name: string): boolean {
- this.sendToWorkers({
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'default',
taskFunctionName: name
})
- return true
}
private shallExecuteTask (workerNodeKey: number): boolean {