X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=712de0b7da4c38c2808e1be61da7fced8e372da0;hb=884743b122a66be45c94d83d9230e28a9ab43836;hp=07fec75f140a81ee10f39671274ecc4037a4fee3;hpb=5d9133ae9ef83c5f69e26daf8dcfea584884d9c4;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 07fec75f..712de0b7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,11 +4,12 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, Task, - TaskFunctionProperties + TaskFunctionProperties, } from '../utility-types.js' import { average, @@ -22,11 +23,11 @@ import { median, min, round, - sleep + sleep, } from '../utils.js' import type { TaskFunction, - TaskFunctionObject + TaskFunctionObject, } from '../worker/task-functions.js' import { KillBehaviors } from '../worker/worker-options.js' import { @@ -36,13 +37,13 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions + type TasksQueueOptions, } from './pool.js' import { Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, - type WorkerChoiceStrategyOptions + type WorkerChoiceStrategyOptions, } from './selection-strategies/selection-strategies-types.js' import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { @@ -55,7 +56,7 @@ import { updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, updateWaitTimeWorkerUsage, - waitWorkerNodeEvents + waitWorkerNodeEvents, } from './utils.js' import { version } from './version.js' import type { @@ -63,13 +64,12 @@ import type { IWorkerNode, WorkerInfo, WorkerNodeEventDetail, - WorkerType + WorkerType, } from './worker.js' import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. - * * @typeParam Worker - Type of worker which manages this pool. * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @typeParam Response - Type of execution response. This can only be structured-cloneable data. @@ -80,7 +80,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: IWorkerNode[] = [] /** @inheritDoc */ public emitter?: EventEmitterAsyncResource @@ -92,16 +92,21 @@ export abstract class AbstractPool< * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + > = new Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + >() /** * Worker choice strategies context referencing worker choice algorithms implementation. */ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext< - Worker, - Data, - Response + Worker, + Data, + Response > /** @@ -110,8 +115,8 @@ export abstract class AbstractPool< * - `value`: The task function object. */ private readonly taskFunctions: Map< - string, - TaskFunctionObject + string, + TaskFunctionObject > /** @@ -141,7 +146,6 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. - * * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages. * @param filePath - Path to the worker file. * @param opts - Options for the pool. @@ -171,9 +175,9 @@ export abstract class AbstractPool< this.initEventEmitter() } this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< - Worker, - Data, - Response + Worker, + Data, + Response >( this, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -283,7 +287,7 @@ export abstract class AbstractPool< private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ - name: `poolifier:${this.type}-${this.worker}-pool` + name: `poolifier:${this.type}-${this.worker}-pool`, }) } @@ -304,7 +308,7 @@ export abstract class AbstractPool< .runTime.aggregate === true && this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.aggregate && { - utilization: round(this.utilization) + utilization: round(this.utilization), }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -319,10 +323,10 @@ export abstract class AbstractPool< (accumulator, workerNode) => workerNode.info.stealing ? accumulator + 1 : accumulator, 0 - ) + ), }), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, _workerNode, workerNodeKey) => + (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), @@ -341,24 +345,24 @@ export abstract class AbstractPool< (accumulator, workerNode) => accumulator + workerNode.usage.tasks.queued, 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { - backPressure: this.hasBackPressure() + backPressure: this.hasBackPressure(), }), ...(this.opts.enableTasksQueue === true && { stolenTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.stolen, 0 - ) + ), }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => @@ -371,14 +375,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.minimum ?? Infinity + workerNode => + workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.maximum ?? -Infinity + workerNode => + workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -388,11 +394,13 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.median && { @@ -400,13 +408,15 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) - ) - }) - } + ), + }), + }, }), ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .waitTime.aggregate === true && { @@ -414,14 +424,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.minimum ?? Infinity + workerNode => + workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.maximum ?? -Infinity + workerNode => + workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -431,11 +443,13 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) - ) + ), }), ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.median && { @@ -443,19 +457,139 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) - ) - }) - } - }) + ), + }), + }, + }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate === true && { + elu: { + idle: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ), + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ), + }), + }, + active: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ), + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ), + }), + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + }, + }, + }), } } /** - * The pool readiness boolean status. + * Whether the pool is ready or not. + * @returns The pool readiness boolean status. */ private get ready (): boolean { if (this.empty) { @@ -473,7 +607,8 @@ export abstract class AbstractPool< } /** - * The pool emptiness boolean status. + * Whether the pool is empty or not. + * @returns The pool emptiness boolean status. */ protected get empty (): boolean { return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 @@ -481,7 +616,6 @@ export abstract class AbstractPool< /** * The approximate pool utilization. - * * @returns The pool utilization. */ private get utilization (): number { @@ -518,7 +652,6 @@ export abstract class AbstractPool< /** * Checks if the worker id sent in the received message from a worker is valid. - * * @param message - The received message. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ @@ -527,14 +660,13 @@ export abstract class AbstractPool< throw new Error('Worker message received without worker id') } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( - `Worker message received from unknown worker '${message.workerId}'` + `Worker message received from unknown worker '${message.workerId.toString()}'` ) } } /** * Gets the worker node key given its worker id. - * * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ @@ -566,7 +698,7 @@ export abstract class AbstractPool< } if (requireSync) { this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies(), + this.getWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) for (const workerNodeKey of this.workerNodes.keys()) { @@ -586,7 +718,7 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategyOptions ) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies(), + this.getWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) for (const workerNodeKey of this.workerNodes.keys()) { @@ -645,7 +777,7 @@ export abstract class AbstractPool< ...getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ), - ...tasksQueueOptions + ...tasksQueueOptions, } } @@ -690,8 +822,7 @@ export abstract class AbstractPool< /** * Whether the pool is full or not. - * - * The pool filling boolean status. + * @returns The pool fullness boolean status. */ protected get full (): boolean { return ( @@ -702,14 +833,12 @@ export abstract class AbstractPool< /** * Whether the pool is busy or not. - * - * The pool busyness boolean status. + * @returns The pool busyness boolean status. */ protected abstract get busy (): boolean /** * Whether worker nodes are executing concurrently their tasks quota or not. - * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { @@ -762,7 +891,11 @@ export abstract class AbstractPool< } else { reject( new Error( - `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'` + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + message.workerError?.message + }'` ) ) } @@ -810,7 +943,9 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${errorResponse?.workerId} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions errorResponse?.workerError?.message }'` ) @@ -862,11 +997,11 @@ export abstract class AbstractPool< const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), - taskFunction: fn.taskFunction.toString() + taskFunction: fn.taskFunction.toString(), }) this.taskFunctions.set(name, fn) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) @@ -886,14 +1021,14 @@ export abstract class AbstractPool< taskFunctionProperties: buildTaskFunctionProperties( name, this.taskFunctions.get(name) - ) + ), }) for (const workerNode of this.workerNodes) { workerNode.deleteTaskFunctionWorkerUsage(name) } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) @@ -915,47 +1050,77 @@ export abstract class AbstractPool< } /** - * Gets task function strategy, if any. - * + * Gets task function worker choice strategy, if any. * @param name - The task function name. * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. */ - private readonly getTaskFunctionWorkerWorkerChoiceStrategy = ( + private readonly getTaskFunctionWorkerChoiceStrategy = ( name?: string ): WorkerChoiceStrategy | undefined => { - if (name != null) { - return this.listTaskFunctionsProperties().find( - (taskFunctionProperties: TaskFunctionProperties) => - taskFunctionProperties.name === name - )?.strategy + name = name ?? DEFAULT_TASK_NAME + const taskFunctionsProperties = this.listTaskFunctionsProperties() + if (name === DEFAULT_TASK_NAME) { + name = taskFunctionsProperties[1]?.name } + return taskFunctionsProperties.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function worker choice strategy, if any. + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = ( + workerNodeKey: number, + name?: string + ): WorkerChoiceStrategy | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy } /** * Gets worker node task function priority, if any. - * * @param workerNodeKey - The worker node key. * @param name - The task function name. - * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. */ private readonly getWorkerNodeTaskFunctionPriority = ( workerNodeKey: number, name?: string ): number | undefined => { - if (name != null) { - return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find( - (taskFunctionProperties: TaskFunctionProperties) => - taskFunctionProperties.name === name - )?.priority + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority } /** * Gets the worker choice strategies registered in this pool. - * * @returns The worker choice strategies. */ - private readonly getWorkerWorkerChoiceStrategies = + private readonly getWorkerChoiceStrategies = (): Set => { return new Set([ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -967,7 +1132,7 @@ export abstract class AbstractPool< ) .filter( (strategy: WorkerChoiceStrategy | undefined) => strategy != null - ) as WorkerChoiceStrategy[]) + ) as WorkerChoiceStrategy[]), ]) } @@ -978,7 +1143,7 @@ export abstract class AbstractPool< taskFunctionProperties: buildTaskFunctionProperties( name, this.taskFunctions.get(name) - ) + ), }) } @@ -1023,18 +1188,18 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const taskFunctionStrategy = - this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy) + const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), - strategy: taskFunctionStrategy, + strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( + workerNodeKey, + name + ), transferList, timestamp, - taskId: randomUUID() + taskId: randomUUID(), } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.set(task.taskId!, { @@ -1044,9 +1209,9 @@ export abstract class AbstractPool< ...(this.emitter != null && { asyncResource: new AsyncResource('poolifier:task', { triggerAsyncId: this.emitter.asyncId, - requireManualDestroy: true - }) - }) + requireManualDestroy: true, + }), + }), }) if ( this.opts.enableTasksQueue === false || @@ -1060,10 +1225,36 @@ export abstract class AbstractPool< }) } + + /** @inheritDoc */ + public mapExecute ( + data: Iterable, + name?: string, + transferList?: readonly TransferListItem[] + ): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (data == null) { + throw new TypeError('data argument must be a defined iterable') + } + if (typeof data[Symbol.iterator] !== 'function') { + throw new TypeError('data argument must be an iterable') + } + if (!Array.isArray(data)) { + data = [...data] + } + return Promise.all( + (data as Data[]).map(data => this.execute(data, name, transferList)) + ) + } + /** * Starts the minimum number of workers. + * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false */ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { + if (this.minimumNumberOfWorkers === 0) { + return + } this.startingMinimumNumberOfWorkers = true while ( this.workerNodes.reduce( @@ -1110,7 +1301,7 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_workerNode, workerNodeKey) => { + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) @@ -1136,7 +1327,8 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${message.workerId}` + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Kill message handling failed on worker ${message.workerId?.toString()}` ) ) } @@ -1149,7 +1341,6 @@ export abstract class AbstractPool< /** * Terminates the worker node given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected async destroyWorkerNode (workerNodeKey: number): Promise { @@ -1172,8 +1363,6 @@ export abstract class AbstractPool< /** * Setup hook to execute code before worker nodes are created in the abstract constructor. * Can be overridden. - * - * @virtual */ protected setupHook (): void { /* Intentionally empty */ @@ -1181,7 +1370,6 @@ export abstract class AbstractPool< /** * Returns whether the worker is the main worker or not. - * * @returns `true` if the worker is the main worker, `false` otherwise. */ protected abstract isMain (): boolean @@ -1189,7 +1377,6 @@ export abstract class AbstractPool< /** * Hook executed before the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ @@ -1230,7 +1417,6 @@ export abstract class AbstractPool< /** * Hook executed after the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param message - The received message. */ @@ -1287,7 +1473,6 @@ export abstract class AbstractPool< /** * Whether the worker node shall update its task function worker usage or not. - * * @param workerNodeKey - The worker node key. * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. */ @@ -1301,14 +1486,11 @@ export abstract class AbstractPool< } /** - * Chooses a worker node for the next task given the worker choice strategy. - * - * @param workerChoiceStrategy - The worker choice strategy. - * @returns The chosen worker node key + * Chooses a worker node for the next task. + * @param name - The task function name. + * @returns The chosen worker node key. */ - private chooseWorkerNode ( - workerChoiceStrategy?: WorkerChoiceStrategy - ): number { + private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( @@ -1319,19 +1501,19 @@ export abstract class AbstractPool< } } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy) + return this.workerChoiceStrategiesContext!.execute( + this.getTaskFunctionWorkerChoiceStrategy(name) + ) } /** * Conditions for dynamic worker creation. - * * @returns Whether to create a dynamic worker or not. */ protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param message - The message. * @param transferList - The optional array of transferable objects. @@ -1344,7 +1526,6 @@ export abstract class AbstractPool< /** * Initializes the worker node usage with sensible default values gathered during runtime. - * * @param workerNode - The worker node. */ private initWorkerNodeUsage (workerNode: IWorkerNode): void { @@ -1354,7 +1535,8 @@ export abstract class AbstractPool< ) { workerNode.usage.runTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.aggregate ?? Infinity + workerNode => + workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1364,7 +1546,8 @@ export abstract class AbstractPool< ) { workerNode.usage.waitTime.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.aggregate ?? Infinity + workerNode => + workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1374,7 +1557,8 @@ export abstract class AbstractPool< ) { workerNode.usage.elu.active.aggregate = min( ...this.workerNodes.map( - workerNode => workerNode.usage.elu.active.aggregate ?? Infinity + workerNode => + workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY ) ) } @@ -1382,7 +1566,6 @@ export abstract class AbstractPool< /** * Creates a new, completely set up worker node. - * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { @@ -1446,7 +1629,6 @@ export abstract class AbstractPool< /** * Creates a new, completely set up dynamic worker node. - * * @returns New, completely set up dynamic worker node key. */ protected createAndSetupDynamicWorkerNode (): number { @@ -1456,6 +1638,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) + const workerInfo = this.getWorkerInfo(localWorkerNodeKey) const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( @@ -1464,6 +1647,8 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && + workerInfo != null && + !workerInfo.stealing && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { @@ -1475,7 +1660,7 @@ export abstract class AbstractPool< } }) this.sendToWorker(workerNodeKey, { - checkActive: true + checkActive: true, }) if (this.taskFunctions.size > 0) { for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) { @@ -1485,7 +1670,7 @@ export abstract class AbstractPool< taskFunctionName, taskFunctionObject ), - taskFunction: taskFunctionObject.taskFunction.toString() + taskFunction: taskFunctionObject.taskFunction.toString(), }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1508,7 +1693,6 @@ export abstract class AbstractPool< /** * Registers a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1521,7 +1705,6 @@ export abstract class AbstractPool< /** * Registers once a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1534,7 +1717,6 @@ export abstract class AbstractPool< /** * Deregisters a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1548,7 +1730,6 @@ export abstract class AbstractPool< /** * Method hooked up after a worker node has been newly created. * Can be overridden. - * * @param workerNodeKey - The newly created worker node key. */ protected afterWorkerNodeSetup (workerNodeKey: number): void { @@ -1579,14 +1760,12 @@ export abstract class AbstractPool< /** * Sends the startup message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** * Sends the statistics message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ private sendStatisticsMessageToWorker (workerNodeKey: number): void { @@ -1597,8 +1776,8 @@ export abstract class AbstractPool< .runTime.aggregate ?? false, elu: this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() - .elu.aggregate ?? false - } + .elu.aggregate ?? false, + }, }) } @@ -1614,14 +1793,15 @@ export abstract class AbstractPool< } } - private redistributeQueuedTasks (workerNodeKey: number): void { - if (workerNodeKey === -1 || this.cannotStealTask()) { + private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { + if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) { return } - while (this.tasksQueueSize(workerNodeKey) > 0) { + while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.info.ready && + return sourceWorkerNodeKey !== workerNodeKey && + workerNode.info.ready && workerNode.usage.tasks.queued < workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey @@ -1632,7 +1812,7 @@ export abstract class AbstractPool< this.handleTask( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.dequeueTask(workerNodeKey)! + this.dequeueTask(sourceWorkerNodeKey)! ) } } @@ -1650,28 +1830,21 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.stolen + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen } } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string, + previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } - } - - private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1679,33 +1852,36 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + (previousTaskName != null && + previousTaskName === taskName && + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } } } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } - } - - private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage( + taskName + )!.tasks.sequentiallyStolen = 0 } } @@ -1720,69 +1896,49 @@ export abstract class AbstractPool< ) } const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey.toString()}' not found in pool` + ) + } if ( this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (workerInfo != null && previousStolenTask != null) { + if (previousStolenTask != null) { workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( - workerInfo != null && previousStolenTask != null && - workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { workerInfo.stealing = false - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - taskFunctionProperties.name - ) - } - this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) - return - } - if (workerInfo == null) { - throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! ) + return } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) - if ( - this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - stolenTask != null - ) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const taskFunctionTasksWorkerUsage = this.workerNodes[ - workerNodeKey + if (stolenTask != null) { + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks - if ( - taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || - (previousStolenTask != null && - previousStolenTask.name === stolenTask.name && - taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) - ) { - this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } else { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } + stolenTask.name!, + previousStolenTask?.name + ) } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { @@ -1812,9 +1968,8 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) - this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task @@ -1826,17 +1981,18 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || + this.hasBackPressure() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { return } - const { workerId } = eventDetail const sizeOffset = 1 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } + const { workerId } = eventDetail const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes @@ -1858,12 +2014,12 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo == null) { throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` + `Worker node with key '${workerNodeKey.toString()}' not found in pool` ) } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueTask(1)! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1872,8 +2028,15 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. + * @param message - The message received from the worker. */ protected readonly workerMessageListener = ( message: MessageValue @@ -1890,6 +2053,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1907,13 +2071,15 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionsProperties } = message if (ready == null || !ready) { - throw new Error(`Worker ${workerId} failed to initialize`) + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Worker ${workerId?.toString()} failed to initialize`) } const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -1967,7 +2133,7 @@ export abstract class AbstractPool< ) { workerNode.emit('idle', { workerId, - workerNodeKey + workerNodeKey, }) } } @@ -1993,7 +2159,6 @@ export abstract class AbstractPool< /** * Gets the worker information given its worker node key. - * * @param workerNodeKey - The worker node key. * @returns The worker information. */ @@ -2001,9 +2166,14 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. - * * @returns The created worker node. */ private createWorkerNode (): IWorkerNode { @@ -2018,8 +2188,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: - (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority(), } ) // Flag the worker node as ready at pool startup. @@ -2031,7 +2201,6 @@ export abstract class AbstractPool< /** * Adds the given worker node in the pool worker nodes. - * * @param workerNode - The worker node. * @returns The added worker node key. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. @@ -2054,7 +2223,6 @@ export abstract class AbstractPool< /** * Removes the worker node from the pool worker nodes. - * * @param workerNode - The worker node. */ private removeWorkerNode (workerNode: IWorkerNode): void { @@ -2084,7 +2252,6 @@ export abstract class AbstractPool< /** * Executes the given task on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ @@ -2100,11 +2267,8 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask ( - workerNodeKey: number, - bucket?: number - ): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask(bucket) + private dequeueTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number {