X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=b5e528ec643bb7d58325ab46d49f82c64f0f442b;hb=bcf1c155ec2e2d9208c8f818abd031662bd61d7f;hp=b72d8cbcb4c3354294dda16718a6d229e64d04e0;hpb=f8b3d84c1c95773a1e18d2c025d89d73c4f2ab15;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b72d8cbc..b5e528ec 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,17 +1,18 @@ +import { AsyncResource } from 'node:async_hooks' import { randomUUID } from 'node:crypto' +import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' -import { EventEmitterAsyncResource } from 'node:events' -import { AsyncResource } from 'node:async_hooks' + import type { MessageValue, PromiseResponseWrapper, Task -} from '../utility-types' +} from '../utility-types.js' import { + average, DEFAULT_TASK_NAME, EMPTY_FUNCTION, - average, exponentialDelay, isKillBehavior, isPlainObject, @@ -20,9 +21,9 @@ import { min, round, sleep -} from '../utils' -import { KillBehaviors } from '../worker/worker-options' -import type { TaskFunction } from '../worker/task-functions' +} from '../utils.js' +import type { TaskFunction } from '../worker/task-functions.js' +import { KillBehaviors } from '../worker/worker-options.js' import { type IPool, PoolEvents, @@ -31,25 +32,14 @@ import { type PoolType, PoolTypes, type TasksQueueOptions -} from './pool' -import type { - IWorker, - IWorkerNode, - TaskStatistics, - WorkerInfo, - WorkerNodeEventDetail, - WorkerType, - WorkerUsage -} from './worker' +} from './pool.js' import { Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions -} from './selection-strategies/selection-strategies-types' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' -import { version } from './version' -import { WorkerNode } from './worker-node' +} from './selection-strategies/selection-strategies-types.js' +import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' import { checkFilePath, checkValidTasksQueueOptions, @@ -60,7 +50,16 @@ import { updateTaskStatisticsWorkerUsage, updateWaitTimeWorkerUsage, waitWorkerNodeEvents -} from './utils' +} from './utils.js' +import { version } from './version.js' +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerNodeEventDetail, + WorkerType +} from './worker.js' +import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. @@ -93,7 +92,7 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. */ - protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< + protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext< Worker, Data, Response @@ -118,6 +117,10 @@ export abstract class AbstractPool< * Whether the pool is destroying or not. */ private destroying: boolean + /** + * Whether the minimum number of workers is starting or not. + */ + private startingMinimumNumberOfWorkers: boolean /** * Whether the pool ready event has been emitted or not. */ @@ -176,6 +179,7 @@ export abstract class AbstractPool< this.starting = false this.destroying = false this.readyEventEmitted = false + this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() } @@ -191,7 +195,9 @@ export abstract class AbstractPool< } } - private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void { + private checkMinimumNumberOfWorkers ( + minimumNumberOfWorkers: number | undefined + ): void { if (minimumNumberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -212,13 +218,11 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - checkValidWorkerChoiceStrategy( - opts.workerChoiceStrategy as WorkerChoiceStrategy - ) + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + opts.workerChoiceStrategyOptions ) if (opts.workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions @@ -227,9 +231,9 @@ export abstract class AbstractPool< this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions) + checkValidTasksQueueOptions(opts.tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions + opts.tasksQueueOptions ) } } else { @@ -238,7 +242,7 @@ export abstract class AbstractPool< } private checkValidWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { if ( workerChoiceStrategyOptions != null && @@ -283,13 +287,17 @@ export abstract class AbstractPool< worker: this.worker, started: this.started, ready: this.ready, - strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + strategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { utilization: round(this.utilization) }), + .runTime.aggregate === true && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.aggregate && { + utilization: round(this.utilization) + }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => @@ -330,7 +338,7 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 ) }), @@ -350,23 +358,23 @@ export abstract class AbstractPool< 0 ), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .runTime.aggregate && { + .runTime.aggregate === true && { runTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -378,7 +386,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -393,23 +401,23 @@ export abstract class AbstractPool< } }), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - .waitTime.aggregate && { + .waitTime.aggregate === true && { waitTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -421,7 +429,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -442,6 +450,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -453,6 +464,13 @@ export abstract class AbstractPool< ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. * @@ -464,12 +482,12 @@ export abstract class AbstractPool< (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), + accumulator + (workerNode.usage.runTime.aggregate ?? 0), 0 ) const totalTasksWaitTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), + accumulator + (workerNode.usage.waitTime.aggregate ?? 0), 0 ) return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity @@ -522,7 +540,7 @@ export abstract class AbstractPool< ): void { checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( + this.workerChoiceStrategyContext?.setWorkerChoiceStrategy( this.opts.workerChoiceStrategy ) if (workerChoiceStrategyOptions != null) { @@ -536,14 +554,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } - this.workerChoiceStrategyContext.setOptions( - this, + this.workerChoiceStrategyContext?.setOptions( this.opts.workerChoiceStrategyOptions ) } @@ -559,16 +576,19 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions) + this.setTasksQueueOptions(tasksQueueOptions) } /** @inheritDoc */ - public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void { + public setTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions | undefined + ): void { if (this.opts.enableTasksQueue === true) { checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueSize(this.opts.tasksQueueOptions.size!) if (this.opts.tasksQueueOptions.taskStealing === true) { this.unsetTaskStealing() this.setTaskStealing() @@ -587,7 +607,7 @@ export abstract class AbstractPool< } private buildTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): TasksQueueOptions { return { ...getDefaultTasksQueueOptions( @@ -605,18 +625,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -625,7 +642,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -634,7 +651,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -670,7 +687,8 @@ export abstract class AbstractPool< workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) === -1 ) } @@ -686,7 +704,8 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes[workerNodeKey].usage.tasks.executing >= - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 @@ -701,21 +720,17 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id as number + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId ) { if (message.taskFunctionOperationStatus) { resolve(true) - } else if (!message.taskFunctionOperationStatus) { + } else { reject( new Error( - `Task function operation '${ - message.taskFunctionOperation as string - }' failed on worker ${message.workerId} with error: '${ - message.workerError?.message as string - }'` + `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'` ) ) } @@ -763,10 +778,8 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${ - errorResponse?.workerId as number - } with error: '${ - errorResponse?.workerError?.message as string + }' failed on worker ${errorResponse?.workerId} with error: '${ + errorResponse?.workerError?.message }'` ) ) @@ -871,7 +884,8 @@ export abstract class AbstractPool< return ( this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } @@ -916,7 +930,8 @@ export abstract class AbstractPool< timestamp, taskId: randomUUID() } - this.promiseResponseMap.set(task.taskId as string, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.set(task.taskId!, { resolve, reject, workerNodeKey, @@ -939,6 +954,23 @@ export abstract class AbstractPool< }) } + /** + * Starts the minimum number of workers. + */ + private startMinimumNumberOfWorkers (): void { + this.startingMinimumNumberOfWorkers = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.minimumNumberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.startingMinimumNumberOfWorkers = false + } + /** @inheritdoc */ public start (): void { if (this.started) { @@ -951,15 +983,7 @@ export abstract class AbstractPool< throw new Error('Cannot start a destroying pool') } this.starting = true - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.minimumNumberOfWorkers - ) { - this.createAndSetupWorkerNode() - } + this.startMinimumNumberOfWorkers() this.starting = false this.started = true } @@ -983,7 +1007,6 @@ export abstract class AbstractPool< ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() - this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -991,7 +1014,8 @@ export abstract class AbstractPool< private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { - if (this.workerNodes?.[workerNodeKey] == null) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.workerNodes[workerNodeKey] == null) { resolve() return } @@ -1002,9 +1026,7 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${ - message.workerId as number - }` + `Kill message handling failed on worker ${message.workerId}` ) ) } @@ -1048,7 +1070,9 @@ export abstract class AbstractPool< } /** - * Should return whether the worker is the main worker or not. + * Returns whether the worker is the main worker or not. + * + * @returns `true` if the worker is the main worker, `false` otherwise. */ protected abstract isMain (): boolean @@ -1063,6 +1087,7 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing @@ -1074,13 +1099,15 @@ export abstract class AbstractPool< } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - task.name as string - ) != null + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) != + null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing updateWaitTimeWorkerUsage( this.workerChoiceStrategyContext, @@ -1102,6 +1129,7 @@ export abstract class AbstractPool< message: MessageValue ): void { let needWorkerChoiceStrategyUpdate = false + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) @@ -1120,14 +1148,15 @@ export abstract class AbstractPool< if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.taskPerformance!.name ) != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string - ) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) updateRunTimeWorkerUsage( this.workerChoiceStrategyContext, @@ -1142,7 +1171,7 @@ export abstract class AbstractPool< needWorkerChoiceStrategyUpdate = true } if (needWorkerChoiceStrategyUpdate) { - this.workerChoiceStrategyContext.update(workerNodeKey) + this.workerChoiceStrategyContext?.update(workerNodeKey) } } @@ -1172,12 +1201,14 @@ export abstract class AbstractPool< if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerUsage === true ) { return workerNodeKey } } - return this.workerChoiceStrategyContext.execute() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.workerChoiceStrategyContext!.execute() } /** @@ -1219,7 +1250,7 @@ export abstract class AbstractPool< 'error', this.opts.errorHandler ?? EMPTY_FUNCTION ) - workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.registerOnceWorkerEventHandler('error', (error: Error) => { workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1229,8 +1260,8 @@ export abstract class AbstractPool< ) { if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() - } else { - this.createAndSetupWorkerNode() + } else if (!this.startingMinimumNumberOfWorkers) { + this.startMinimumNumberOfWorkers() } } if ( @@ -1240,7 +1271,8 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode?.terminate().catch(error => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1250,6 +1282,13 @@ export abstract class AbstractPool< ) workerNode.registerOnceWorkerEventHandler('exit', () => { this.removeWorkerNode(workerNode) + if ( + this.started && + !this.startingMinimumNumberOfWorkers && + !this.destroying + ) { + this.startMinimumNumberOfWorkers() + } }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1268,7 +1307,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1281,12 +1320,11 @@ export abstract class AbstractPool< ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) - this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1296,17 +1334,20 @@ export abstract class AbstractPool< taskFunctionOperation: 'add', taskFunctionName, taskFunction: taskFunction.toString() - }).catch(error => { + }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerReady === true || + this.workerChoiceStrategyContext?.getStrategyPolicy() + .dynamicWorkerUsage === true ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1370,14 +1411,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1399,10 +1440,11 @@ export abstract class AbstractPool< this.sendToWorker(workerNodeKey, { statistics: { runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + .runTime.aggregate ?? false, + elu: + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu + .aggregate ?? false } }) } @@ -1420,10 +1462,7 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { - if (workerNodeKey === -1) { - return - } - if (this.cannotStealTask()) { + if (workerNodeKey === -1 || this.cannotStealTask()) { return } while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1439,7 +1478,8 @@ export abstract class AbstractPool< ) this.handleTask( destinationWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.dequeueTask(workerNodeKey)! ) } } @@ -1449,6 +1489,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1456,9 +1497,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.stolen } } @@ -1467,6 +1508,7 @@ export abstract class AbstractPool< workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } @@ -1481,9 +1523,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.sequentiallyStolen } } @@ -1492,6 +1534,7 @@ export abstract class AbstractPool< workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } @@ -1506,43 +1549,45 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey property must be defined' + "WorkerNode event detail 'workerNodeKey' property must be defined" ) } + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( this.cannotStealTask() || - (this.info.stealingWorkerNodes as number) > + (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (previousStolenTask != null) { - this.getWorkerInfo(workerNodeKey).stealing = false + if (workerInfo != null && previousStolenTask != null) { + workerInfo.stealing = false } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( + workerInfo != null && previousStolenTask != null && workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.getWorkerInfo(workerNodeKey).stealing = false - for (const taskName of this.workerNodes[workerNodeKey].info - .taskFunctionNames as string[]) { + workerInfo.stealing = false + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + for (const taskName of workerInfo.taskFunctionNames!) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, taskName @@ -1551,16 +1596,22 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } - this.getWorkerInfo(workerNodeKey).stealing = true + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } + workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && stolenTask != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionTasksWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(stolenTask.name as string) - ?.tasks as TaskStatistics + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks if ( taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || (previousStolenTask != null && @@ -1569,21 +1620,25 @@ export abstract class AbstractPool< ) { this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } else { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(EMPTY_FUNCTION) + .catch((error: unknown) => { + this.emitter?.emit(PoolEvents.error, error) + }) } private readonly workerNodeStealTask = ( @@ -1603,30 +1658,30 @@ export abstract class AbstractPool< sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = sourceWorkerNode.popTask() as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { if ( this.cannotStealTask() || - (this.info.stealingWorkerNodes as number) > + (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { return } const { workerId } = eventDetail const sizeOffset = 1 - if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } const sourceWorkerNode = @@ -1644,16 +1699,22 @@ export abstract class AbstractPool< !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < - (this.opts.tasksQueueOptions?.size as number) - sizeOffset + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.size! - sizeOffset ) { - this.getWorkerInfo(workerNodeKey).stealing = true - const task = sourceWorkerNode.popTask() as Task + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } + workerInfo.stealing = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) - this.getWorkerInfo(workerNodeKey).stealing = false + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) + workerInfo.stealing = false } } } @@ -1674,31 +1735,38 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (taskFunctionNames != null) { // Task function names message received from worker - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(workerId) - ).taskFunctionNames = taskFunctionNames + ) + if (workerInfo != null) { + workerInfo.taskFunctionNames = taskFunctionNames + } } } - private handleWorkerReadyResponse (message: MessageValue): void { - const { workerId, ready, taskFunctionNames } = message - if (ready === false) { - throw new Error(`Worker ${workerId as number} failed to initialize`) - } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + private checkAndEmitReadyEvent (): void { if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true + } + } + + private handleWorkerReadyResponse (message: MessageValue): void { + const { workerId, ready, taskFunctionNames } = message + if (ready == null || !ready) { + throw new Error(`Worker ${workerId} failed to initialize`) } + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { const { workerId, taskId, workerError, data } = message - const promiseResponse = this.promiseResponseMap.get(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse const workerNode = this.workerNodes[workerNodeKey] @@ -1718,27 +1786,33 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.delete(taskId!) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true && !this.destroying) { + if ( + this.opts.enableTasksQueue === true && + !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode != null + ) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) } if ( workerNodeTasksUsage.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - workerNode.emit('idleWorkerNode', { - workerId: workerId as number, + workerNode.emit('idle', { + workerId, workerNodeKey }) } @@ -1769,7 +1843,7 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { return this.workerNodes[workerNodeKey]?.info } @@ -1815,6 +1889,13 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. * @@ -1824,20 +1905,16 @@ export abstract class AbstractPool< const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + this.workerChoiceStrategyContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false - } - - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean { @@ -1878,10 +1955,8 @@ export abstract class AbstractPool< protected flushTasksQueue (workerNodeKey: number): number { let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue()