X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8e32218dfd521567663a27b09db1b2747d74b582;hb=1087637e65991bbcf07248b277d0fa99594a1528;hp=7b18d3556c72e6aee4629ae9831452325ea9d087;hpb=0d03353828857a8d0eb17b318fd817ffb873fc40;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7b18d355..8e32218d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -7,7 +7,7 @@ import type { MessageValue, PromiseResponseWrapper, Task -} from '../utility-types' +} from '../utility-types.js' import { DEFAULT_TASK_NAME, EMPTY_FUNCTION, @@ -20,9 +20,9 @@ import { min, round, sleep -} from '../utils' -import { KillBehaviors } from '../worker/worker-options' -import type { TaskFunction } from '../worker/task-functions' +} from '../utils.js' +import { KillBehaviors } from '../worker/worker-options.js' +import type { TaskFunction } from '../worker/task-functions.js' import { type IPool, PoolEvents, @@ -31,25 +31,23 @@ import { type PoolType, PoolTypes, type TasksQueueOptions -} from './pool' +} from './pool.js' import type { IWorker, IWorkerNode, - TaskStatistics, WorkerInfo, WorkerNodeEventDetail, - WorkerType, - WorkerUsage -} from './worker' + WorkerType +} from './worker.js' import { Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions -} from './selection-strategies/selection-strategies-types' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' -import { version } from './version' -import { WorkerNode } from './worker-node' +} from './selection-strategies/selection-strategies-types.js' +import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import { version } from './version.js' +import { WorkerNode } from './worker-node.js' import { checkFilePath, checkValidTasksQueueOptions, @@ -60,7 +58,7 @@ import { updateTaskStatisticsWorkerUsage, updateWaitTimeWorkerUsage, waitWorkerNodeEvents -} from './utils' +} from './utils.js' /** * Base class that implements some shared logic for all poolifier pools. @@ -212,13 +210,13 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - checkValidWorkerChoiceStrategy( - opts.workerChoiceStrategy as WorkerChoiceStrategy - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + opts.workerChoiceStrategyOptions! ) if (opts.workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions @@ -227,9 +225,11 @@ export abstract class AbstractPool< this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + checkValidTasksQueueOptions(opts.tasksQueueOptions!) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + opts.tasksQueueOptions! ) } } else { @@ -283,7 +283,8 @@ export abstract class AbstractPool< worker: this.worker, started: this.started, ready: this.ready, - strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + strategy: this.opts.workerChoiceStrategy!, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() @@ -543,7 +544,6 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( - this, this.opts.workerChoiceStrategyOptions ) } @@ -559,7 +559,8 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueOptions(tasksQueueOptions!) } /** @inheritDoc */ @@ -568,7 +569,8 @@ export abstract class AbstractPool< checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueSize(this.opts.tasksQueueOptions.size!) if (this.opts.tasksQueueOptions.taskStealing === true) { this.unsetTaskStealing() this.setTaskStealing() @@ -605,18 +607,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -625,7 +624,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -634,7 +633,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -670,7 +669,8 @@ export abstract class AbstractPool< workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) === -1 ) } @@ -686,7 +686,8 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes[workerNodeKey].usage.tasks.executing >= - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 @@ -701,7 +702,8 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id as number + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const workerId = this.getWorkerInfo(workerNodeKey).id! if ( message.taskFunctionOperationStatus != null && message.workerId === workerId @@ -713,8 +715,10 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion }' failed on worker ${message.workerId} with error: '${ - message.workerError?.message as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.workerError!.message }'` ) ) @@ -763,10 +767,11 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${ - errorResponse?.workerId as number - } with error: '${ - errorResponse?.workerError?.message as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + }' failed on worker ${errorResponse! + .workerId!} with error: '${ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + errorResponse!.workerError!.message }'` ) ) @@ -871,7 +876,8 @@ export abstract class AbstractPool< return ( this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } @@ -916,7 +922,8 @@ export abstract class AbstractPool< timestamp, taskId: randomUUID() } - this.promiseResponseMap.set(task.taskId as string, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.set(task.taskId!, { resolve, reject, workerNodeKey, @@ -1002,9 +1009,8 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${ - message.workerId as number - }` + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + `Kill message handling failed on worker ${message.workerId!}` ) ) } @@ -1074,13 +1080,15 @@ export abstract class AbstractPool< } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - task.name as string - ) != null + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) != + null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing updateWaitTimeWorkerUsage( this.workerChoiceStrategyContext, @@ -1120,14 +1128,15 @@ export abstract class AbstractPool< if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.taskPerformance!.name ) != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string - ) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) updateRunTimeWorkerUsage( this.workerChoiceStrategyContext, @@ -1286,7 +1295,6 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1301,12 +1309,13 @@ export abstract class AbstractPool< }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1370,14 +1379,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1436,7 +1445,8 @@ export abstract class AbstractPool< ) this.handleTask( destinationWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.dequeueTask(workerNodeKey)! ) } } @@ -1453,9 +1463,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.stolen } } @@ -1478,9 +1488,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.sequentiallyStolen } } @@ -1503,14 +1513,14 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { @@ -1522,8 +1532,8 @@ export abstract class AbstractPool< } if ( this.cannotStealTask() || - (this.info.stealingWorkerNodes as number) > - Math.floor(this.workerNodes.length / 2) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) ) { if (previousStolenTask != null) { this.getWorkerInfo(workerNodeKey).stealing = false @@ -1538,8 +1548,9 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) > 0) ) { this.getWorkerInfo(workerNodeKey).stealing = false + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion for (const taskName of this.workerNodes[workerNodeKey].info - .taskFunctionNames as string[]) { + .taskFunctionNames!) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, taskName @@ -1554,10 +1565,11 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && stolenTask != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionTasksWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(stolenTask.name as string) - ?.tasks as TaskStatistics + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks if ( taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || (previousStolenTask != null && @@ -1566,18 +1578,20 @@ export abstract class AbstractPool< ) { this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } else { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1600,30 +1614,30 @@ export abstract class AbstractPool< sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = sourceWorkerNode.popTask() as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { if ( this.cannotStealTask() || - (this.info.stealingWorkerNodes as number) > - Math.floor(this.workerNodes.length / 2) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) ) { return } const { workerId } = eventDetail const sizeOffset = 1 - if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } const sourceWorkerNode = @@ -1641,15 +1655,15 @@ export abstract class AbstractPool< !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < - (this.opts.tasksQueueOptions?.size as number) - sizeOffset + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.size! - sizeOffset ) { this.getWorkerInfo(workerNodeKey).stealing = true - const task = sourceWorkerNode.popTask() as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) this.getWorkerInfo(workerNodeKey).stealing = false } } @@ -1679,23 +1693,24 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message - if (ready === false) { - throw new Error(`Worker ${workerId as number} failed to initialize`) + if (ready == null || !ready) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + throw new Error(`Worker ${workerId!} failed to initialize`) } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } private handleTaskExecutionResponse (message: MessageValue): void { const { workerId, taskId, workerError, data } = message - const promiseResponse = this.promiseResponseMap.get(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse const workerNode = this.workerNodes[workerNodeKey] @@ -1715,27 +1730,28 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.delete(taskId!) workerNode?.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) } if ( workerNodeTasksUsage.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - workerNode.emit('idleWorkerNode', { - workerId: workerId as number, + workerNode.emit('idle', { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerId: workerId!, workerNodeKey }) } @@ -1829,14 +1845,6 @@ export abstract class AbstractPool< this.getWorkerInfo(workerNodeKey).ready = false } - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) - } - private hasBackPressure (): boolean { return ( this.opts.enableTasksQueue === true && @@ -1875,10 +1883,8 @@ export abstract class AbstractPool< protected flushTasksQueue (workerNodeKey: number): number { let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue()