X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=e4dac676ab95acae7ee60a82d332084766735e46;hb=ec8ed549f5422b9c23ff5b446ec885f39656373d;hp=ebdb0dd8924c0e18f71a456c620b1d08a6ad2e6b;hpb=db89e3bcb546d7302179437c4fa88d6fd03bccf0;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ebdb0dd8..e4dac676 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,11 +4,12 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, Task, - TaskFunctionProperties + TaskFunctionProperties, } from '../utility-types.js' import { average, @@ -22,11 +23,11 @@ import { median, min, round, - sleep + sleep, } from '../utils.js' import type { TaskFunction, - TaskFunctionObject + TaskFunctionObject, } from '../worker/task-functions.js' import { KillBehaviors } from '../worker/worker-options.js' import { @@ -36,13 +37,13 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions + type TasksQueueOptions, } from './pool.js' import { Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, - type WorkerChoiceStrategyOptions + type WorkerChoiceStrategyOptions, } from './selection-strategies/selection-strategies-types.js' import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { @@ -55,7 +56,7 @@ import { updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, updateWaitTimeWorkerUsage, - waitWorkerNodeEvents + waitWorkerNodeEvents, } from './utils.js' import { version } from './version.js' import type { @@ -63,13 +64,12 @@ import type { IWorkerNode, WorkerInfo, WorkerNodeEventDetail, - WorkerType + WorkerType, } from './worker.js' import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. - * * @typeParam Worker - Type of worker which manages this pool. * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @typeParam Response - Type of execution response. This can only be structured-cloneable data. @@ -80,7 +80,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: IWorkerNode[] = [] /** @inheritDoc */ public emitter?: EventEmitterAsyncResource @@ -94,19 +94,19 @@ export abstract class AbstractPool< */ protected promiseResponseMap: Map< `${string}-${string}-${string}-${string}-${string}`, - PromiseResponseWrapper + PromiseResponseWrapper > = new Map< `${string}-${string}-${string}-${string}-${string}`, PromiseResponseWrapper - >() + >() /** * Worker choice strategies context referencing worker choice algorithms implementation. */ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext< - Worker, - Data, - Response + Worker, + Data, + Response > /** @@ -115,8 +115,8 @@ export abstract class AbstractPool< * - `value`: The task function object. */ private readonly taskFunctions: Map< - string, - TaskFunctionObject + string, + TaskFunctionObject > /** @@ -146,7 +146,6 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. - * * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages. * @param filePath - Path to the worker file. * @param opts - Options for the pool. @@ -176,9 +175,9 @@ export abstract class AbstractPool< this.initEventEmitter() } this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< - Worker, - Data, - Response + Worker, + Data, + Response >( this, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -288,7 +287,7 @@ export abstract class AbstractPool< private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ - name: `poolifier:${this.type}-${this.worker}-pool` + name: `poolifier:${this.type}-${this.worker}-pool`, }) } @@ -309,7 +308,7 @@ export abstract class AbstractPool< .runTime.aggregate === true && this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.aggregate && { - utilization: round(this.utilization) + utilization: round(this.utilization), }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -324,7 +323,7 @@ export abstract class AbstractPool< (accumulator, workerNode) => workerNode.info.stealing ? accumulator + 1 : accumulator, 0 - ) + ), }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _, workerNodeKey) => @@ -346,24 +345,24 @@ export abstract class AbstractPool< (accumulator, workerNode) => accumulator + workerNode.usage.tasks.queued, 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { - backPressure: this.hasBackPressure() + backPressure: this.hasBackPressure(), }), ...(this.opts.enableTasksQueue === true && { stolenTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.stolen, 0 - ) + ), }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => @@ -401,7 +400,7 @@ export abstract class AbstractPool< [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.median && { @@ -415,9 +414,9 @@ export abstract class AbstractPool< [] ) ) - ) - }) - } + ), + }), + }, }), ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .waitTime.aggregate === true && { @@ -450,7 +449,7 @@ export abstract class AbstractPool< [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.median && { @@ -464,9 +463,9 @@ export abstract class AbstractPool< [] ) ) - ) - }) - } + ), + }), + }, }), ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .elu.aggregate === true && { @@ -502,7 +501,7 @@ export abstract class AbstractPool< [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .elu.median && { @@ -516,8 +515,8 @@ export abstract class AbstractPool< [] ) ) - ) - }) + ), + }), }, active: { minimum: round( @@ -550,7 +549,7 @@ export abstract class AbstractPool< [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .elu.median && { @@ -564,8 +563,8 @@ export abstract class AbstractPool< [] ) ) - ) - }) + ), + }), }, utilization: { average: round( @@ -581,10 +580,10 @@ export abstract class AbstractPool< workerNode => workerNode.usage.elu.utilization ?? 0 ) ) - ) - } - } - }) + ), + }, + }, + }), } } @@ -615,7 +614,6 @@ export abstract class AbstractPool< /** * The approximate pool utilization. - * * @returns The pool utilization. */ private get utilization (): number { @@ -652,7 +650,6 @@ export abstract class AbstractPool< /** * Checks if the worker id sent in the received message from a worker is valid. - * * @param message - The received message. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ @@ -668,7 +665,6 @@ export abstract class AbstractPool< /** * Gets the worker node key given its worker id. - * * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ @@ -779,7 +775,7 @@ export abstract class AbstractPool< ...getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ), - ...tasksQueueOptions + ...tasksQueueOptions, } } @@ -843,7 +839,6 @@ export abstract class AbstractPool< /** * Whether worker nodes are executing concurrently their tasks quota or not. - * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { @@ -996,7 +991,7 @@ export abstract class AbstractPool< const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), - taskFunction: fn.taskFunction.toString() + taskFunction: fn.taskFunction.toString(), }) this.taskFunctions.set(name, fn) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( @@ -1020,7 +1015,7 @@ export abstract class AbstractPool< taskFunctionProperties: buildTaskFunctionProperties( name, this.taskFunctions.get(name) - ) + ), }) for (const workerNode of this.workerNodes) { workerNode.deleteTaskFunctionWorkerUsage(name) @@ -1050,7 +1045,6 @@ export abstract class AbstractPool< /** * Gets task function worker choice strategy, if any. - * * @param name - The task function name. * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. */ @@ -1070,7 +1064,6 @@ export abstract class AbstractPool< /** * Gets worker node task function worker choice strategy, if any. - * * @param workerNodeKey - The worker node key. * @param name - The task function name. * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. @@ -1095,7 +1088,6 @@ export abstract class AbstractPool< /** * Gets worker node task function priority, if any. - * * @param workerNodeKey - The worker node key. * @param name - The task function name. * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. @@ -1120,7 +1112,6 @@ export abstract class AbstractPool< /** * Gets the worker choice strategies registered in this pool. - * * @returns The worker choice strategies. */ private readonly getWorkerChoiceStrategies = @@ -1135,7 +1126,7 @@ export abstract class AbstractPool< ) .filter( (strategy: WorkerChoiceStrategy | undefined) => strategy != null - ) as WorkerChoiceStrategy[]) + ) as WorkerChoiceStrategy[]), ]) } @@ -1146,7 +1137,7 @@ export abstract class AbstractPool< taskFunctionProperties: buildTaskFunctionProperties( name, this.taskFunctions.get(name) - ) + ), }) } @@ -1194,7 +1185,6 @@ export abstract class AbstractPool< const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( @@ -1203,7 +1193,7 @@ export abstract class AbstractPool< ), transferList, timestamp, - taskId: randomUUID() + taskId: randomUUID(), } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.set(task.taskId!, { @@ -1213,9 +1203,9 @@ export abstract class AbstractPool< ...(this.emitter != null && { asyncResource: new AsyncResource('poolifier:task', { triggerAsyncId: this.emitter.asyncId, - requireManualDestroy: true - }) - }) + requireManualDestroy: true, + }), + }), }) if ( this.opts.enableTasksQueue === false || @@ -1231,6 +1221,7 @@ export abstract class AbstractPool< /** * Starts the minimum number of workers. + * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false */ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { this.startingMinimumNumberOfWorkers = true @@ -1293,7 +1284,6 @@ export abstract class AbstractPool< private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey] == null) { resolve() return @@ -1318,7 +1308,6 @@ export abstract class AbstractPool< /** * Terminates the worker node given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected async destroyWorkerNode (workerNodeKey: number): Promise { @@ -1341,8 +1330,6 @@ export abstract class AbstractPool< /** * Setup hook to execute code before worker nodes are created in the abstract constructor. * Can be overridden. - * - * @virtual */ protected setupHook (): void { /* Intentionally empty */ @@ -1350,7 +1337,6 @@ export abstract class AbstractPool< /** * Returns whether the worker is the main worker or not. - * * @returns `true` if the worker is the main worker, `false` otherwise. */ protected abstract isMain (): boolean @@ -1358,7 +1344,6 @@ export abstract class AbstractPool< /** * Hook executed before the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ @@ -1366,7 +1351,6 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing @@ -1399,7 +1383,6 @@ export abstract class AbstractPool< /** * Hook executed after the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param message - The received message. */ @@ -1408,7 +1391,6 @@ export abstract class AbstractPool< message: MessageValue ): void { let needWorkerChoiceStrategiesUpdate = false - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) @@ -1456,7 +1438,6 @@ export abstract class AbstractPool< /** * Whether the worker node shall update its task function worker usage or not. - * * @param workerNodeKey - The worker node key. * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. */ @@ -1471,7 +1452,6 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. - * * @param name - The task function name. * @returns The chosen worker node key. */ @@ -1493,14 +1473,12 @@ export abstract class AbstractPool< /** * Conditions for dynamic worker creation. - * * @returns Whether to create a dynamic worker or not. */ protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param message - The message. * @param transferList - The optional array of transferable objects. @@ -1513,7 +1491,6 @@ export abstract class AbstractPool< /** * Initializes the worker node usage with sensible default values gathered during runtime. - * * @param workerNode - The worker node. */ private initWorkerNodeUsage (workerNode: IWorkerNode): void { @@ -1554,7 +1531,6 @@ export abstract class AbstractPool< /** * Creates a new, completely set up worker node. - * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { @@ -1592,7 +1568,6 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1618,7 +1593,6 @@ export abstract class AbstractPool< /** * Creates a new, completely set up dynamic worker node. - * * @returns New, completely set up dynamic worker node key. */ protected createAndSetupDynamicWorkerNode (): number { @@ -1650,7 +1624,7 @@ export abstract class AbstractPool< } }) this.sendToWorker(workerNodeKey, { - checkActive: true + checkActive: true, }) if (this.taskFunctions.size > 0) { for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) { @@ -1660,7 +1634,7 @@ export abstract class AbstractPool< taskFunctionName, taskFunctionObject ), - taskFunction: taskFunctionObject.taskFunction.toString() + taskFunction: taskFunctionObject.taskFunction.toString(), }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1683,7 +1657,6 @@ export abstract class AbstractPool< /** * Registers a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1696,7 +1669,6 @@ export abstract class AbstractPool< /** * Registers once a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1709,7 +1681,6 @@ export abstract class AbstractPool< /** * Deregisters a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1723,7 +1694,6 @@ export abstract class AbstractPool< /** * Method hooked up after a worker node has been newly created. * Can be overridden. - * * @param workerNodeKey - The newly created worker node key. */ protected afterWorkerNodeSetup (workerNodeKey: number): void { @@ -1754,14 +1724,12 @@ export abstract class AbstractPool< /** * Sends the startup message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** * Sends the statistics message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ private sendStatisticsMessageToWorker (workerNodeKey: number): void { @@ -1772,8 +1740,8 @@ export abstract class AbstractPool< .runTime.aggregate ?? false, elu: this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .elu.aggregate ?? false - } + .elu.aggregate ?? false, + }, }) } @@ -1818,7 +1786,6 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1837,7 +1804,6 @@ export abstract class AbstractPool< previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } @@ -1866,7 +1832,6 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } @@ -2024,8 +1989,15 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. + * @param message - The message received from the worker. */ protected readonly workerMessageListener = ( message: MessageValue @@ -2042,6 +2014,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -2066,6 +2039,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2094,12 +2068,10 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) if ( this.opts.enableTasksQueue === true && !this.destroying && - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode != null ) { const workerNodeTasksUsage = workerNode.usage.tasks @@ -2119,7 +2091,7 @@ export abstract class AbstractPool< ) { workerNode.emit('idle', { workerId, - workerNodeKey + workerNodeKey, }) } } @@ -2145,7 +2117,6 @@ export abstract class AbstractPool< /** * Gets the worker information given its worker node key. - * * @param workerNodeKey - The worker node key. * @returns The worker information. */ @@ -2153,9 +2124,14 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. - * * @returns The created worker node. */ private createWorkerNode (): IWorkerNode { @@ -2170,8 +2146,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority(), } ) // Flag the worker node as ready at pool startup. @@ -2183,7 +2159,6 @@ export abstract class AbstractPool< /** * Adds the given worker node in the pool worker nodes. - * * @param workerNode - The worker node. * @returns The added worker node key. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. @@ -2206,7 +2181,6 @@ export abstract class AbstractPool< /** * Removes the worker node from the pool worker nodes. - * * @param workerNode - The worker node. */ private removeWorkerNode (workerNode: IWorkerNode): void { @@ -2236,7 +2210,6 @@ export abstract class AbstractPool< /** * Executes the given task on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */