X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6457b1554ddbf5bfeb0156a0d40e9bb108dd02dc;hb=3725d989b40800f70d495107b363c7efa2c96fc3;hp=ab7fcf518ed781596a74bb953150fa02f05cb6f0;hpb=b7d085c432048df7f7d9fd555f7ada41d5078405;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ab7fcf51..6457b155 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -14,7 +14,9 @@ import { average, isKillBehavior, isPlainObject, + max, median, + min, round, updateMeasurementStatistics } from '../utils' @@ -414,16 +416,16 @@ export abstract class AbstractPool< .runTime.aggregate && { runTime: { minimum: round( - Math.min( + min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( - Math.max( + max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -457,16 +459,16 @@ export abstract class AbstractPool< .waitTime.aggregate && { waitTime: { minimum: round( - Math.min( + min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( - Math.max( + max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -588,7 +590,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.worker === worker + workerNode => workerNode.worker === worker ) } @@ -600,7 +602,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.info.id === workerId + workerNode => workerNode.info.id === workerId ) } @@ -704,7 +706,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) @@ -713,7 +715,7 @@ export abstract class AbstractPool< } else { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing === 0 ) === -1 ) @@ -770,14 +772,13 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: workerInfo.id as number, + workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -813,7 +814,7 @@ export abstract class AbstractPool< workerId: number ): Promise { await new Promise((resolve, reject) => { - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { @@ -1062,7 +1063,7 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', (error) => { + worker.on('error', error => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false @@ -1070,8 +1071,8 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.error, error) if ( this.opts.restartWorkerOnError === true && - !this.starting && - this.started + this.started && + !this.starting ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1102,7 +1103,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -1117,7 +1118,7 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.destroyWorkerNode(localWorkerNodeKey).catch(error => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1199,31 +1200,25 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let destinationWorkerNodeKey!: number - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if (workerNode.info.ready && workerNodeId !== workerNodeKey) { - if (workerNode.usage.tasks.queued === 0) { - destinationWorkerNodeKey = workerNodeId - break - } - if (workerNode.usage.tasks.queued < minQueuedTasks) { - minQueuedTasks = workerNode.usage.tasks.queued - destinationWorkerNodeKey = workerNodeId - } - } + const destinationWorkerNodeKey = this.workerNodes.reduce( + (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { + return workerNode.info.ready && + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.queued + ? workerNodeKey + : minWorkerNodeKey + }, + 0 + ) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: destinationWorkerNode.info.id as number } - if (destinationWorkerNodeKey != null) { - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + if (this.shallExecuteTask(destinationWorkerNodeKey)) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) } } } @@ -1256,30 +1251,26 @@ export abstract class AbstractPool< (workerNodeA, workerNodeB) => workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) - for (const sourceWorkerNode of workerNodes) { - if (sourceWorkerNode.usage.tasks.queued === 0) { - break + const sourceWorkerNode = workerNodes.find( + workerNode => + workerNode.info.ready && + workerNode.info.id !== workerId && + workerNode.usage.tasks.queued > 0 + ) + if (sourceWorkerNode != null) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number } - if ( - sourceWorkerNode.info.ready && - sourceWorkerNode.info.id !== workerId && - sourceWorkerNode.usage.tasks.queued > 0 - ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } - this.updateTaskStolenStatisticsWorkerUsage( - destinationWorkerNodeKey, - task.name as string - ) - break + if (this.shallExecuteTask(destinationWorkerNodeKey)) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) } + this.updateTaskStolenStatisticsWorkerUsage( + destinationWorkerNodeKey, + task.name as string + ) } } @@ -1327,7 +1318,7 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return (message) => { + return message => { this.checkMessageWorkerId(message) if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker @@ -1465,7 +1456,7 @@ export abstract class AbstractPool< return ( this.opts.enableTasksQueue === true && this.workerNodes.findIndex( - (workerNode) => !workerNode.hasBackPressure() + workerNode => !workerNode.hasBackPressure() ) === -1 ) }