}
}
- private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
- let messagesCount = 0
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.sendToWorker(workerNodeKey, {
- ...message,
- workerId: this.getWorkerInfo(workerNodeKey).id as number
- })
- ++messagesCount
- }
- return messagesCount
+ private async sendTaskFunctionOperationToWorker (
+ message: Omit<MessageValue<Data>, 'workerId'>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Data | Response>>()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ }
+ })
+ this.sendToWorker(workerNodeKey, {
+ ...message,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number
+ })
+ }
+ })
}
/** @inheritDoc */
}
/** @inheritDoc */
- public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
- this.sendToWorkers({
+ public async addTaskFunction (
+ name: string,
+ taskFunction: TaskFunction
+ ): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'add',
taskFunctionName: name,
taskFunction: taskFunction.toString()
})
- return true
}
/** @inheritDoc */
- public removeTaskFunction (name: string): boolean {
- this.sendToWorkers({
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
- return true
}
/** @inheritDoc */
}
/** @inheritDoc */
- public setDefaultTaskFunction (name: string): boolean {
- this.sendToWorkers({
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
taskFunctionOperation: 'default',
taskFunctionName: name
})
- return true
}
private shallExecuteTask (workerNodeKey: number): boolean {
readonly addTaskFunction: (
name: string,
taskFunction: TaskFunction
- ) => boolean
+ ) => Promise<boolean>
/**
* Removes a task function from this pool.
*
* @param name - The name of the task function.
* @returns `true` if the task function was removed, `false` otherwise.
*/
- readonly removeTaskFunction: (name: string) => boolean
+ readonly removeTaskFunction: (name: string) => Promise<boolean>
/**
* Lists the names of task function available in this pool.
*
* @param name - The name of the task function.
* @returns `true` if the default task function was set, `false` otherwise.
*/
- readonly setDefaultTaskFunction: (name: string) => boolean
+ readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
/**
* Sets the worker choice strategy in this pool.
*
import type {
TaskAsyncFunction,
TaskFunction,
+ TaskFunctionOperationReturnType,
TaskFunctions,
TaskSyncFunction
} from './task-functions'
-interface TaskFunctionOperationReturnType {
- status: boolean
- error?: Error
-}
-
const DEFAULT_MAX_INACTIVE_TIME = 60000
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
/**
this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
}
this.taskFunctions.set(name, boundFn)
- this.sendTaskFunctionsListToMainWorker()
+ this.sendTaskFunctionNamesToMainWorker()
return { status: true }
} catch (error) {
return { status: false, error: error as Error }
)
}
const deleteStatus = this.taskFunctions.delete(name)
- this.sendTaskFunctionsListToMainWorker()
+ this.sendTaskFunctionNamesToMainWorker()
return { status: deleteStatus }
} catch (error) {
return { status: false, error: error as Error }
): void
/**
- * Sends the list of task function names to the main worker.
+ * Sends task function names to the main worker.
*/
- protected sendTaskFunctionsListToMainWorker (): void {
+ protected sendTaskFunctionNamesToMainWorker (): void {
this.sendToMainWorker({
taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id