X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8e32218dfd521567663a27b09db1b2747d74b582;hb=refs%2Ftags%2Fv3.1.15;hp=31e73db14ccf1f06998329c92674f3f348a0a924;hpb=67f3f2d6cb8f915ec71f81c4533ab80a6c6a6f0f;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 31e73db1..8e32218d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -544,7 +544,6 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( - this, this.opts.workerChoiceStrategyOptions ) } @@ -608,18 +607,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -628,7 +624,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -637,7 +633,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1299,7 +1295,6 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1314,12 +1309,13 @@ export abstract class AbstractPool< }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1383,14 +1379,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1524,7 +1520,7 @@ export abstract class AbstractPool< } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { @@ -1595,7 +1591,7 @@ export abstract class AbstractPool< } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1628,7 +1624,7 @@ export abstract class AbstractPool< } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { if ( @@ -1697,18 +1693,17 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message - if (ready === false) { + if (ready == null || !ready) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion throw new Error(`Worker ${workerId!} failed to initialize`) } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } @@ -1754,7 +1749,7 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - workerNode.emit('idleWorkerNode', { + workerNode.emit('idle', { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerId: workerId!, workerNodeKey @@ -1850,14 +1845,6 @@ export abstract class AbstractPool< this.getWorkerInfo(workerNodeKey).ready = false } - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) - } - private hasBackPressure (): boolean { return ( this.opts.enableTasksQueue === true &&