X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=712de0b7da4c38c2808e1be61da7fced8e372da0;hb=884743b122a66be45c94d83d9230e28a9ab43836;hp=536f4a03d8a0b2e052bc515b972d1de8098a94ae;hpb=e44639e9af74427b71f1556ff7ec9f7606373e0d;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 536f4a03..712de0b7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,17 +1,21 @@ +import { AsyncResource } from 'node:async_hooks' import { randomUUID } from 'node:crypto' +import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' -import { EventEmitterAsyncResource } from 'node:events' -import { AsyncResource } from 'node:async_hooks' + +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + TaskFunctionProperties, } from '../utility-types.js' import { + average, + buildTaskFunctionProperties, DEFAULT_TASK_NAME, EMPTY_FUNCTION, - average, exponentialDelay, isKillBehavior, isPlainObject, @@ -19,10 +23,13 @@ import { median, min, round, - sleep + sleep, } from '../utils.js' +import type { + TaskFunction, + TaskFunctionObject, +} from '../worker/task-functions.js' import { KillBehaviors } from '../worker/worker-options.js' -import type { TaskFunction } from '../worker/task-functions.js' import { type IPool, PoolEvents, @@ -30,26 +37,18 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions + type TasksQueueOptions, } from './pool.js' -import type { - IWorker, - IWorkerNode, - WorkerInfo, - WorkerNodeEventDetail, - WorkerType -} from './worker.js' import { Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, - type WorkerChoiceStrategyOptions + type WorkerChoiceStrategyOptions, } from './selection-strategies/selection-strategies-types.js' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' -import { version } from './version.js' -import { WorkerNode } from './worker-node.js' +import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, getDefaultTasksQueueOptions, @@ -57,12 +56,20 @@ import { updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, updateWaitTimeWorkerUsage, - waitWorkerNodeEvents + waitWorkerNodeEvents, } from './utils.js' +import { version } from './version.js' +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerNodeEventDetail, + WorkerType, +} from './worker.js' +import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. - * * @typeParam Worker - Type of worker which manages this pool. * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @typeParam Response - Type of execution response. This can only be structured-cloneable data. @@ -73,7 +80,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: IWorkerNode[] = [] /** @inheritDoc */ public emitter?: EventEmitterAsyncResource @@ -81,28 +88,36 @@ export abstract class AbstractPool< /** * The task execution response promise map: * - `key`: The message id of each submitted task. - * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. + * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + > = new Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + >() /** - * Worker choice strategy context referencing a worker choice algorithm implementation. + * Worker choice strategies context referencing worker choice algorithms implementation. */ - protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response + protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext< + Worker, + Data, + Response > /** * The task functions added at runtime map: * - `key`: The task function name. - * - `value`: The task function itself. + * - `value`: The task function object. */ - private readonly taskFunctions: Map> + private readonly taskFunctions: Map< + string, + TaskFunctionObject + > /** * Whether the pool is started or not. @@ -116,6 +131,10 @@ export abstract class AbstractPool< * Whether the pool is destroying or not. */ private destroying: boolean + /** + * Whether the minimum number of workers is starting or not. + */ + private startingMinimumNumberOfWorkers: boolean /** * Whether the pool ready event has been emitted or not. */ @@ -123,11 +142,10 @@ export abstract class AbstractPool< /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. - * * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages. * @param filePath - Path to the worker file. * @param opts - Options for the pool. @@ -154,31 +172,31 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.initializeEventEmitter() + this.initEventEmitter() } - this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< - Worker, - Data, - Response + this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< + Worker, + Data, + Response >( this, - this.opts.workerChoiceStrategy, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + [this.opts.workerChoiceStrategy!], this.opts.workerChoiceStrategyOptions ) this.setupHook() - this.taskFunctions = new Map>() + this.taskFunctions = new Map>() this.started = false this.starting = false this.destroying = false this.readyEventEmitted = false + this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() } - - this.startTimestamp = performance.now() } private checkPoolType (): void { @@ -189,7 +207,9 @@ export abstract class AbstractPool< } } - private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void { + private checkMinimumNumberOfWorkers ( + minimumNumberOfWorkers: number | undefined + ): void { if (minimumNumberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -210,13 +230,11 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!) + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - opts.workerChoiceStrategyOptions! + opts.workerChoiceStrategyOptions ) if (opts.workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions @@ -225,11 +243,9 @@ export abstract class AbstractPool< this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - checkValidTasksQueueOptions(opts.tasksQueueOptions!) + checkValidTasksQueueOptions(opts.tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - opts.tasksQueueOptions! + opts.tasksQueueOptions ) } } else { @@ -238,7 +254,7 @@ export abstract class AbstractPool< } private checkValidWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { if ( workerChoiceStrategyOptions != null && @@ -269,9 +285,9 @@ export abstract class AbstractPool< } } - private initializeEventEmitter (): void { + private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ - name: `poolifier:${this.type}-${this.worker}-pool` + name: `poolifier:${this.type}-${this.worker}-pool`, }) } @@ -284,13 +300,16 @@ export abstract class AbstractPool< started: this.started, ready: this.ready, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - strategy: this.opts.workerChoiceStrategy!, + defaultStrategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { utilization: round(this.utilization) }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true && + this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .waitTime.aggregate && { + utilization: round(this.utilization), + }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => @@ -304,10 +323,10 @@ export abstract class AbstractPool< (accumulator, workerNode) => workerNode.info.stealing ? accumulator + 1 : accumulator, 0 - ) + ), }), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, _workerNode, workerNodeKey) => + (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), @@ -326,123 +345,256 @@ export abstract class AbstractPool< (accumulator, workerNode) => accumulator + workerNode.usage.tasks.queued, 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 - ) + ), }), ...(this.opts.enableTasksQueue === true && { - backPressure: this.hasBackPressure() + backPressure: this.hasBackPressure(), }), ...(this.opts.enableTasksQueue === true && { stolenTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.stolen, 0 - ) + ), }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && { + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true && { runTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => + workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => + workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.average && { average: round( average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) - ) + ), }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) - ) - }) - } + ), + }), + }, }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true && { waitTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => + workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => + workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) - ) + ), }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) - ) - }) - } - }) + ), + }), + }, + }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate === true && { + elu: { + idle: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ), + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ), + }), + }, + active: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ), + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ), + }), + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + }, + }, + }), } } /** - * The pool readiness boolean status. + * Whether the pool is ready or not. + * @returns The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -454,23 +606,33 @@ export abstract class AbstractPool< ) } + /** + * Whether the pool is empty or not. + * @returns The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. - * * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), + accumulator + (workerNode.usage.runTime.aggregate ?? 0), 0 ) const totalTasksWaitTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), + accumulator + (workerNode.usage.waitTime.aggregate ?? 0), 0 ) return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity @@ -490,7 +652,6 @@ export abstract class AbstractPool< /** * Checks if the worker id sent in the received message from a worker is valid. - * * @param message - The received message. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ @@ -499,14 +660,13 @@ export abstract class AbstractPool< throw new Error('Worker message received without worker id') } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( - `Worker message received from unknown worker '${message.workerId}'` + `Worker message received from unknown worker '${message.workerId.toString()}'` ) } } /** * Gets the worker node key given its worker id. - * * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ @@ -521,31 +681,52 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) - this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - this.opts.workerChoiceStrategy - ) if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + requireSync = !this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) } - for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { - workerNode.resetUsage() - this.sendStatisticsMessageToWorker(workerNodeKey) + if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { + this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( + this.opts.workerChoiceStrategy, + this.opts.workerChoiceStrategyOptions + ) + requireSync = true + } + if (requireSync) { + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } } } /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions - ): void { + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategyContext.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -559,12 +740,13 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.setTasksQueueOptions(tasksQueueOptions!) + this.setTasksQueueOptions(tasksQueueOptions) } /** @inheritDoc */ - public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void { + public setTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions | undefined + ): void { if (this.opts.enableTasksQueue === true) { checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = @@ -589,13 +771,13 @@ export abstract class AbstractPool< } private buildTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): TasksQueueOptions { return { ...getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ), - ...tasksQueueOptions + ...tasksQueueOptions, } } @@ -606,13 +788,13 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'idle', this.handleWorkerNodeIdleEvent @@ -621,7 +803,7 @@ export abstract class AbstractPool< } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -630,7 +812,7 @@ export abstract class AbstractPool< } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -640,8 +822,7 @@ export abstract class AbstractPool< /** * Whether the pool is full or not. - * - * The pool filling boolean status. + * @returns The pool fullness boolean status. */ protected get full (): boolean { return ( @@ -652,14 +833,12 @@ export abstract class AbstractPool< /** * Whether the pool is busy or not. - * - * The pool busyness boolean status. + * @returns The pool busyness boolean status. */ protected abstract get busy (): boolean /** * Whether worker nodes are executing concurrently their tasks quota or not. - * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { @@ -702,23 +881,20 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const workerId = this.getWorkerInfo(workerNodeKey).id! + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId ) { if (message.taskFunctionOperationStatus) { resolve(true) - } else if (!message.taskFunctionOperationStatus) { + } else { reject( new Error( - `Task function operation '${ - message.taskFunctionOperation as string - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - }' failed on worker ${message.workerId} with error: '${ - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - message.workerError!.message + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + message.workerError?.message }'` ) ) @@ -767,11 +943,10 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - }' failed on worker ${errorResponse! - .workerId!} with error: '${ - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - errorResponse!.workerError!.message + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + errorResponse?.workerError?.message }'` ) ) @@ -783,7 +958,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -795,21 +970,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - for (const workerNode of this.workerNodes) { - if ( - Array.isArray(workerNode.info.taskFunctionNames) && - workerNode.info.taskFunctionNames.includes(name) - ) { - return true - } - } - return false + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.name === name + ) } /** @inheritDoc */ public async addTaskFunction ( name: string, - fn: TaskFunction + fn: TaskFunction | TaskFunctionObject ): Promise { if (typeof name !== 'string') { throw new TypeError('name argument must be a string') @@ -817,15 +986,26 @@ export abstract class AbstractPool< if (typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } - if (typeof fn !== 'function') { - throw new TypeError('fn argument must be a function') + if (typeof fn === 'function') { + fn = { taskFunction: fn } satisfies TaskFunctionObject + } + if (typeof fn.taskFunction !== 'function') { + throw new TypeError('taskFunction property must be a function') } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: name, - taskFunction: fn.toString() + taskFunctionProperties: buildTaskFunctionProperties(name, fn), + taskFunction: fn.taskFunction.toString(), }) this.taskFunctions.set(name, fn) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies() + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -838,40 +1018,135 @@ export abstract class AbstractPool< } const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', - taskFunctionName: name + taskFunctionProperties: buildTaskFunctionProperties( + name, + this.taskFunctions.get(name) + ), }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies() + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } /** @inheritDoc */ - public listTaskFunctionNames (): string[] { + public listTaskFunctionsProperties (): TaskFunctionProperties[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctionNames) && - workerNode.info.taskFunctionNames.length > 0 + Array.isArray(workerNode.info.taskFunctionsProperties) && + workerNode.info.taskFunctionsProperties.length > 0 ) { - return workerNode.info.taskFunctionNames + return workerNode.info.taskFunctionsProperties } } return [] } + /** + * Gets task function worker choice strategy, if any. + * @param name - The task function name. + * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getTaskFunctionWorkerChoiceStrategy = ( + name?: string + ): WorkerChoiceStrategy | undefined => { + name = name ?? DEFAULT_TASK_NAME + const taskFunctionsProperties = this.listTaskFunctionsProperties() + if (name === DEFAULT_TASK_NAME) { + name = taskFunctionsProperties[1]?.name + } + return taskFunctionsProperties.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function worker choice strategy, if any. + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = ( + workerNodeKey: number, + name?: string + ): WorkerChoiceStrategy | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function priority, if any. + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority + } + + /** + * Gets the worker choice strategies registered in this pool. + * @returns The worker choice strategies. + */ + private readonly getWorkerChoiceStrategies = + (): Set => { + return new Set([ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.workerChoiceStrategy!, + ...(this.listTaskFunctionsProperties() + .map( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.strategy + ) + .filter( + (strategy: WorkerChoiceStrategy | undefined) => strategy != null + ) as WorkerChoiceStrategy[]), + ]) + } + /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'default', - taskFunctionName: name + taskFunctionProperties: buildTaskFunctionProperties( + name, + this.taskFunctions.get(name) + ), }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -885,7 +1160,7 @@ export abstract class AbstractPool< public async execute ( data?: Data, name?: string, - transferList?: TransferListItem[] + transferList?: readonly TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { if (!this.started) { @@ -913,14 +1188,18 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() + const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( + workerNodeKey, + name + ), transferList, timestamp, - taskId: randomUUID() + taskId: randomUUID(), } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.set(task.taskId!, { @@ -930,9 +1209,9 @@ export abstract class AbstractPool< ...(this.emitter != null && { asyncResource: new AsyncResource('poolifier:task', { triggerAsyncId: this.emitter.asyncId, - requireManualDestroy: true - }) - }) + requireManualDestroy: true, + }), + }), }) if ( this.opts.enableTasksQueue === false || @@ -946,6 +1225,51 @@ export abstract class AbstractPool< }) } + + /** @inheritDoc */ + public mapExecute ( + data: Iterable, + name?: string, + transferList?: readonly TransferListItem[] + ): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (data == null) { + throw new TypeError('data argument must be a defined iterable') + } + if (typeof data[Symbol.iterator] !== 'function') { + throw new TypeError('data argument must be an iterable') + } + if (!Array.isArray(data)) { + data = [...data] + } + return Promise.all( + (data as Data[]).map(data => this.execute(data, name, transferList)) + ) + } + + /** + * Starts the minimum number of workers. + * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false + */ + private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { + if (this.minimumNumberOfWorkers === 0) { + return + } + this.startingMinimumNumberOfWorkers = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.minimumNumberOfWorkers + ) { + const workerNodeKey = this.createAndSetupWorkerNode() + initWorkerNodeUsage && + this.initWorkerNodeUsage(this.workerNodes[workerNodeKey]) + } + this.startingMinimumNumberOfWorkers = false + } + /** @inheritdoc */ public start (): void { if (this.started) { @@ -958,15 +1282,8 @@ export abstract class AbstractPool< throw new Error('Cannot start a destroying pool') } this.starting = true - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.minimumNumberOfWorkers - ) { - this.createAndSetupWorkerNode() - } + this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } @@ -984,21 +1301,22 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_workerNode, workerNodeKey) => { + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() - this.emitter?.removeAllListeners() this.readyEventEmitted = false + delete this.startTimestamp this.destroying = false this.started = false } private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { - if (this.workerNodes?.[workerNodeKey] == null) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.workerNodes[workerNodeKey] == null) { resolve() return } @@ -1009,8 +1327,8 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - `Kill message handling failed on worker ${message.workerId!}` + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Kill message handling failed on worker ${message.workerId?.toString()}` ) ) } @@ -1023,7 +1341,6 @@ export abstract class AbstractPool< /** * Terminates the worker node given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected async destroyWorkerNode (workerNodeKey: number): Promise { @@ -1046,22 +1363,20 @@ export abstract class AbstractPool< /** * Setup hook to execute code before worker nodes are created in the abstract constructor. * Can be overridden. - * - * @virtual */ protected setupHook (): void { /* Intentionally empty */ } /** - * Should return whether the worker is the main worker or not. + * Returns whether the worker is the main worker or not. + * @returns `true` if the worker is the main worker, `false` otherwise. */ protected abstract isMain (): boolean /** * Hook executed before the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ @@ -1069,11 +1384,12 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing updateWaitTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, task ) @@ -1091,7 +1407,7 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing updateWaitTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, task ) @@ -1101,7 +1417,6 @@ export abstract class AbstractPool< /** * Hook executed after the worker task execution. * Can be overridden. - * * @param workerNodeKey - The worker node key. * @param message - The received message. */ @@ -1109,21 +1424,22 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - let needWorkerChoiceStrategyUpdate = false + let needWorkerChoiceStrategiesUpdate = false + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) updateRunTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, message ) updateEluWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1139,25 +1455,24 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) updateRunTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, message ) updateEluWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } - if (needWorkerChoiceStrategyUpdate) { - this.workerChoiceStrategyContext.update(workerNodeKey) + if (needWorkerChoiceStrategiesUpdate) { + this.workerChoiceStrategiesContext?.update(workerNodeKey) } } /** * Whether the worker node shall update its task function worker usage or not. - * * @param workerNodeKey - The worker node key. * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. */ @@ -1165,40 +1480,40 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctionNames) && - workerInfo.taskFunctionNames.length > 2 + Array.isArray(workerInfo.taskFunctionsProperties) && + workerInfo.taskFunctionsProperties.length > 2 ) } /** * Chooses a worker node for the next task. - * - * The default worker choice strategy uses a round robin algorithm to distribute the tasks. - * - * @returns The chosen worker node key + * @param name - The task function name. + * @returns The chosen worker node key. */ - private chooseWorkerNode (): number { + private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { return workerNodeKey } } - return this.workerChoiceStrategyContext.execute() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.workerChoiceStrategiesContext!.execute( + this.getTaskFunctionWorkerChoiceStrategy(name) + ) } /** * Conditions for dynamic worker creation. - * * @returns Whether to create a dynamic worker or not. */ protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param message - The message. * @param transferList - The optional array of transferable objects. @@ -1206,12 +1521,51 @@ export abstract class AbstractPool< protected abstract sendToWorker ( workerNodeKey: number, message: MessageValue, - transferList?: TransferListItem[] + transferList?: readonly TransferListItem[] ): void + /** + * Initializes the worker node usage with sensible default values gathered during runtime. + * @param workerNode - The worker node. + */ + private initWorkerNodeUsage (workerNode: IWorkerNode): void { + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true + ) { + workerNode.usage.runTime.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true + ) { + workerNode.usage.waitTime.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu + .aggregate === true + ) { + workerNode.usage.elu.active.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + } + /** * Creates a new, completely set up worker node. - * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { @@ -1228,7 +1582,7 @@ export abstract class AbstractPool< 'error', this.opts.errorHandler ?? EMPTY_FUNCTION ) - workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.registerOnceWorkerEventHandler('error', (error: Error) => { workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1238,8 +1592,8 @@ export abstract class AbstractPool< ) { if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() - } else { - this.createAndSetupWorkerNode() + } else if (!this.startingMinimumNumberOfWorkers) { + this.startMinimumNumberOfWorkers(true) } } if ( @@ -1249,7 +1603,8 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode?.terminate().catch(error => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1259,6 +1614,13 @@ export abstract class AbstractPool< ) workerNode.registerOnceWorkerEventHandler('exit', () => { this.removeWorkerNode(workerNode) + if ( + this.started && + !this.startingMinimumNumberOfWorkers && + !this.destroying + ) { + this.startMinimumNumberOfWorkers(true) + } }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1267,7 +1629,6 @@ export abstract class AbstractPool< /** * Creates a new, completely set up dynamic worker node. - * * @returns New, completely set up dynamic worker node key. */ protected createAndSetupDynamicWorkerNode (): number { @@ -1277,7 +1638,8 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerInfo = this.getWorkerInfo(localWorkerNodeKey) + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1285,26 +1647,31 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && + workerInfo != null && + !workerInfo.stealing && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) - this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } }) this.sendToWorker(workerNodeKey, { - checkActive: true + checkActive: true, }) if (this.taskFunctions.size > 0) { - for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) { this.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName, - taskFunction: taskFunction.toString() - }).catch(error => { + taskFunctionProperties: buildTaskFunctionProperties( + taskFunctionName, + taskFunctionObject + ), + taskFunction: taskFunctionObject.taskFunction.toString(), + }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1312,18 +1679,20 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] workerNode.info.dynamic = true if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady === + true || + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { workerNode.info.ready = true } + this.initWorkerNodeUsage(workerNode) this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } /** * Registers a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1336,7 +1705,6 @@ export abstract class AbstractPool< /** * Registers once a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1349,7 +1717,6 @@ export abstract class AbstractPool< /** * Deregisters a listener callback on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ @@ -1363,7 +1730,6 @@ export abstract class AbstractPool< /** * Method hooked up after a worker node has been newly created. * Can be overridden. - * * @param workerNodeKey - The newly created worker node key. */ protected afterWorkerNodeSetup (workerNodeKey: number): void { @@ -1394,25 +1760,24 @@ export abstract class AbstractPool< /** * Sends the startup message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** * Sends the statistics message to worker given its worker node key. - * * @param workerNodeKey - The worker node key. */ private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - } + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate ?? false, + elu: + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate ?? false, + }, }) } @@ -1428,14 +1793,15 @@ export abstract class AbstractPool< } } - private redistributeQueuedTasks (workerNodeKey: number): void { - if (workerNodeKey === -1 || this.cannotStealTask()) { + private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { + if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) { return } - while (this.tasksQueueSize(workerNodeKey) > 0) { + while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.info.ready && + return sourceWorkerNodeKey !== workerNodeKey && + workerNode.info.ready && workerNode.usage.tasks.queued < workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey @@ -1446,7 +1812,7 @@ export abstract class AbstractPool< this.handleTask( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.dequeueTask(workerNodeKey)! + this.dequeueTask(sourceWorkerNodeKey)! ) } } @@ -1456,6 +1822,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1463,27 +1830,21 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.stolen + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen } } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string, + previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } - } - - private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1491,32 +1852,36 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + (previousTaskName != null && + previousTaskName === taskName && + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } } } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } - } - - private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage( + taskName + )!.tasks.sequentiallyStolen = 0 } } @@ -1527,74 +1892,62 @@ export abstract class AbstractPool< const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey property must be defined' + "WorkerNode event detail 'workerNodeKey' property must be defined" + ) + } + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey.toString()}' not found in pool` ) } if ( this.cannotStealTask() || - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) ) { if (previousStolenTask != null) { - this.getWorkerInfo(workerNodeKey).stealing = false + workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( previousStolenTask != null && - workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.getWorkerInfo(workerNodeKey).stealing = false - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - for (const taskName of this.workerNodes[workerNodeKey].info - .taskFunctionNames!) { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - taskName - ) - } - this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) + workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) return } - this.getWorkerInfo(workerNodeKey).stealing = true + workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) - if ( - this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - stolenTask != null - ) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const taskFunctionTasksWorkerUsage = this.workerNodes[ - workerNodeKey + if (stolenTask != null) { + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks - if ( - taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || - (previousStolenTask != null && - previousStolenTask.name === stolenTask.name && - taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) - ) { - this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } else { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } + stolenTask.name!, + previousStolenTask?.name + ) } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(EMPTY_FUNCTION) + .catch((error: unknown) => { + this.emitter?.emit(PoolEvents.error, error) + }) } private readonly workerNodeStealTask = ( @@ -1615,9 +1968,8 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) - this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task @@ -1629,17 +1981,18 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + this.hasBackPressure() || + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) ) { return } - const { workerId } = eventDetail const sizeOffset = 1 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } + const { workerId } = eventDetail const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes @@ -1658,53 +2011,76 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.size! - sizeOffset ) { - this.getWorkerInfo(workerNodeKey).stealing = true + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey.toString()}' not found in pool` + ) + } + workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) - this.getWorkerInfo(workerNodeKey).stealing = false + workerInfo.stealing = false } } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. + * @param message - The message received from the worker. */ protected readonly workerMessageListener = ( message: MessageValue ): void => { this.checkMessageWorkerId(message) - const { workerId, ready, taskId, taskFunctionNames } = message - if (ready != null && taskFunctionNames != null) { + const { workerId, ready, taskId, taskFunctionsProperties } = message + if (ready != null && taskFunctionsProperties != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) + } else if (taskFunctionsProperties != null) { + // Task function properties message received from worker + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) + } } else if (taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (taskFunctionNames != null) { - // Task function names message received from worker - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ).taskFunctionNames = taskFunctionNames + } + } + + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } private handleWorkerReadyResponse (message: MessageValue): void { - const { workerId, ready, taskFunctionNames } = message + const { workerId, ready, taskFunctionsProperties } = message if (ready == null || !ready) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - throw new Error(`Worker ${workerId!} failed to initialize`) + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Worker ${workerId?.toString()} failed to initialize`) } - const workerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready - workerNode.info.taskFunctionNames = taskFunctionNames - if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true - this.emitter?.emit(PoolEvents.ready, this.info) - } + workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { @@ -1732,8 +2108,14 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true && !this.destroying) { + if ( + this.opts.enableTasksQueue === true && + !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode != null + ) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && @@ -1750,9 +2132,8 @@ export abstract class AbstractPool< workerNodeTasksUsage.sequentiallyStolen === 0 ) { workerNode.emit('idle', { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerId: workerId!, - workerNodeKey + workerId, + workerNodeKey, }) } } @@ -1778,17 +2159,21 @@ export abstract class AbstractPool< /** * Gets the worker information given its worker node key. - * * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. - * * @returns The created worker node. */ private createWorkerNode (): IWorkerNode { @@ -1802,7 +2187,9 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions?.size ?? getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers - ).size + ).size, + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority(), } ) // Flag the worker node as ready at pool startup. @@ -1814,7 +2201,6 @@ export abstract class AbstractPool< /** * Adds the given worker node in the pool worker nodes. - * * @param workerNode - The worker node. * @returns The added worker node key. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. @@ -1828,29 +2214,31 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. - * * @param workerNode - The worker node. */ private removeWorkerNode (workerNode: IWorkerNode): void { const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + this.workerChoiceStrategiesContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false - } - - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean { @@ -1864,7 +2252,6 @@ export abstract class AbstractPool< /** * Executes the given task on the worker given its worker node key. - * * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ @@ -1900,7 +2287,7 @@ export abstract class AbstractPool< } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } }