X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8f57a9067e0967bf87dd303efabfe694b00bdbc6;hb=937d524da3a5cce4795b85ddd1c430f0d184f731;hp=10c99171dcf06349063fd59772fe4a889c11d346;hpb=8bd0556f30713198367ca520d242b662f5fe147a;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 10c99171..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, @@ -327,7 +328,7 @@ export abstract class AbstractPool< ) }), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, _workerNode, workerNodeKey) => + (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), @@ -376,14 +377,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.minimum ?? Infinity + workerNode => + workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.maximum ?? -Infinity + workerNode => + workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -393,7 +396,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -405,7 +410,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -419,14 +426,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.minimum ?? Infinity + workerNode => + workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.maximum ?? -Infinity + workerNode => + workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -436,7 +445,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -448,13 +459,132 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) ) }) } + }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate === true && { + elu: { + idle: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ) + }) + }, + active: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ) + }) + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ) + } + } }) } } @@ -571,7 +701,7 @@ export abstract class AbstractPool< } if (requireSync) { this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies(), + this.getWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) for (const workerNodeKey of this.workerNodes.keys()) { @@ -591,7 +721,7 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategyOptions ) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies(), + this.getWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) for (const workerNodeKey of this.workerNodes.keys()) { @@ -871,7 +1001,7 @@ export abstract class AbstractPool< }) this.taskFunctions.set(name, fn) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) @@ -898,7 +1028,7 @@ export abstract class AbstractPool< } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) @@ -920,20 +1050,48 @@ export abstract class AbstractPool< } /** - * Gets task function strategy, if any. + * Gets task function worker choice strategy, if any. * * @param name - The task function name. * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. */ - private readonly getTaskFunctionWorkerWorkerChoiceStrategy = ( + private readonly getTaskFunctionWorkerChoiceStrategy = ( name?: string ): WorkerChoiceStrategy | undefined => { - if (name != null) { - return this.listTaskFunctionsProperties().find( - (taskFunctionProperties: TaskFunctionProperties) => - taskFunctionProperties.name === name - )?.strategy + name = name ?? DEFAULT_TASK_NAME + const taskFunctionsProperties = this.listTaskFunctionsProperties() + if (name === DEFAULT_TASK_NAME) { + name = taskFunctionsProperties[1]?.name } + return taskFunctionsProperties.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function worker choice strategy, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = ( + workerNodeKey: number, + name?: string + ): WorkerChoiceStrategy | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy } /** @@ -941,18 +1099,24 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. * @param name - The task function name. - * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. */ private readonly getWorkerNodeTaskFunctionPriority = ( workerNodeKey: number, name?: string ): number | undefined => { - if (name != null) { - return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find( - (taskFunctionProperties: TaskFunctionProperties) => - taskFunctionProperties.name === name - )?.priority + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority } /** @@ -960,7 +1124,7 @@ export abstract class AbstractPool< * * @returns The worker choice strategies. */ - private readonly getWorkerWorkerChoiceStrategies = + private readonly getWorkerChoiceStrategies = (): Set => { return new Set([ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1028,15 +1192,16 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const taskFunctionStrategy = - this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy) + const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), - strategy: taskFunctionStrategy, + strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( + workerNodeKey, + name + ), transferList, timestamp, taskId: randomUUID() @@ -1115,7 +1280,7 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_workerNode, workerNodeKey) => { + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) @@ -1306,14 +1471,12 @@ export abstract class AbstractPool< } /** - * Chooses a worker node for the next task given the worker choice strategy. + * Chooses a worker node for the next task. * - * @param workerChoiceStrategy - The worker choice strategy. - * @returns The chosen worker node key + * @param name - The task function name. + * @returns The chosen worker node key. */ - private chooseWorkerNode ( - workerChoiceStrategy?: WorkerChoiceStrategy - ): number { + private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( @@ -1324,7 +1487,9 @@ export abstract class AbstractPool< } } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy) + return this.workerChoiceStrategiesContext!.execute( + this.getTaskFunctionWorkerChoiceStrategy(name) + ) } /** @@ -1359,7 +1524,8 @@ export abstract class AbstractPool< ) { workerNode.usage.runTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.aggregate ?? Infinity + workerNode => + workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1369,7 +1535,8 @@ export abstract class AbstractPool< ) { workerNode.usage.waitTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.aggregate ?? Infinity + workerNode => + workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1379,7 +1546,8 @@ export abstract class AbstractPool< ) { workerNode.usage.elu.active.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.active.aggregate ?? Infinity + workerNode => + workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1461,6 +1629,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) + const workerInfo = this.getWorkerInfo(localWorkerNodeKey) const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( @@ -1469,6 +1638,8 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && + workerInfo != null && + !workerInfo.stealing && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { @@ -1671,23 +1842,23 @@ export abstract class AbstractPool< if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } - const taskFunctionWorkerUsage = - workerNode.getTaskFunctionWorkerUsage(taskName) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - taskFunctionWorkerUsage != null && - (taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || (previousTaskName != null && previousTaskName === taskName && - taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)) - ) { - ++taskFunctionWorkerUsage.tasks.sequentiallyStolen - } else if ( - this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - taskFunctionWorkerUsage != null && - taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0 - ) { - taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } } } @@ -1722,12 +1893,17 @@ export abstract class AbstractPool< ) } const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } if ( this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (workerInfo != null && previousStolenTask != null) { + if (previousStolenTask != null) { workerInfo.stealing = false this.resetTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, @@ -1739,7 +1915,6 @@ export abstract class AbstractPool< } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( - workerInfo != null && previousStolenTask != null && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) @@ -1752,11 +1927,6 @@ export abstract class AbstractPool< ) return } - if (workerInfo == null) { - throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` - ) - } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if (stolenTask != null) { @@ -1814,12 +1984,12 @@ export abstract class AbstractPool< ) { return } - const { workerId } = eventDetail const sizeOffset = 1 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } + const { workerId } = eventDetail const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes @@ -1855,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -1873,6 +2049,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1897,6 +2074,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -1984,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -2001,8 +2185,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup.