X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=0cb4c30ce14ca40b3c2784ffa9cab3e65de0004b;hb=4608939f425568da5553db1f5bf883d4cca18e02;hp=eb4c415b4a21be72446bb2da22db83ea9891acb7;hpb=9867896b122160ea5d8e6dbccba4cc5916bf523b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb4c415b..0cb4c30c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,6 +21,7 @@ import { updateMeasurementStatistics } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, PoolEmitter, @@ -91,6 +92,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is started or not. */ @@ -144,6 +152,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.started = false this.starting = false if (this.opts.startWorkers === true) { @@ -585,7 +595,7 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( workerNode => workerNode.info.id === workerId ) @@ -746,19 +756,128 @@ export abstract class AbstractPool< ) } + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + }) + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId as number}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, message) + } + }) + } + + /** @inheritDoc */ + public hasTaskFunction (name: string): boolean { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false + } + + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + this.taskFunctions.set(name, taskFunction) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: taskFunction.toString() + }) + } + + /** @inheritDoc */ + public async removeTaskFunction (name: string): Promise { + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + } + /** @inheritDoc */ - public listTaskFunctions (): string[] { + public listTaskFunctionNames (): string[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctions) && - workerNode.info.taskFunctions.length > 0 + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 ) { - return workerNode.info.taskFunctions + return workerNode.info.taskFunctionNames } } return [] } + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -802,7 +921,6 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -850,18 +968,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -961,8 +1084,8 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 2 + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 ) } @@ -977,7 +1100,7 @@ export abstract class AbstractPool< ) { --workerTaskStatistics.executing } - if (message.taskError == null) { + if (message.workerError == null) { ++workerTaskStatistics.executed } else { ++workerTaskStatistics.failed @@ -988,7 +1111,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } updateMeasurementStatistics( @@ -1015,7 +1138,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = @@ -1165,9 +1288,19 @@ export abstract class AbstractPool< }) const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { - checkActive: true, - workerId: workerInfo.id as number + checkActive: true }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1237,8 +1370,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1254,11 +1386,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1288,7 +1416,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1302,10 +1429,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1339,10 +1463,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1364,42 +1485,44 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctions != null) { + if (message.ready != null && message.taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctions != null) { - // Task functions message received from worker + } else if (message.taskFunctionNames != null) { + // Task function names message received from worker this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctions = message.taskFunctions + ).taskFunctionNames = message.taskFunctionNames } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ) workerInfo.ready = message.ready as boolean - workerInfo.taskFunctions = message.taskFunctions + workerInfo.taskFunctionNames = message.taskFunctionNames if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, taskError, data } = message + const { taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (taskError != null) { - this.emitter?.emit(PoolEvents.taskError, taskError) - promiseResponse.reject(taskError.message) + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + promiseResponse.reject(workerError.message) } else { promiseResponse.resolve(data as Response) }