From 72ae84a2c9ccb043ce8b0ca2b83014d47a8120cb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 13 Sep 2023 22:23:58 +0200 Subject: [PATCH] fix: handle added function at runtime with dynamic worker MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 108 +++++++++++++++++++++++++------------ src/pools/cluster/fixed.ts | 10 ++-- src/pools/pool.ts | 2 +- src/pools/thread/fixed.ts | 10 ++-- src/utility-types.ts | 2 +- 5 files changed, 86 insertions(+), 46 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c1c0a58c..6a26c7b7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -93,6 +93,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is starting or not. */ @@ -146,6 +153,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.starting = true this.startPool() this.starting = false @@ -601,7 +610,7 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( workerNode => workerNode.info.id === workerId ) @@ -724,6 +733,35 @@ export abstract class AbstractPool< } private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + }) + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( message: Omit, 'workerId'> ): Promise { return await new Promise((resolve, reject) => { @@ -749,16 +787,13 @@ export abstract class AbstractPool< new Error( `Task function operation ${ message.taskFunctionOperation as string - } failed on worker ${message.workerId}` + } failed on worker ${message.workerId as number}` ) ) } } }) - this.sendToWorker(workerNodeKey, { - ...message, - workerId: this.getWorkerInfo(workerNodeKey).id as number - }) + this.sendToWorker(workerNodeKey, message) } }) } @@ -779,9 +814,10 @@ export abstract class AbstractPool< /** @inheritDoc */ public async addTaskFunction ( name: string, - taskFunction: TaskFunction + taskFunction: TaskFunction ): Promise { - return await this.sendTaskFunctionOperationToWorker({ + this.taskFunctions.set(name, taskFunction) + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionName: name, taskFunction: taskFunction.toString() @@ -790,7 +826,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public async removeTaskFunction (name: string): Promise { - return await this.sendTaskFunctionOperationToWorker({ + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', taskFunctionName: name }) @@ -811,7 +848,7 @@ export abstract class AbstractPool< /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { - return await this.sendTaskFunctionOperationToWorker({ + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'default', taskFunctionName: name }) @@ -860,7 +897,6 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -892,18 +928,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -1210,6 +1251,17 @@ export abstract class AbstractPool< checkActive: true, workerId: workerInfo.id as number }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1275,8 +1327,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1292,11 +1343,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1326,7 +1373,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1340,10 +1386,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1377,10 +1420,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1413,15 +1453,15 @@ export abstract class AbstractPool< this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ).taskFunctionNames = message.taskFunctionNames - } else if (message.taskFunctionOperation != null) { - // Task function operation response received from worker } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 17ef7e9e..20215df6 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -73,10 +73,7 @@ export class FixedClusterPool< worker.on('disconnect', () => { worker.kill() }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) worker.disconnect() await waitWorkerExit } @@ -86,7 +83,10 @@ export class FixedClusterPool< workerNodeKey: number, message: MessageValue ): void { - this.workerNodes[workerNodeKey].worker.send(message) + this.workerNodes[workerNodeKey].worker.send({ + ...message, + workerId: this.workerNodes[workerNodeKey].info.id as number + }) } /** @inheritDoc */ diff --git a/src/pools/pool.ts b/src/pools/pool.ts index a19b70a3..cb758089 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -251,7 +251,7 @@ export interface IPool< */ readonly addTaskFunction: ( name: string, - taskFunction: TaskFunction + taskFunction: TaskFunction ) => Promise /** * Removes a task function from this pool. diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 6e234e2e..51123204 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,10 +67,7 @@ export class FixedThreadPool< resolve() }) }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) workerNode.closeChannel() await worker.terminate() await waitWorkerExit @@ -84,7 +81,10 @@ export class FixedThreadPool< ): void { ( this.workerNodes[workerNodeKey].messageChannel as MessageChannel - ).port1.postMessage(message, transferList) + ).port1.postMessage( + { ...message, workerId: this.workerNodes[workerNodeKey].info.id }, + transferList + ) } /** @inheritDoc */ diff --git a/src/utility-types.ts b/src/utility-types.ts index 3d5673f9..3817515a 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -72,7 +72,7 @@ export interface Task { /** * Worker id. */ - readonly workerId: number + readonly workerId?: number /** * Task name. */ -- 2.34.1