workerNodeKey: number,
message: MessageValue<Data>
): Promise<boolean> {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
return await new Promise<boolean>((resolve, reject) => {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
this.registerWorkerMessageListener(workerNodeKey, message => {
if (
message.workerId === workerId &&
}
private async sendTaskFunctionOperationToWorkers (
- message: Omit<MessageValue<Data>, 'workerId'>
+ message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived = new Array<MessageValue<Data | Response>>()
/** @inheritDoc */
public async addTaskFunction (
name: string,
- taskFunction: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response>
): Promise<boolean> {
- this.taskFunctions.set(name, taskFunction)
- return await this.sendTaskFunctionOperationToWorkers({
+ if (typeof name !== 'string') {
+ throw new TypeError('name argument must be a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError('name argument must not be an empty string')
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError('fn argument must be a function')
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
- taskFunction: taskFunction.toString()
+ taskFunction: fn.toString()
})
+ this.taskFunctions.set(name, fn)
+ return opResult
}
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
if (!this.taskFunctions.has(name)) {
throw new Error(
- 'Cannot remove a task function that does not exist on the pool side'
+ 'Cannot remove a task function not handled on the pool side'
)
}
- this.taskFunctions.delete(name)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
+ this.deleteTaskFunctionWorkerUsages(name)
+ this.taskFunctions.delete(name)
+ return opResult
}
/** @inheritDoc */
})
}
+ private deleteTaskFunctionWorkerUsages (name: string): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&