X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=d65ac2ffe7a92775e428e1a28ba8867a681ec136;hb=3d84cdaec91dfe96a82462c902bead4067b452d2;hp=52829c28163637f97947343b88aab983ab1a101f;hpb=95d1a734d57942c892202df7c0fcaf2fb5ab89ab;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 52829c28..d65ac2ff 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,8 +92,13 @@ export abstract class AbstractPool< * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + > = new Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + >() /** * Worker choice strategies context referencing worker choice algorithms implementation. @@ -137,7 +142,7 @@ export abstract class AbstractPool< /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. @@ -168,7 +173,7 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.initializeEventEmitter() + this.initEventEmitter() } this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< Worker, @@ -193,8 +198,6 @@ export abstract class AbstractPool< if (this.opts.startWorkers === true) { this.start() } - - this.startTimestamp = performance.now() } private checkPoolType (): void { @@ -283,7 +286,7 @@ export abstract class AbstractPool< } } - private initializeEventEmitter (): void { + private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ name: `poolifier:${this.type}-${this.worker}-pool` }) @@ -487,6 +490,9 @@ export abstract class AbstractPool< * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) @@ -551,7 +557,7 @@ export abstract class AbstractPool< let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) if (workerChoiceStrategyOptions != null) { - requireSync = this.setWorkerChoiceStrategyOptions( + requireSync = !this.setWorkerChoiceStrategyOptions( workerChoiceStrategyOptions ) } @@ -867,6 +873,9 @@ export abstract class AbstractPool< this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -884,11 +893,16 @@ export abstract class AbstractPool< this.taskFunctions.get(name) ) }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -973,12 +987,6 @@ export abstract class AbstractPool< }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -1060,7 +1068,7 @@ export abstract class AbstractPool< /** * Starts the minimum number of workers. */ - private startMinimumNumberOfWorkers (): void { + private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { this.startingMinimumNumberOfWorkers = true while ( this.workerNodes.reduce( @@ -1069,7 +1077,9 @@ export abstract class AbstractPool< 0 ) < this.minimumNumberOfWorkers ) { - this.createAndSetupWorkerNode() + const workerNodeKey = this.createAndSetupWorkerNode() + initWorkerNodeUsage && + this.initWorkerNodeUsage(this.workerNodes[workerNodeKey]) } this.startingMinimumNumberOfWorkers = false } @@ -1087,6 +1097,7 @@ export abstract class AbstractPool< } this.starting = true this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } @@ -1111,6 +1122,7 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() this.readyEventEmitted = false + delete this.startTimestamp this.destroying = false this.started = false } @@ -1231,7 +1243,7 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - let needWorkerChoiceStrategyUpdate = false + let needWorkerChoiceStrategiesUpdate = false // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage @@ -1246,7 +1258,7 @@ export abstract class AbstractPool< workerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1271,9 +1283,9 @@ export abstract class AbstractPool< taskFunctionWorkerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } - if (needWorkerChoiceStrategyUpdate) { + if (needWorkerChoiceStrategiesUpdate) { this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1294,7 +1306,7 @@ export abstract class AbstractPool< } /** - * Chooses a worker node for the next task. + * Chooses a worker node for the next task given the worker choice strategy. * * @param workerChoiceStrategy - The worker choice strategy. * @returns The chosen worker node key @@ -1335,6 +1347,44 @@ export abstract class AbstractPool< transferList?: readonly TransferListItem[] ): void + /** + * Initializes the worker node usage with sensible default values gathered during runtime. + * + * @param workerNode - The worker node. + */ + private initWorkerNodeUsage (workerNode: IWorkerNode): void { + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true + ) { + workerNode.usage.runTime.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.runTime.aggregate ?? Infinity + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true + ) { + workerNode.usage.waitTime.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.waitTime.aggregate ?? Infinity + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu + .aggregate === true + ) { + workerNode.usage.elu.active.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.active.aggregate ?? Infinity + ) + ) + } + } + /** * Creates a new, completely set up worker node. * @@ -1365,7 +1415,7 @@ export abstract class AbstractPool< if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else if (!this.startingMinimumNumberOfWorkers) { - this.startMinimumNumberOfWorkers() + this.startMinimumNumberOfWorkers(true) } } if ( @@ -1391,7 +1441,7 @@ export abstract class AbstractPool< !this.startingMinimumNumberOfWorkers && !this.destroying ) { - this.startMinimumNumberOfWorkers() + this.startMinimumNumberOfWorkers(true) } }) const workerNodeKey = this.addWorkerNode(workerNode) @@ -1456,6 +1506,7 @@ export abstract class AbstractPool< ) { workerNode.info.ready = true } + this.initWorkerNodeUsage(workerNode) this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -1568,14 +1619,15 @@ export abstract class AbstractPool< } } - private redistributeQueuedTasks (workerNodeKey: number): void { - if (workerNodeKey === -1 || this.cannotStealTask()) { + private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { + if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) { return } - while (this.tasksQueueSize(workerNodeKey) > 0) { + while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.info.ready && + return sourceWorkerNodeKey !== workerNodeKey && + workerNode.info.ready && workerNode.usage.tasks.queued < workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey @@ -1586,7 +1638,7 @@ export abstract class AbstractPool< this.handleTask( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.dequeueTask(workerNodeKey)! + this.dequeueTask(sourceWorkerNodeKey)! ) } } @@ -1604,28 +1656,21 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.stolen + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen } } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string, + previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } - } - - private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1633,33 +1678,36 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + (previousTaskName != null && + previousTaskName === taskName && + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } } } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } - } - - private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage( + taskName + )!.tasks.sequentiallyStolen = 0 } } @@ -1681,6 +1729,11 @@ export abstract class AbstractPool< ) { if (workerInfo != null && previousStolenTask != null) { workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) } return } @@ -1688,19 +1741,15 @@ export abstract class AbstractPool< if ( workerInfo != null && previousStolenTask != null && - workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { workerInfo.stealing = false - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - taskFunctionProperties.name - ) - } - this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) return } if (workerInfo == null) { @@ -1710,33 +1759,13 @@ export abstract class AbstractPool< } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) - if ( - this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - stolenTask != null - ) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const taskFunctionTasksWorkerUsage = this.workerNodes[ - workerNodeKey + if (stolenTask != null) { + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks - if ( - taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || - (previousStolenTask != null && - previousStolenTask.name === stolenTask.name && - taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) - ) { - this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } else { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } + stolenTask.name!, + previousStolenTask?.name + ) } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { @@ -1766,9 +1795,8 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) - this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task @@ -1780,6 +1808,7 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || + this.hasBackPressure() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { @@ -1817,7 +1846,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1839,11 +1868,11 @@ export abstract class AbstractPool< this.handleWorkerReadyResponse(message) } else if (taskFunctionsProperties != null) { // Task function properties message received from worker - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1863,10 +1892,11 @@ export abstract class AbstractPool< if (ready == null || !ready) { throw new Error(`Worker ${workerId} failed to initialize`) } - const workerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2053,11 +2083,8 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask ( - workerNodeKey: number, - bucket?: number - ): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask(bucket) + private dequeueTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number {