): void => {
if (
this.cannotStealTask() ||
- this.hasBackPressure() ||
+ this.backPressure ||
(this.info.stealingWorkerNodes ?? 0) >
Math.round(
this.workerNodes.length *
return this.workerNodes[workerNodeKey]?.info
}
+ /**
+ * Whether the worker nodes are back pressured or not.
+ * @returns Worker nodes back pressure boolean status.
+ */
+ protected internalBackPressure (): boolean {
+ return (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.backPressure ? accumulator + 1 : accumulator,
+ 0
+ ) === this.workerNodes.length
+ )
+ }
+
/**
* Whether worker nodes are executing concurrently their tasks quota or not.
* @returns Worker nodes busyness boolean status.
return (
this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
- this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+ this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
- ) === 0
+ ) === this.workerNodes.length
)
}
}
private checkAndEmitTaskQueuingEvents (): void {
- if (this.hasBackPressure()) {
+ if (this.backPressure) {
this.emitter?.emit(PoolEvents.backPressure, this.info)
}
}
this.checkAndEmitReadyEvent()
}
- private hasBackPressure (): boolean {
- return (
- this.opts.enableTasksQueue === true &&
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.info.backPressure ? accumulator + 1 : accumulator,
- 0
- ) === this.workerNodes.length
- )
- }
-
private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
name: `poolifier:${this.type}-${this.worker}-pool`,
this.started = true
}
+ /**
+ * Whether the pool is back pressured or not.
+ * @returns The pool back pressure boolean status.
+ */
+ protected abstract get backPressure (): boolean
+
/**
* Whether the pool is busy or not.
* @returns The pool busyness boolean status.
* @returns The pool emptiness boolean status.
*/
protected get empty (): boolean {
- return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ return (
+ this.minimumNumberOfWorkers === 0 &&
+ this.workerNodes.length === this.minimumNumberOfWorkers
+ )
}
/**
),
}),
...(this.opts.enableTasksQueue === true && {
- backPressure: this.hasBackPressure(),
+ backPressure: this.backPressure,
}),
...(this.opts.enableTasksQueue === true && {
stolenTasks: this.workerNodes.reduce(
enableTasksQueue: true,
}
)
- stub(pool, 'hasBackPressure').returns(true)
+ const backPressureGetterStub = stub().returns(true)
+ stub(pool, 'backPressure').get(backPressureGetterStub)
expect(pool.emitter.eventNames()).toStrictEqual([])
const promises = new Set()
let poolBackPressure = 0
worker: WorkerTypes.thread,
workerNodes: expect.any(Number),
})
- expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
+ expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
})