}
}
+ /**
+ * Whether the pool back pressure event has been emitted or not.
+ */
+ private backPressureEventEmitted: boolean
+
/**
* Whether the pool is destroying or not.
*/
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.backPressureEventEmitted = false
this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
}
+ private checkAndEmitTaskDequeuingEvents (): void {
+ if (this.backPressureEventEmitted && !this.backPressure) {
+ this.emitter?.emit(PoolEvents.backPressureEnd, this.info)
+ this.backPressureEventEmitted = false
+ }
+ }
+
private checkAndEmitTaskExecutionEvents (): void {
if (this.busy) {
this.emitter?.emit(PoolEvents.busy, this.info)
}
private checkAndEmitTaskQueuingEvents (): void {
- if (this.backPressure) {
+ if (!this.backPressureEventEmitted && this.backPressure) {
this.emitter?.emit(PoolEvents.backPressure, this.info)
+ this.backPressureEventEmitted = true
}
}
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask()
+ const task = this.workerNodes[workerNodeKey].dequeueTask()
+ this.checkAndEmitTaskDequeuingEvents()
+ return task
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
this.readyEventEmitted = false
+ this.backPressureEventEmitted = false
delete this.startTimestamp
this.destroying = false
this.started = false
import { readFileSync } from 'node:fs'
import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
-import { restore, stub } from 'sinon'
import { CircularBuffer } from '../../lib/circular-buffer.cjs'
import {
}
}
- afterEach(() => {
- restore()
- })
-
it('Verify that pool can be created and destroyed', async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
expect(pool.info.ready).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.backPressureEventEmitted).toBe(false)
pool.start()
expect(pool.info.started).toBe(true)
expect(pool.info.ready).toBe(true)
await waitPoolEvents(pool, PoolEvents.ready, 1)
expect(pool.readyEventEmitted).toBe(true)
+ expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(numberOfWorkers)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
await pool.destroy()
})
- it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
+ it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
enableTasksQueue: true,
}
)
- const backPressureGetterStub = stub().returns(true)
- stub(pool, 'backPressure').get(backPressureGetterStub)
expect(pool.emitter.eventNames()).toStrictEqual([])
const promises = new Set()
let poolBackPressure = 0
- let poolInfo
+ let backPressurePoolInfo
pool.emitter.on(PoolEvents.backPressure, info => {
++poolBackPressure
- poolInfo = info
+ backPressurePoolInfo = info
})
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
- for (let i = 0; i < numberOfWorkers + 1; i++) {
+ let poolBackPressureEnd = 0
+ let backPressureEndPoolInfo
+ pool.emitter.on(PoolEvents.backPressureEnd, info => {
+ ++poolBackPressureEnd
+ backPressureEndPoolInfo = info
+ })
+ expect(pool.emitter.eventNames()).toStrictEqual([
+ PoolEvents.backPressure,
+ PoolEvents.backPressureEnd,
+ ])
+ for (let i = 0; i < numberOfWorkers * 10; i++) {
promises.add(pool.execute())
}
await Promise.all(promises)
expect(poolBackPressure).toBe(1)
- expect(poolInfo).toStrictEqual({
+ expect(backPressurePoolInfo).toStrictEqual({
backPressure: true,
backPressureWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
worker: WorkerTypes.thread,
workerNodes: expect.any(Number),
})
- expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7)
+ expect(poolBackPressureEnd).toBe(1)
+ expect(backPressureEndPoolInfo).toStrictEqual({
+ backPressure: false,
+ backPressureWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ maxQueuedTasks: expect.any(Number),
+ maxSize: expect.any(Number),
+ minSize: expect.any(Number),
+ queuedTasks: expect.any(Number),
+ ready: true,
+ started: true,
+ stealingWorkerNodes: expect.any(Number),
+ stolenTasks: expect.any(Number),
+ strategyRetries: expect.any(Number),
+ type: PoolTypes.fixed,
+ version,
+ worker: WorkerTypes.thread,
+ workerNodes: expect.any(Number),
+ })
await pool.destroy()
})