workerNodeKey: number,
message: MessageValue<Data>
): Promise<boolean> {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
return await new Promise<boolean>((resolve, reject) => {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
this.registerWorkerMessageListener(workerNodeKey, message => {
if (
message.workerId === workerId &&
}
private async sendTaskFunctionOperationToWorkers (
- message: Omit<MessageValue<Data>, 'workerId'>
+ message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived = new Array<MessageValue<Data | Response>>()
if (typeof fn !== 'function') {
throw new TypeError('fn argument must be a function')
}
- this.taskFunctions.set(name, fn)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
taskFunction: fn.toString()
})
+ this.taskFunctions.set(name, fn)
+ return opResult
}
/** @inheritDoc */
'Cannot remove a task function not handled on the pool side'
)
}
- this.taskFunctions.delete(name)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
+ this.deleteTaskFunctionWorkerUsages(name)
+ this.taskFunctions.delete(name)
+ return opResult
}
/** @inheritDoc */
})
}
+ private deleteTaskFunctionWorkerUsages (name: string): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&