X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8f57a9067e0967bf87dd303efabfe694b00bdbc6;hb=b3b1857ba11fd35d0f7118ef8046c38676473a15;hp=d71b724446ff150d983329233668e19c2e63f6b5;hpb=910416386b4f7d0da4e6f0d8551cefa2539c5ced;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d71b7244..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, @@ -376,14 +377,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.minimum ?? Infinity + workerNode => + workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.maximum ?? -Infinity + workerNode => + workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -393,7 +396,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -405,7 +410,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -419,14 +426,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.minimum ?? Infinity + workerNode => + workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.maximum ?? -Infinity + workerNode => + workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -436,7 +445,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -448,7 +459,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -463,14 +476,18 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.idle.minimum ?? Infinity + workerNode => + workerNode.usage.elu.idle.minimum ?? + Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity + workerNode => + workerNode.usage.elu.idle.maximum ?? + Number.NEGATIVE_INFINITY ) ) ), @@ -480,7 +497,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.idle.history), + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), [] ) ) @@ -492,7 +511,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.idle.history), + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), [] ) ) @@ -503,14 +524,18 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.active.minimum ?? Infinity + workerNode => + workerNode.usage.elu.active.minimum ?? + Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.active.maximum ?? -Infinity + workerNode => + workerNode.usage.elu.active.maximum ?? + Number.NEGATIVE_INFINITY ) ) ), @@ -520,7 +545,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.active.history), + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), [] ) ) @@ -532,12 +559,30 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.active.history), + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), [] ) ) ) }) + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ) } } }) @@ -1054,7 +1099,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. * @param name - The task function name. - * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. */ private readonly getWorkerNodeTaskFunctionPriority = ( workerNodeKey: number, @@ -1429,7 +1474,7 @@ export abstract class AbstractPool< * Chooses a worker node for the next task. * * @param name - The task function name. - * @returns The chosen worker node key + * @returns The chosen worker node key. */ private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { @@ -1479,7 +1524,8 @@ export abstract class AbstractPool< ) { workerNode.usage.runTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.aggregate ?? Infinity + workerNode => + workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1489,7 +1535,8 @@ export abstract class AbstractPool< ) { workerNode.usage.waitTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.aggregate ?? Infinity + workerNode => + workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1499,7 +1546,8 @@ export abstract class AbstractPool< ) { workerNode.usage.elu.active.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.active.aggregate ?? Infinity + workerNode => + workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1581,6 +1629,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) + const workerInfo = this.getWorkerInfo(localWorkerNodeKey) const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( @@ -1589,6 +1638,8 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && + workerInfo != null && + !workerInfo.stealing && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { @@ -1974,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -1992,6 +2049,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -2016,6 +2074,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2103,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -2120,8 +2185,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup.