X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4;hb=e81c38f2f8046e9a482e94040b8f6d31dcda160c;hp=6457b1554ddbf5bfeb0156a0d40e9bb108dd02dc;hpb=b6bfca0158affbf4bdc0106a7b400eb9f7b73894;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6457b155..c1c0a58c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,6 +21,7 @@ import { updateMeasurementStatistics } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, PoolEmitter, @@ -722,19 +723,100 @@ export abstract class AbstractPool< } } + private async sendTaskFunctionOperationToWorker ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, { + ...message, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } + }) + } + + /** @inheritDoc */ + public hasTaskFunction (name: string): boolean { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false + } + + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + return await this.sendTaskFunctionOperationToWorker({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: taskFunction.toString() + }) + } + /** @inheritDoc */ - public listTaskFunctions (): string[] { + public async removeTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + } + + /** @inheritDoc */ + public listTaskFunctionNames (): string[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctions) && - workerNode.info.taskFunctions.length > 0 + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 ) { - return workerNode.info.taskFunctions + return workerNode.info.taskFunctionNames } } return [] } + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorker({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -921,8 +1003,8 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 2 + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 ) } @@ -937,7 +1019,7 @@ export abstract class AbstractPool< ) { --workerTaskStatistics.executing } - if (message.taskError == null) { + if (message.workerError == null) { ++workerTaskStatistics.executed } else { ++workerTaskStatistics.failed @@ -948,7 +1030,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } updateMeasurementStatistics( @@ -975,7 +1057,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = @@ -1320,17 +1402,19 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctions != null) { + if (message.ready != null && message.taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctions != null) { - // Task functions message received from worker + } else if (message.taskFunctionNames != null) { + // Task function names message received from worker this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctions = message.taskFunctions + ).taskFunctionNames = message.taskFunctionNames + } else if (message.taskFunctionOperation != null) { + // Task function operation response received from worker } } } @@ -1343,19 +1427,19 @@ export abstract class AbstractPool< this.getWorkerNodeKeyByWorkerId(message.workerId) ) workerInfo.ready = message.ready as boolean - workerInfo.taskFunctions = message.taskFunctions + workerInfo.taskFunctionNames = message.taskFunctionNames if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, taskError, data } = message + const { taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (taskError != null) { - this.emitter?.emit(PoolEvents.taskError, taskError) - promiseResponse.reject(taskError.message) + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + promiseResponse.reject(workerError.message) } else { promiseResponse.resolve(data as Response) }