X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=eb517943a6dc74b6ec88ca9cd17f4ca5c5eb5365;hb=904f1dd1c228e9cc710aa03fab7e50bc6daa1192;hp=9dcc6eddd28fcc9b0e27e6eebbe55e9dc31b6458;hpb=b7e141c40bccfd7a4ec0ff98b7829f7d296f048b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9dcc6edd..eb517943 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,6 +21,7 @@ import { updateMeasurementStatistics } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, PoolEmitter, @@ -92,6 +93,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is starting or not. */ @@ -145,6 +153,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.starting = true this.startPool() this.starting = false @@ -418,14 +428,14 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -461,14 +471,14 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -590,7 +600,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.worker === worker + workerNode => workerNode.worker === worker ) } @@ -600,9 +610,9 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.info.id === workerId + workerNode => workerNode.info.id === workerId ) } @@ -706,7 +716,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) @@ -715,26 +725,135 @@ export abstract class AbstractPool< } else { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing === 0 ) === -1 ) } } + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + }) + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId as number}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, message) + } + }) + } + + /** @inheritDoc */ + public hasTaskFunction (name: string): boolean { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false + } + + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + this.taskFunctions.set(name, taskFunction) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: taskFunction.toString() + }) + } + + /** @inheritDoc */ + public async removeTaskFunction (name: string): Promise { + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + } + /** @inheritDoc */ - public listTaskFunctions (): string[] { + public listTaskFunctionNames (): string[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctions) && - workerNode.info.taskFunctions.length > 0 + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 ) { - return workerNode.info.taskFunctions + return workerNode.info.taskFunctionNames } } return [] } + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -772,14 +891,12 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -811,18 +928,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -922,8 +1044,8 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 2 + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 ) } @@ -938,7 +1060,7 @@ export abstract class AbstractPool< ) { --workerTaskStatistics.executing } - if (message.taskError == null) { + if (message.workerError == null) { ++workerTaskStatistics.executed } else { ++workerTaskStatistics.failed @@ -949,7 +1071,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } updateMeasurementStatistics( @@ -976,7 +1098,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = @@ -1064,7 +1186,7 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', (error) => { + worker.on('error', error => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false @@ -1072,8 +1194,8 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.error, error) if ( this.opts.restartWorkerOnError === true && - !this.starting && - this.started + this.started && + !this.starting ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1104,7 +1226,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -1119,16 +1241,26 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.destroyWorkerNode(localWorkerNodeKey).catch(error => { this.emitter?.emit(PoolEvents.error, error) }) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { - checkActive: true, - workerId: workerInfo.id as number + checkActive: true }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1194,8 +1326,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1211,11 +1342,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1245,7 +1372,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1253,16 +1379,13 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) const sourceWorkerNode = workerNodes.find( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.info.id !== workerId && workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1296,10 +1419,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1319,44 +1439,46 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return (message) => { + return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctions != null) { + if (message.ready != null && message.taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctions != null) { - // Task functions message received from worker + } else if (message.taskFunctionNames != null) { + // Task function names message received from worker this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctions = message.taskFunctions + ).taskFunctionNames = message.taskFunctionNames } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ) workerInfo.ready = message.ready as boolean - workerInfo.taskFunctions = message.taskFunctions + workerInfo.taskFunctionNames = message.taskFunctionNames if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, taskError, data } = message + const { taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (taskError != null) { - this.emitter?.emit(PoolEvents.taskError, taskError) - promiseResponse.reject(taskError.message) + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + promiseResponse.reject(workerError.message) } else { promiseResponse.resolve(data as Response) } @@ -1457,7 +1579,7 @@ export abstract class AbstractPool< return ( this.opts.enableTasksQueue === true && this.workerNodes.findIndex( - (workerNode) => !workerNode.hasBackPressure() + workerNode => !workerNode.hasBackPressure() ) === -1 ) }