X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f0119f4ed3c8135e596d171a0f6ec093d7b29935;hb=c3eca146177be73305c08b03e16dccf531855e04;hp=dc1fc53f90bfbda6031a56cfd2c92e62867f07ec;hpb=5137e2ae971a1af7de3afc12d4e2c6f5959199f4;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index dc1fc53f..f0119f4e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -84,6 +84,11 @@ export abstract class AbstractPool< Response > + /** + * Dynamic pool maximum size property placeholder. + */ + protected readonly max?: number + /** * Whether the pool is starting or not. */ @@ -117,8 +122,6 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) - this.dequeueTask = this.dequeueTask.bind(this) - this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() @@ -176,7 +179,7 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { if (max == null) { - throw new Error( + throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) } else if (!Number.isSafeInteger(max)) { @@ -292,15 +295,31 @@ export abstract class AbstractPool< !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + 'Invalid worker node tasks concurrency: must be an integer' ) } if ( tasksQueueOptions?.concurrency != null && tasksQueueOptions.concurrency <= 0 ) { - throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero` + throw new RangeError( + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + tasksQueueOptions.queueMaxSize <= 0 + ) { + throw new RangeError( + `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` ) } } @@ -368,6 +387,9 @@ export abstract class AbstractPool< 0 ) }), + ...(this.opts.enableTasksQueue === true && { + backPressure: this.hasBackPressure() + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -509,12 +531,16 @@ export abstract class AbstractPool< /** * The pool minimum size. */ - protected abstract get minSize (): number + protected get minSize (): number { + return this.numberOfWorkers + } /** * The pool maximum size. */ - protected abstract get maxSize (): number + protected get maxSize (): number { + return this.max ?? this.numberOfWorkers + } /** * Checks if the worker id sent in the received message from a worker is valid. @@ -610,16 +636,29 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueMaxSize( + this.opts.tasksQueueOptions.queueMaxSize as number + ) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } + private setTasksQueueMaxSize (queueMaxSize: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = queueMaxSize + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...{ + queueMaxSize: Math.pow(this.maxSize, 2), + concurrency: 1 + }, + ...tasksQueueOptions } } @@ -699,7 +738,7 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo if ( name != null && Array.isArray(workerInfo.taskFunctions) && @@ -733,7 +772,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.checkAndEmitEvents() }) } @@ -744,7 +782,7 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) - this.emitter?.emit(PoolEvents.destroy) + this.emitter?.emit(PoolEvents.destroy, this.info) } protected async sendKillMessageToWorker ( @@ -796,10 +834,17 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage @@ -819,15 +864,22 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME + message.taskPerformance?.name as string ) as WorkerUsage this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) @@ -844,6 +896,7 @@ export abstract class AbstractPool< private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( + workerInfo != null && Array.isArray(workerInfo.taskFunctions) && workerInfo.taskFunctions.length > 2 ) @@ -943,7 +996,7 @@ export abstract class AbstractPool< if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { return workerNodeKey } @@ -993,7 +1046,7 @@ export abstract class AbstractPool< worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) @@ -1047,15 +1100,19 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + ) { workerInfo.ready = true } + this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -1085,6 +1142,10 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1108,7 +1169,7 @@ export abstract class AbstractPool< elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number }) } @@ -1118,24 +1179,23 @@ export abstract class AbstractPool< let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued === 0 ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } targetWorkerNodeKey = workerNodeId break } if ( workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued @@ -1145,12 +1205,48 @@ export abstract class AbstractPool< if (executeTask) { this.executeTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task ) } else { this.enqueueTask( targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + this.popTask(workerNodeKey) as Task + ) + } + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .filter((workerNode) => workerNode.info.id !== workerId) + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() && + workerNode.usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task ) } } @@ -1172,8 +1268,10 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (message.taskFunctions != null) { // Task functions message received from worker - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) + ( + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ) as WorkerInfo ).taskFunctions = message.taskFunctions } } @@ -1185,7 +1283,7 @@ export abstract class AbstractPool< } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ) + ) as WorkerInfo workerInfo.ready = message.ready as boolean workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { @@ -1221,13 +1319,22 @@ export abstract class AbstractPool< } } - private checkAndEmitEvents (): void { - if (this.emitter != null) { - if (this.busy) { - this.emitter.emit(PoolEvents.busy, this.info) - } - if (this.type === PoolTypes.dynamic && this.full) { - this.emitter.emit(PoolEvents.full, this.info) + private checkAndEmitTaskExecutionEvents (): void { + if (this.busy) { + this.emitter?.emit(PoolEvents.busy, this.info) + } + } + + private checkAndEmitTaskQueuingEvents (): void { + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) + } + } + + private checkAndEmitDynamicWorkerCreationEvents (): void { + if (this.type === PoolTypes.dynamic) { + if (this.full) { + this.emitter?.emit(PoolEvents.full, this.info) } } } @@ -1238,8 +1345,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /** @@ -1253,7 +1360,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.maxSize + this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) { @@ -1293,7 +1400,7 @@ export abstract class AbstractPool< this.opts.enableTasksQueue === true && this.workerNodes.findIndex( (workerNode) => !workerNode.hasBackPressure() - ) !== -1 + ) === -1 ) } @@ -1306,13 +1413,12 @@ export abstract class AbstractPool< private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(workerNodeKey, task, task.transferList) + this.checkAndEmitTaskExecutionEvents() } private enqueueTask (workerNodeKey: number, task: Task): number { const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) - if (this.hasBackPressure()) { - this.emitter?.emit(PoolEvents.backPressure, this.info) - } + this.checkAndEmitTaskQueuingEvents() return tasksQueueSize } @@ -1320,6 +1426,10 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].dequeueTask() } + private popTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].popTask() + } + private tasksQueueSize (workerNodeKey: number): number { return this.workerNodes[workerNodeKey].tasksQueueSize() }