X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8f57a9067e0967bf87dd303efabfe694b00bdbc6;hb=b7ea53bbd96886c5bc95c13943e5c92a3206f8a5;hp=a23c7983bfc652fbfcb8ece4a75853dbed270ac5;hpb=80115618ce24038f504dc447dfb7c4fbd9c5d698;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a23c7983..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, @@ -395,7 +396,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -407,7 +410,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -440,7 +445,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -452,7 +459,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -488,7 +497,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.idle.history), + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), [] ) ) @@ -500,7 +511,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.idle.history), + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), [] ) ) @@ -532,7 +545,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.active.history), + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), [] ) ) @@ -544,7 +559,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.elu.active.history), + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), [] ) ) @@ -1457,7 +1474,7 @@ export abstract class AbstractPool< * Chooses a worker node for the next task. * * @param name - The task function name. - * @returns The chosen worker node key + * @returns The chosen worker node key. */ private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { @@ -2008,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -2026,6 +2049,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -2050,6 +2074,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2137,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -2154,8 +2185,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup.