From e81c38f2f8046e9a482e94040b8f6d31dcda160c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 13 Sep 2023 21:27:13 +0200 Subject: [PATCH] fix: properly handle response for add/remove/set task function operaions MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/index.ts | 1 + src/pools/abstract-pool.ts | 66 +++++++++++++++++++++++++---------- src/pools/pool.ts | 6 ++-- src/worker/abstract-worker.ts | 14 +++----- src/worker/task-functions.ts | 8 +++++ 5 files changed, 64 insertions(+), 31 deletions(-) diff --git a/src/index.ts b/src/index.ts index d1610953..8faba255 100644 --- a/src/index.ts +++ b/src/index.ts @@ -60,6 +60,7 @@ export type { export type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './worker/task-functions' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1b53065c..c1c0a58c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -723,16 +723,44 @@ export abstract class AbstractPool< } } - private sendToWorkers (message: Omit, 'workerId'>): number { - let messagesCount = 0 - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.sendToWorker(workerNodeKey, { - ...message, - workerId: this.getWorkerInfo(workerNodeKey).id as number - }) - ++messagesCount - } - return messagesCount + private async sendTaskFunctionOperationToWorker ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, { + ...message, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } + }) } /** @inheritDoc */ @@ -749,22 +777,23 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public addTaskFunction (name: string, taskFunction: TaskFunction): boolean { - this.sendToWorkers({ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'add', taskFunctionName: name, taskFunction: taskFunction.toString() }) - return true } /** @inheritDoc */ - public removeTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async removeTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'remove', taskFunctionName: name }) - return true } /** @inheritDoc */ @@ -781,12 +810,11 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public setDefaultTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ taskFunctionOperation: 'default', taskFunctionName: name }) - return true } private shallExecuteTask (workerNodeKey: number): boolean { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 6156a042..a19b70a3 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -252,14 +252,14 @@ export interface IPool< readonly addTaskFunction: ( name: string, taskFunction: TaskFunction - ) => boolean + ) => Promise /** * Removes a task function from this pool. * * @param name - The name of the task function. * @returns `true` if the task function was removed, `false` otherwise. */ - readonly removeTaskFunction: (name: string) => boolean + readonly removeTaskFunction: (name: string) => Promise /** * Lists the names of task function available in this pool. * @@ -272,7 +272,7 @@ export interface IPool< * @param name - The name of the task function. * @returns `true` if the default task function was set, `false` otherwise. */ - readonly setDefaultTaskFunction: (name: string) => boolean + readonly setDefaultTaskFunction: (name: string) => Promise /** * Sets the worker choice strategy in this pool. * diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8cd370de..107d7238 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -18,15 +18,11 @@ import { KillBehaviors, type WorkerOptions } from './worker-options' import type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './task-functions' -interface TaskFunctionOperationReturnType { - status: boolean - error?: Error -} - const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_WORKER_OPTIONS: WorkerOptions = { /** @@ -217,7 +213,7 @@ export abstract class AbstractWorker< this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) } this.taskFunctions.set(name, boundFn) - this.sendTaskFunctionsListToMainWorker() + this.sendTaskFunctionNamesToMainWorker() return { status: true } } catch (error) { return { status: false, error: error as Error } @@ -247,7 +243,7 @@ export abstract class AbstractWorker< ) } const deleteStatus = this.taskFunctions.delete(name) - this.sendTaskFunctionsListToMainWorker() + this.sendTaskFunctionNamesToMainWorker() return { status: deleteStatus } } catch (error) { return { status: false, error: error as Error } @@ -485,9 +481,9 @@ export abstract class AbstractWorker< ): void /** - * Sends the list of task function names to the main worker. + * Sends task function names to the main worker. */ - protected sendTaskFunctionsListToMainWorker (): void { + protected sendTaskFunctionNamesToMainWorker (): void { this.sendToMainWorker({ taskFunctionNames: this.listTaskFunctionNames(), workerId: this.id diff --git a/src/worker/task-functions.ts b/src/worker/task-functions.ts index a61d6962..353b8b0c 100644 --- a/src/worker/task-functions.ts +++ b/src/worker/task-functions.ts @@ -43,3 +43,11 @@ export type TaskFunctions = Record< string, TaskFunction > + +/** + * Task function operation return type. + */ +export interface TaskFunctionOperationReturnType { + status: boolean + error?: Error +} -- 2.34.1