X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=eb517943a6dc74b6ec88ca9cd17f4ca5c5eb5365;hb=904f1dd1c228e9cc710aa03fab7e50bc6daa1192;hp=90cede3a8227f7e21cd7a0f32ec84e4f8e6bdbf6;hpb=6703b9f4492e347500111c42ffddbd8341c8f262;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 90cede3a..eb517943 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -93,6 +93,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is starting or not. */ @@ -146,6 +153,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.starting = true this.startPool() this.starting = false @@ -601,7 +610,7 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( workerNode => workerNode.info.id === workerId ) @@ -723,44 +732,105 @@ export abstract class AbstractPool< } } - private sendToWorkers (message: Omit, 'workerId'>): number { - let messagesCount = 0 - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.sendToWorker(workerNodeKey, { - ...message, - workerId: this.getWorkerInfo(workerNodeKey).id as number + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } }) - ++messagesCount - } - return messagesCount + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId as number}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, message) + } + }) } /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - this.sendToWorkers({ - taskFunctionOperation: 'has', - taskFunctionName: name - }) - return true + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false } /** @inheritDoc */ - public addTaskFunction (name: string, taskFunction: TaskFunction): boolean { - this.sendToWorkers({ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + this.taskFunctions.set(name, taskFunction) + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionName: name, taskFunction: taskFunction.toString() }) - return true } /** @inheritDoc */ - public removeTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async removeTaskFunction (name: string): Promise { + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', taskFunctionName: name }) - return true } /** @inheritDoc */ @@ -777,12 +847,11 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public setDefaultTaskFunction (name: string): boolean { - this.sendToWorkers({ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'default', taskFunctionName: name }) - return true } private shallExecuteTask (workerNodeKey: number): boolean { @@ -828,7 +897,6 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -860,18 +928,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -1175,9 +1248,19 @@ export abstract class AbstractPool< }) const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { - checkActive: true, - workerId: workerInfo.id as number + checkActive: true }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1243,8 +1326,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1260,11 +1342,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1294,7 +1372,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1308,10 +1385,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1345,10 +1419,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1381,15 +1452,15 @@ export abstract class AbstractPool< this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ).taskFunctionNames = message.taskFunctionNames - } else if (message.taskFunctionOperation != null) { - // Task function operation response received from worker } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId)