X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=11cc9cc15ef419346827e5f627150f04e73ba158;hb=55d7d6002049be09a06b08da26febe2e8bfa494b;hp=d55c3101bcaead44cab1b424a170383165dc52be;hpb=0dc838e376fd1fea7146350e99e487159e4ba40a;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d55c3101..11cc9cc1 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,8 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' -import { existsSync } from 'node:fs' +import type { TransferListItem } from 'node:worker_threads' +import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -10,16 +12,20 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + average, + exponentialDelay, isKillBehavior, isPlainObject, + max, median, + min, round, - updateMeasurementStatistics + sleep } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, - PoolEmitter, PoolEvents, type PoolInfo, type PoolOptions, @@ -30,12 +36,13 @@ import { import type { IWorker, IWorkerNode, + TaskStatistics, WorkerInfo, + WorkerNodeEventDetail, WorkerType, WorkerUsage } from './worker' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -44,6 +51,17 @@ import { import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' import { version } from './version' import { WorkerNode } from './worker-node' +import { + checkFilePath, + checkValidTasksQueueOptions, + checkValidWorkerChoiceStrategy, + getDefaultTasksQueueOptions, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, + waitWorkerNodeEvents +} from './utils' /** * Base class that implements some shared logic for all poolifier pools. @@ -61,20 +79,22 @@ export abstract class AbstractPool< public readonly workerNodes: Array> = [] /** @inheritDoc */ - public readonly emitter?: PoolEmitter + public emitter?: EventEmitterAsyncResource /** - * The execution response promise map. - * + * Dynamic pool maximum size property placeholder. + */ + protected readonly max?: number + + /** + * The task execution response promise map: * - `key`: The message id of each submitted task. * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map< - string, - PromiseResponseWrapper - > = new Map>() + protected promiseResponseMap: Map> = + new Map>() /** * Worker choice strategy context referencing a worker choice algorithm implementation. @@ -85,10 +105,29 @@ export abstract class AbstractPool< Response > + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + + /** + * Whether the pool is started or not. + */ + private started: boolean /** * Whether the pool is starting or not. */ - private readonly starting: boolean + private starting: boolean + /** + * Whether the pool is destroying or not. + */ + private destroying: boolean + /** + * Whether the pool ready event has been emitted or not. + */ + private readyEventEmitted: boolean /** * The start timestamp of the pool. */ @@ -107,19 +146,20 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } + checkFilePath(this.filePath) this.checkNumberOfWorkers(this.numberOfWorkers) - this.checkFilePath(this.filePath) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) - this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { - this.emitter = new PoolEmitter() + this.initializeEventEmitter() } this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< Worker, @@ -133,26 +173,19 @@ export abstract class AbstractPool< this.setupHook() - this.starting = true - this.startPool() + this.taskFunctions = new Map>() + + this.started = false this.starting = false + this.destroying = false + this.readyEventEmitted = false + if (this.opts.startWorkers === true) { + this.start() + } this.startTimestamp = performance.now() } - private checkFilePath (filePath: string): void { - if ( - filePath == null || - typeof filePath !== 'string' || - (typeof filePath === 'string' && filePath.trim().length === 0) - ) { - throw new Error('Please specify a file with a worker implementation') - } - if (!existsSync(filePath)) { - throw new Error(`Cannot find the worker file '${filePath}'`) - } - } - private checkNumberOfWorkers (numberOfWorkers: number): void { if (numberOfWorkers == null) { throw new Error( @@ -171,42 +204,26 @@ export abstract class AbstractPool< } } - protected checkDynamicPoolSize (min: number, max: number): void { - if (this.type === PoolTypes.dynamic) { - if (min > max) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' - ) - } else if (max === 0) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' - ) - } else if (min === max) { - throw new RangeError( - 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' - ) - } - } - } - private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { + this.opts.startWorkers = opts.startWorkers ?? true + checkValidWorkerChoiceStrategy( + opts.workerChoiceStrategy as WorkerChoiceStrategy + ) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS this.checkValidWorkerChoiceStrategyOptions( - this.opts.workerChoiceStrategyOptions + opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions ) + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - this.checkValidTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions - ) + checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( opts.tasksQueueOptions as TasksQueueOptions ) @@ -216,78 +233,57 @@ export abstract class AbstractPool< } } - private checkValidWorkerChoiceStrategy ( - workerChoiceStrategy: WorkerChoiceStrategy - ): void { - if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) { - throw new Error( - `Invalid worker choice strategy '${workerChoiceStrategy}'` - ) - } - } - private checkValidWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { - if (!isPlainObject(workerChoiceStrategyOptions)) { + if ( + workerChoiceStrategyOptions != null && + !isPlainObject(workerChoiceStrategyOptions) + ) { throw new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) } if ( - workerChoiceStrategyOptions.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + workerChoiceStrategyOptions?.retries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.retries) ) { - throw new Error( - 'Invalid worker choice strategy options: must have a weight for each worker node' + throw new TypeError( + 'Invalid worker choice strategy options: retries must be an integer' ) } if ( - workerChoiceStrategyOptions.measurement != null && - !Object.values(Measurements).includes( - workerChoiceStrategyOptions.measurement - ) + workerChoiceStrategyOptions?.retries != null && + workerChoiceStrategyOptions.retries < 0 ) { - throw new Error( - `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'` + throw new RangeError( + `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` ) } - } - - private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions - ): void { - if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { - throw new TypeError('Invalid tasks queue options: must be a plain object') - } if ( - tasksQueueOptions?.concurrency != null && - !Number.isSafeInteger(tasksQueueOptions.concurrency) + workerChoiceStrategyOptions?.weights != null && + Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { - throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + throw new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' ) } if ( - tasksQueueOptions?.concurrency != null && - tasksQueueOptions.concurrency <= 0 + workerChoiceStrategyOptions?.measurement != null && + !Object.values(Measurements).includes( + workerChoiceStrategyOptions.measurement + ) ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` + `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'` ) } } - private startPool (): void { - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.numberOfWorkers - ) { - this.createAndSetupWorker() - } + private initializeEventEmitter (): void { + this.emitter = new EventEmitterAsyncResource({ + name: `poolifier:${this.type}-${this.worker}-pool` + }) } /** @inheritDoc */ @@ -296,6 +292,7 @@ export abstract class AbstractPool< version, type: this.type, worker: this.worker, + started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, @@ -313,8 +310,8 @@ export abstract class AbstractPool< 0 ), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _workerNode, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -327,16 +324,30 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + backPressure: this.hasBackPressure() + }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -346,37 +357,39 @@ export abstract class AbstractPool< .runTime.aggregate && { runTime: { minimum: round( - Math.min( + min( ...this.workerNodes.map( workerNode => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( - Math.max( + max( ...this.workerNodes.map( workerNode => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( - this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] ) ) ) @@ -387,37 +400,39 @@ export abstract class AbstractPool< .waitTime.aggregate && { waitTime: { minimum: round( - Math.min( + min( ...this.workerNodes.map( workerNode => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( - Math.max( + max( ...this.workerNodes.map( workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( - this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] ) ) ) @@ -427,6 +442,9 @@ export abstract class AbstractPool< } } + /** + * The pool readiness boolean status. + */ private get ready (): boolean { return ( this.workerNodes.reduce( @@ -440,7 +458,7 @@ export abstract class AbstractPool< } /** - * Gets the approximate pool utilization. + * The approximate pool utilization. * * @returns The pool utilization. */ @@ -461,36 +479,29 @@ export abstract class AbstractPool< } /** - * Pool type. + * The pool type. * * If it is `'dynamic'`, it provides the `max` property. */ protected abstract get type (): PoolType /** - * Gets the worker type. + * The worker type. */ protected abstract get worker (): WorkerType /** - * Pool minimum size. - */ - protected abstract get minSize (): number - - /** - * Pool maximum size. + * The pool minimum size. */ - protected abstract get maxSize (): number + protected get minSize (): number { + return this.numberOfWorkers + } /** - * Get the worker given its id. - * - * @param workerId - The worker id. - * @returns The worker if found in the pool worker nodes, `undefined` otherwise. + * The pool maximum size. */ - private getWorkerById (workerId: number): Worker | undefined { - return this.workerNodes.find(workerNode => workerNode.info.id === workerId) - ?.worker + protected get maxSize (): number { + return this.max ?? this.numberOfWorkers } /** @@ -499,11 +510,10 @@ export abstract class AbstractPool< * @param message - The received message. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ - private checkMessageWorkerId (message: MessageValue): void { - if ( - message.workerId != null && - this.getWorkerById(message.workerId) == null - ) { + private checkMessageWorkerId (message: MessageValue): void { + if (message.workerId == null) { + throw new Error('Worker message received without worker id') + } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` ) @@ -511,14 +521,14 @@ export abstract class AbstractPool< } /** - * Gets the given worker its worker node key. + * Gets the worker node key given its worker id. * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. + * @param workerId - The worker id. + * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - protected getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + workerNode => workerNode.info.id === workerId ) } @@ -527,7 +537,7 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { - this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) + checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy this.workerChoiceStrategyContext.setWorkerChoiceStrategy( this.opts.workerChoiceStrategy @@ -535,9 +545,9 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.setWorkerStatistics(workerNode.worker) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -546,7 +556,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -558,6 +571,8 @@ export abstract class AbstractPool< tasksQueueOptions?: TasksQueueOptions ): void { if (this.opts.enableTasksQueue === true && !enable) { + this.unsetTaskStealing() + this.unsetTasksStealingOnBackPressure() this.flushTasksQueues() } this.opts.enableTasksQueue = enable @@ -567,9 +582,22 @@ export abstract class AbstractPool< /** @inheritDoc */ public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void { if (this.opts.enableTasksQueue === true) { - this.checkValidTasksQueueOptions(tasksQueueOptions) + checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() + this.setTaskStealing() + } else { + this.unsetTaskStealing() + } + if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() + this.setTasksStealingOnBackPressure() + } else { + this.unsetTasksStealingOnBackPressure() + } } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } @@ -579,7 +607,50 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...getDefaultTasksQueueOptions(this.maxSize), + ...tasksQueueOptions + } + } + + private setTasksQueueSize (size: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = size + } + } + + private setTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].on( + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent + ) + } + } + + private unsetTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].off( + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent + ) + } + } + + private setTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].on( + 'backPressure', + this.handleBackPressureEvent + ) + } + } + + private unsetTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].off( + 'backPressure', + this.handleBackPressureEvent + ) } } @@ -600,222 +671,522 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 ) } - /** @inheritDoc */ - public async execute (data?: Data, name?: string): Promise { - const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() - const submittedTask: Task = { - name: name ?? DEFAULT_TASK_NAME, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - data: data ?? ({} as Data), - timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() - } - const res = new Promise((resolve, reject) => { - this.promiseResponseMap.set(submittedTask.id as string, { - resolve, - reject, - worker: this.workerNodes[workerNodeKey].worker - }) - }) - if ( - this.opts.enableTasksQueue === true && - (this.busy || + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) - ) { - this.enqueueTask(workerNodeKey, submittedTask) - } else { - this.executeTask(workerNodeKey, submittedTask) + (this.opts.tasksQueueOptions?.concurrency as number) + ) } - this.checkAndEmitEvents() - // eslint-disable-next-line @typescript-eslint/return-await - return res + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 } - /** @inheritDoc */ - public async destroy (): Promise { - await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) - await this.destroyWorker(workerNode.worker) - await workerExitPromise - }) - ) + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + return await new Promise((resolve, reject) => { + const taskFunctionOperationListener = ( + message: MessageValue + ): void => { + this.checkMessageWorkerId(message) + const workerId = this.getWorkerInfo(workerNodeKey).id as number + if ( + message.taskFunctionOperationStatus != null && + message.workerId === workerId + ) { + if (message.taskFunctionOperationStatus) { + resolve(true) + } else if (!message.taskFunctionOperationStatus) { + reject( + new Error( + `Task function operation '${ + message.taskFunctionOperation as string + }' failed on worker ${message.workerId} with error: '${ + message.workerError?.message as string + }'` + ) + ) + } + this.deregisterWorkerMessageListener( + this.getWorkerNodeKeyByWorkerId(message.workerId), + taskFunctionOperationListener + ) + } + } + this.registerWorkerMessageListener( + workerNodeKey, + taskFunctionOperationListener + ) + this.sendToWorker(workerNodeKey, message) + }) } - /** - * Terminates the given worker. - * - * @param worker - A worker within `workerNodes`. - */ - protected abstract destroyWorker (worker: Worker): void | Promise + private async sendTaskFunctionOperationToWorkers ( + message: MessageValue + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + const taskFunctionOperationsListener = ( + message: MessageValue + ): void => { + this.checkMessageWorkerId(message) + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if (responsesReceived.length === this.workerNodes.length) { + if ( + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + const errorResponse = responsesReceived.find( + response => response.taskFunctionOperationStatus === false + ) + reject( + new Error( + `Task function operation '${ + message.taskFunctionOperation as string + }' failed on worker ${ + errorResponse?.workerId as number + } with error: '${ + errorResponse?.workerError?.message as string + }'` + ) + ) + } + this.deregisterWorkerMessageListener( + this.getWorkerNodeKeyByWorkerId(message.workerId), + taskFunctionOperationsListener + ) + } + } + } + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener( + workerNodeKey, + taskFunctionOperationsListener + ) + this.sendToWorker(workerNodeKey, message) + } + }) + } - /** - * Setup hook to execute code before worker nodes are created in the abstract constructor. - * Can be overridden. - * - * @virtual - */ - protected setupHook (): void { - // Intentionally empty + /** @inheritDoc */ + public hasTaskFunction (name: string): boolean { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false } - /** - * Should return whether the worker is the main worker or not. - */ - protected abstract isMain (): boolean + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + fn: TaskFunction + ): Promise { + if (typeof name !== 'string') { + throw new TypeError('name argument must be a string') + } + if (typeof name === 'string' && name.trim().length === 0) { + throw new TypeError('name argument must not be an empty string') + } + if (typeof fn !== 'function') { + throw new TypeError('fn argument must be a function') + } + const opResult = await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: fn.toString() + }) + this.taskFunctions.set(name, fn) + return opResult + } - /** - * Hook executed before the worker task execution. - * Can be overridden. - * - * @param workerNodeKey - The worker node key. - * @param task - The task to execute. - */ - protected beforeTaskExecutionHook ( - workerNodeKey: number, - task: Task - ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - task.name as string - ) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + /** @inheritDoc */ + public async removeTaskFunction (name: string): Promise { + if (!this.taskFunctions.has(name)) { + throw new Error( + 'Cannot remove a task function not handled on the pool side' + ) + } + const opResult = await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + this.deleteTaskFunctionWorkerUsages(name) + this.taskFunctions.delete(name) + return opResult } - /** - * Hook executed after the worker task execution. - * Can be overridden. - * - * @param worker - The worker. - * @param message - The received message. - */ - protected afterTaskExecutionHook ( - worker: Worker, - message: MessageValue - ): void { - const workerNodeKey = this.getWorkerNodeKey(worker) - const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) - } - - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing - if (message.taskError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed + /** @inheritDoc */ + public listTaskFunctionNames (): string[] { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 + ) { + return workerNode.info.taskFunctionNames + } } + return [] } - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0, - workerUsage.tasks.executed - ) + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) } - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime, - workerUsage.tasks.executed - ) + private deleteTaskFunctionWorkerUsages (name: string): void { + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } } - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0, - workerUsage.tasks.executed + private shallExecuteTask (workerNodeKey: number): boolean { + return ( + this.tasksQueueSize(workerNodeKey) === 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0, - workerUsage.tasks.executed + } + + /** @inheritDoc */ + public async execute ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ): Promise { + return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on not started pool')) + return + } + if (this.destroying) { + reject(new Error('Cannot execute a task on destroying pool')) + return + } + if (name != null && typeof name !== 'string') { + reject(new TypeError('name argument must be a string')) + return + } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + return + } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + return + } + const timestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() + const task: Task = { + name: name ?? DEFAULT_TASK_NAME, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + data: data ?? ({} as Data), + transferList, + timestamp, + taskId: randomUUID() + } + this.promiseResponseMap.set(task.taskId as string, { + resolve, + reject, + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) + }) + if ( + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.shallExecuteTask(workerNodeKey)) + ) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + }) + } + + /** @inheritdoc */ + public start (): void { + if (this.started) { + throw new Error('Cannot start an already started pool') + } + if (this.starting) { + throw new Error('Cannot start an already starting pool') + } + if (this.destroying) { + throw new Error('Cannot start a destroying pool') + } + this.starting = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.numberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.starting = false + this.started = true + } + + /** @inheritDoc */ + public async destroy (): Promise { + if (!this.started) { + throw new Error('Cannot destroy an already destroyed pool') + } + if (this.starting) { + throw new Error('Cannot destroy an starting pool') + } + if (this.destroying) { + throw new Error('Cannot destroy an already destroying pool') + } + this.destroying = true + await Promise.all( + this.workerNodes.map(async (_workerNode, workerNodeKey) => { + await this.destroyWorkerNode(workerNodeKey) + }) ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization + this.emitter?.emit(PoolEvents.destroy, this.info) + this.emitter?.emitDestroy() + this.emitter?.removeAllListeners() + this.readyEventEmitted = false + this.destroying = false + this.started = false + } + + protected async sendKillMessageToWorker ( + workerNodeKey: number + ): Promise { + await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } + const killMessageListener = (message: MessageValue): void => { + this.checkMessageWorkerId(message) + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject( + new Error( + `Kill message handling failed on worker ${ + message.workerId as number + }` + ) + ) } } + // FIXME: should be registered only once + this.registerWorkerMessageListener(workerNodeKey, killMessageListener) + this.sendToWorker(workerNodeKey, { kill: true }) + }) + } + + /** + * Terminates the worker node given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) + const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout + ) + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } + + /** + * Setup hook to execute code before worker nodes are created in the abstract constructor. + * Can be overridden. + * + * @virtual + */ + protected setupHook (): void { + /* Intentionally empty */ + } + + /** + * Should return whether the worker is the main worker or not. + */ + protected abstract isMain (): boolean + + /** + * Hook executed before the worker task execution. + * Can be overridden. + * + * @param workerNodeKey - The worker node key. + * @param task - The task to execute. + */ + protected beforeTaskExecutionHook ( + workerNodeKey: number, + task: Task + ): void { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + ++workerUsage.tasks.executing + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + task + ) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { + const taskFunctionWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + task + ) } } + /** + * Hook executed after the worker task execution. + * Can be overridden. + * + * @param workerNodeKey - The worker node key. + * @param message - The received message. + */ + protected afterTaskExecutionHook ( + workerNodeKey: number, + message: MessageValue + ): void { + let needWorkerChoiceStrategyUpdate = false + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) != null + ) { + const taskFunctionWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) as WorkerUsage + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if (needWorkerChoiceStrategyUpdate) { + this.workerChoiceStrategyContext.update(workerNodeKey) + } + } + + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) + return ( + workerInfo != null && + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 + ) + } + /** * Chooses a worker node for the next task. * * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * - * @returns The worker node key + * @returns The chosen worker node key */ private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - const worker = this.createAndSetupDynamicWorker() + const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { - return this.getWorkerNodeKey(worker) + return workerNodeKey } } return this.workerChoiceStrategyContext.execute() @@ -831,288 +1202,660 @@ export abstract class AbstractPool< } /** - * Sends a message to the given worker. + * Sends a message to worker given its worker node key. * - * @param worker - The worker which should receive the message. + * @param workerNodeKey - The worker node key. * @param message - The message. + * @param transferList - The optional array of transferable objects. */ protected abstract sendToWorker ( - worker: Worker, - message: MessageValue + workerNodeKey: number, + message: MessageValue, + transferList?: TransferListItem[] ): void /** - * Creates a new worker. + * Creates a new, completely set up worker node. * - * @returns Newly created worker. + * @returns New, completely set up worker node key. */ - protected abstract createWorker (): Worker - - /** - * Creates a new worker and sets it up completely in the pool worker nodes. - * - * @returns New, completely set up worker. - */ - protected createAndSetupWorker (): Worker { - const worker = this.createWorker() - - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKey(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) - workerInfo.ready = false - this.workerNodes[workerNodeKey].closeChannel() + protected createAndSetupWorkerNode (): number { + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { - if (workerInfo.dynamic) { - this.createAndSetupDynamicWorker() + if ( + this.started && + !this.starting && + !this.destroying && + this.opts.restartWorkerOnError === true + ) { + if (workerNode.info.dynamic) { + this.createAndSetupDynamicWorkerNode() } else { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + if (this.started && this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + workerNode?.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode) }) - - this.addWorkerNode(worker) - - this.afterWorkerSetup(worker) - - return worker + const workerNodeKey = this.addWorkerNode(workerNode) + this.afterWorkerNodeSetup(workerNodeKey) + return workerNodeKey } /** - * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up dynamic worker node. * - * @returns New, completely set up dynamic worker. + * @returns New, completely set up dynamic worker node key. */ - protected createAndSetupDynamicWorker (): Worker { - const worker = this.createAndSetupWorker() - this.registerWorkerMessageListener(worker, message => { - const workerNodeKey = this.getWorkerNodeKey(worker) + protected createAndSetupDynamicWorkerNode (): number { + const workerNodeKey = this.createAndSetupWorkerNode() + this.registerWorkerMessageListener(workerNodeKey, message => { + this.checkMessageWorkerId(message) + const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + message.workerId + ) + const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || + workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0))) + workerUsage.tasks.executing === 0 && + this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorker(worker) as Promise) + // Flag the worker node as not ready immediately + this.flagWorkerNodeAsNotReady(localWorkerNodeKey) + this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) - const workerInfo = this.getWorkerInfoByWorker(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) + this.sendToWorker(workerNodeKey, { + checkActive: true + }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + ) { workerInfo.ready = true } - this.sendToWorker(worker, { - checkActive: true, - workerId: workerInfo.id as number - }) - return worker + this.checkAndEmitDynamicWorkerCreationEvents() + return workerNodeKey } /** - * Registers a listener callback on the given worker. + * Registers a listener callback on the worker given its worker node key. * - * @param worker - The worker which should register a listener. + * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ protected abstract registerWorkerMessageListener< Message extends Data | Response - >(worker: Worker, listener: (message: MessageValue) => void): void + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void + + /** + * Registers once a listener callback on the worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + * @param listener - The message listener callback. + */ + protected abstract registerOnceWorkerMessageListener< + Message extends Data | Response + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void + + /** + * Deregisters a listener callback on the worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + * @param listener - The message listener callback. + */ + protected abstract deregisterWorkerMessageListener< + Message extends Data | Response + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Method hooked up after a worker node has been newly created. * Can be overridden. * - * @param worker - The newly created worker. + * @param workerNodeKey - The newly created worker node key. */ - protected afterWorkerSetup (worker: Worker): void { + protected afterWorkerNodeSetup (workerNodeKey: number): void { // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) + this.registerWorkerMessageListener( + workerNodeKey, + this.workerMessageListener + ) // Send the startup message to worker. - this.sendStartupMessageToWorker(worker) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) + this.sendStartupMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + if (this.opts.tasksQueueOptions?.taskStealing === true) { + this.workerNodes[workerNodeKey].on( + 'idleWorkerNode', + this.handleIdleWorkerNodeEvent + ) + } + if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { + this.workerNodes[workerNodeKey].on( + 'backPressure', + this.handleBackPressureEvent + ) + } + } } /** - * Sends the startup message to the given worker. + * Sends the startup message to worker given its worker node key. * - * @param worker - The worker which should receive the startup message. + * @param workerNodeKey - The worker node key. */ - protected abstract sendStartupMessageToWorker (worker: Worker): void + protected abstract sendStartupMessageToWorker (workerNodeKey: number): void + + /** + * Sends the statistics message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + private sendStatisticsMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { + statistics: { + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate + } + }) + } + + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) - if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break - } - if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued < minQueuedTasks - ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId - } - } - this.enqueueTask( - targetWorkerNodeKey, + const destinationWorkerNodeKey = this.workerNodes.reduce( + (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { + return workerNode.info.ready && + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.queued + ? workerNodeKey + : minWorkerNodeKey + }, + 0 + ) + this.handleTask( + destinationWorkerNodeKey, this.dequeueTask(workerNodeKey) as Task ) } } - /** - * This function is the listener registered for each worker message. - * - * @returns The listener function to execute when a message is received from a worker. - */ - protected workerListener (): (message: MessageValue) => void { - return message => { - this.checkMessageWorkerId(message) - if (message.ready != null) { - // Worker ready response received - this.handleWorkerReadyResponse(message) - } else if (message.id != null) { - // Task execution response received - this.handleTaskExecutionResponse(message) - } + private updateTaskStolenStatisticsWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + taskName + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen } } - private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfoByWorker( - this.getWorkerById(message.workerId) as Worker - ).ready = message.ready as boolean - if (this.emitter != null && this.ready) { - this.emitter.emit(PoolEvents.ready, this.info) + private updateTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.sequentiallyStolen } } - private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) - if (promiseResponse != null) { - if (message.taskError != null) { - this.emitter?.emit(PoolEvents.taskError, message.taskError) - promiseResponse.reject(message.taskError.message) + private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + taskName + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } + } + + private resetTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode?.usage != null) { + workerNode.usage.tasks.sequentiallyStolen = 0 + } + } + + private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + taskName + ) as WorkerUsage + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } + } + + private readonly handleIdleWorkerNodeEvent = ( + eventDetail: WorkerNodeEventDetail, + previousStolenTask?: Task + ): void => { + if (this.workerNodes.length <= 1) { + return + } + const { workerNodeKey } = eventDetail + if (workerNodeKey == null) { + throw new Error( + 'WorkerNode event detail workerNodeKey attribute must be defined' + ) + } + const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + if ( + previousStolenTask != null && + workerNodeTasksUsage.sequentiallyStolen > 0 && + (workerNodeTasksUsage.executing > 0 || + this.tasksQueueSize(workerNodeKey) > 0) + ) { + for (const taskName of this.workerNodes[workerNodeKey].info + .taskFunctionNames as string[]) { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + taskName + ) + } + this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) + return + } + const stolenTask = this.workerNodeStealTask(workerNodeKey) + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + stolenTask != null + ) { + const taskFunctionTasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(stolenTask.name as string) + ?.tasks as TaskStatistics + if ( + taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || + (previousStolenTask != null && + previousStolenTask.name === stolenTask.name && + taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) + ) { + this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) } else { - promiseResponse.resolve(message.data as Response) + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) } - this.afterTaskExecutionHook(promiseResponse.worker, message) - this.promiseResponseMap.delete(message.id as string) - const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker) + } + sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) + .then(() => { + this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + return undefined + }) + .catch(EMPTY_FUNCTION) + } + + private readonly workerNodeStealTask = ( + workerNodeKey: number + ): Task | undefined => { + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + const sourceWorkerNode = workerNodes.find( + (sourceWorkerNode, sourceWorkerNodeKey) => + sourceWorkerNode.info.ready && + sourceWorkerNodeKey !== workerNodeKey && + sourceWorkerNode.usage.tasks.queued > 0 + ) + if (sourceWorkerNode != null) { + const task = sourceWorkerNode.popTask() as Task + this.handleTask(workerNodeKey, task) + this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) + this.updateTaskStolenStatisticsWorkerUsage( + workerNodeKey, + task.name as string + ) + return task + } + } + + private readonly handleBackPressureEvent = ( + eventDetail: WorkerNodeEventDetail + ): void => { + if (this.workerNodes.length <= 1) { + return + } + const { workerId } = eventDetail + const sizeOffset = 1 + if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { + return + } + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( - this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + sourceWorkerNode.usage.tasks.queued > 0 && + workerNode.info.ready && + workerNode.info.id !== workerId && + workerNode.usage.tasks.queued < + (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - this.executeTask( + const task = sourceWorkerNode.popTask() as Task + this.handleTask(workerNodeKey, task) + this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, - this.dequeueTask(workerNodeKey) as Task + task.name as string ) } - this.workerChoiceStrategyContext.update(workerNodeKey) } } - private checkAndEmitEvents (): void { - if (this.emitter != null) { - if (this.busy) { - this.emitter.emit(PoolEvents.busy, this.info) + /** + * This method is the message listener registered on each worker. + */ + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { + this.checkMessageWorkerId(message) + const { workerId, ready, taskId, taskFunctionNames } = message + if (ready != null && taskFunctionNames != null) { + // Worker ready response received from worker + this.handleWorkerReadyResponse(message) + } else if (taskId != null) { + // Task execution response received from worker + this.handleTaskExecutionResponse(message) + } else if (taskFunctionNames != null) { + // Task function names message received from worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(workerId) + ).taskFunctionNames = taskFunctionNames + } + } + + private handleWorkerReadyResponse (message: MessageValue): void { + const { workerId, ready, taskFunctionNames } = message + if (ready === false) { + throw new Error(`Worker ${workerId as number} failed to initialize`) + } + const workerInfo = this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(workerId) + ) + workerInfo.ready = ready as boolean + workerInfo.taskFunctionNames = taskFunctionNames + if (!this.readyEventEmitted && this.ready) { + this.readyEventEmitted = true + this.emitter?.emit(PoolEvents.ready, this.info) + } + } + + private handleTaskExecutionResponse (message: MessageValue): void { + const { workerId, taskId, workerError, data } = message + const promiseResponse = this.promiseResponseMap.get(taskId as string) + if (promiseResponse != null) { + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) + } else { + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) + } + asyncResource?.emitDestroy() + this.afterTaskExecutionHook(workerNodeKey, message) + this.promiseResponseMap.delete(taskId as string) + workerNode?.emit('taskFinished', taskId) + if (this.opts.enableTasksQueue === true && !this.destroying) { + const workerNodeTasksUsage = workerNode.usage.tasks + if ( + this.tasksQueueSize(workerNodeKey) > 0 && + workerNodeTasksUsage.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + if ( + workerNodeTasksUsage.executing === 0 && + this.tasksQueueSize(workerNodeKey) === 0 && + workerNodeTasksUsage.sequentiallyStolen === 0 + ) { + workerNode.emit('idleWorkerNode', { + workerId: workerId as number, + workerNodeKey + }) + } } - if (this.type === PoolTypes.dynamic && this.full) { - this.emitter.emit(PoolEvents.full, this.info) + } + } + + private checkAndEmitTaskExecutionEvents (): void { + if (this.busy) { + this.emitter?.emit(PoolEvents.busy, this.info) + } + } + + private checkAndEmitTaskQueuingEvents (): void { + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) + } + } + + private checkAndEmitDynamicWorkerCreationEvents (): void { + if (this.type === PoolTypes.dynamic) { + if (this.full) { + this.emitter?.emit(PoolEvents.full, this.info) } } } /** - * Gets the worker information from the given worker node key. + * Gets the worker information given its worker node key. * * @param workerNodeKey - The worker node key. * @returns The worker information. */ - private getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info + protected getWorkerInfo (workerNodeKey: number): WorkerInfo { + return this.workerNodes[workerNodeKey]?.info } /** - * Gets the worker information from the given worker. + * Creates a worker node. * - * @param worker - The worker. - * @returns The worker information. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found. + * @returns The created worker node. */ - protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { - const workerNodeKey = this.getWorkerNodeKey(worker) - if (workerNodeKey === -1) { - throw new Error('Worker not found') + private createWorkerNode (): IWorkerNode { + const workerNode = new WorkerNode( + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions(this.maxSize).size + } + ) + // Flag the worker node as ready at pool startup. + if (this.starting) { + workerNode.info.ready = true } - return this.workerNodes[workerNodeKey].info + return workerNode } /** - * Adds the given worker in the pool worker nodes. + * Adds the given worker node in the pool worker nodes. * - * @param worker - The worker. - * @returns The worker nodes length. + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ - private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) - // Flag the worker node as ready at pool startup. - if (this.starting) { - workerNode.info.ready = true + private addWorkerNode (workerNode: IWorkerNode): number { + this.workerNodes.push(workerNode) + const workerNodeKey = this.workerNodes.indexOf(workerNode) + if (workerNodeKey === -1) { + throw new Error('Worker added not found in worker nodes') } - return this.workerNodes.push(workerNode) + return workerNodeKey } /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) } } + protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { + this.getWorkerInfo(workerNodeKey).ready = false + } + + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) + } + + private hasBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.findIndex( + workerNode => !workerNode.hasBackPressure() + ) === -1 + ) + } + /** - * Executes the given task on the given worker. + * Executes the given task on the worker given its worker node key. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + this.sendToWorker(workerNodeKey, task, task.transferList) + this.checkAndEmitTaskExecutionEvents() } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + this.checkAndEmitTaskQueuingEvents() + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined { @@ -1123,14 +1866,17 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void { @@ -1138,17 +1884,4 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } - - private setWorkerStatistics (worker: Worker): void { - this.sendToWorker(worker, { - statistics: { - runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - }, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } }