X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8f57a9067e0967bf87dd303efabfe694b00bdbc6;hb=b3b1857ba11fd35d0f7118ef8046c38676473a15;hp=ebdb0dd8924c0e18f71a456c620b1d08a6ad2e6b;hpb=db89e3bcb546d7302179437c4fa88d6fd03bccf0;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ebdb0dd8..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, @@ -2024,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -2042,6 +2049,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -2066,6 +2074,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2153,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -2170,8 +2185,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup.