X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c8d4551167d28d0bcf64cf3e7afb082dff53841d;hb=5eb72b9e26eaffb43c67147fbc6b4d2b1b959d62;hp=4bfc3acbc47fb80b001f54f2ab429937fa46e2a9;hpb=00e1bdeb5c50b0eede8fe2f72d47bf8992e4aede;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4bfc3acb..c8d45511 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -298,6 +298,13 @@ export abstract class AbstractPool< : accumulator, 0 ), + ...(this.opts.enableTasksQueue === true && { + stealingWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stealing ? accumulator + 1 : accumulator, + 0 + ) + }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _workerNode, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, @@ -984,8 +991,8 @@ export abstract class AbstractPool< private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { - if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { - reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + if (this.workerNodes?.[workerNodeKey] == null) { + resolve() return } const killMessageListener = (message: MessageValue): void => { @@ -1178,9 +1185,7 @@ export abstract class AbstractPool< * * @returns Whether to create a dynamic worker or not. */ - private shallCreateDynamicWorker (): boolean { - return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() - } + protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. @@ -1399,6 +1404,10 @@ export abstract class AbstractPool< }) } + private cannotStealTask (): boolean { + return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 + } + private handleTask (workerNodeKey: number, task: Task): void { if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) @@ -1411,7 +1420,7 @@ export abstract class AbstractPool< if (workerNodeKey === -1) { return } - if (this.workerNodes.length <= 1) { + if (this.cannotStealTask()) { return } while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1505,15 +1514,22 @@ export abstract class AbstractPool< eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - if (this.workerNodes.length <= 1) { - return - } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey attribute must be defined' + 'WorkerNode event detail workerNodeKey property must be defined' ) } + if ( + this.cannotStealTask() || + (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { + if (previousStolenTask != null) { + this.getWorkerInfo(workerNodeKey).stealing = false + } + return + } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( previousStolenTask != null && @@ -1521,6 +1537,7 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { + this.getWorkerInfo(workerNodeKey).stealing = false for (const taskName of this.workerNodes[workerNodeKey].info .taskFunctionNames as string[]) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( @@ -1531,6 +1548,7 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } + this.getWorkerInfo(workerNodeKey).stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1577,6 +1595,7 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( (sourceWorkerNode, sourceWorkerNodeKey) => sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stealing && sourceWorkerNodeKey !== workerNodeKey && sourceWorkerNode.usage.tasks.queued > 0 ) @@ -1595,7 +1614,11 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { - if (this.workerNodes.length <= 1) { + if ( + this.cannotStealTask() || + (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { return } const { workerId } = eventDetail @@ -1615,16 +1638,19 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && + !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { + this.getWorkerInfo(workerNodeKey).stealing = true const task = sourceWorkerNode.popTask() as Task this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string ) + this.getWorkerInfo(workerNodeKey).stealing = false } } } @@ -1729,13 +1755,10 @@ export abstract class AbstractPool< } } - private checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.type === PoolTypes.dynamic) { - if (this.full) { - this.emitter?.emit(PoolEvents.full, this.info) - } - } - } + /** + * Emits dynamic worker creation events. + */ + protected abstract checkAndEmitDynamicWorkerCreationEvents (): void /** * Gets the worker information given its worker node key.