*/
private backPressureEventEmitted: boolean
+ /**
+ * Whether the pool busy event has been emitted or not.
+ */
+ private busyEventEmitted: boolean
+
/**
* Whether the pool is destroying or not.
*/
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.busyEventEmitted = false
this.backPressureEventEmitted = false
this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.opts.enableTasksQueue === true &&
this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.info.backPressure ? accumulator + 1 : accumulator,
+ workerNode.info.ready && workerNode.info.backPressure
+ ? accumulator + 1
+ : accumulator,
0
) === this.workerNodes.length
)
}
private checkAndEmitEmptyEvent (): void {
- if (this.empty) {
- this.emitter?.emit(PoolEvents.empty, this.info)
+ if (this.emitter != null && this.empty) {
+ this.emitter.emit(PoolEvents.empty, this.info)
}
}
private checkAndEmitReadyEvent (): void {
- if (!this.readyEventEmitted && this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
+ if (this.emitter != null && !this.readyEventEmitted && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
this.readyEventEmitted = true
}
}
private checkAndEmitTaskDequeuingEvents (): void {
- if (this.backPressureEventEmitted && !this.backPressure) {
- this.emitter?.emit(PoolEvents.backPressureEnd, this.info)
+ if (
+ this.emitter != null &&
+ this.backPressureEventEmitted &&
+ !this.backPressure
+ ) {
+ this.emitter.emit(PoolEvents.backPressureEnd, this.info)
this.backPressureEventEmitted = false
}
}
private checkAndEmitTaskExecutionEvents (): void {
- if (this.busy) {
- this.emitter?.emit(PoolEvents.busy, this.info)
+ if (this.emitter != null && !this.busyEventEmitted && this.busy) {
+ this.emitter.emit(PoolEvents.busy, this.info)
+ this.busyEventEmitted = true
+ }
+ }
+
+ private checkAndEmitTaskExecutionFinishedEvents (): void {
+ if (this.emitter != null && this.busyEventEmitted && !this.busy) {
+ this.emitter.emit(PoolEvents.busyEnd, this.info)
+ this.busyEventEmitted = false
}
}
private checkAndEmitTaskQueuingEvents (): void {
- if (!this.backPressureEventEmitted && this.backPressure) {
- this.emitter?.emit(PoolEvents.backPressure, this.info)
+ if (
+ this.emitter != null &&
+ !this.backPressureEventEmitted &&
+ this.backPressure
+ ) {
+ this.emitter.emit(PoolEvents.backPressure, this.info)
this.backPressureEventEmitted = true
}
}
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
+ this.checkAndEmitTaskExecutionFinishedEvents()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
if (this.opts.enableTasksQueue === true && !this.destroying) {
await this.destroyWorkerNode(workerNodeKey)
})
)
- this.emitter?.emit(PoolEvents.destroy, this.info)
- this.emitter?.emitDestroy()
- this.readyEventEmitted = false
- this.backPressureEventEmitted = false
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.destroy, this.info)
+ this.emitter.emitDestroy()
+ this.readyEventEmitted = false
+ this.busyEventEmitted = false
+ this.backPressureEventEmitted = false
+ }
delete this.startTimestamp
this.destroying = false
this.started = false
/** @inheritDoc */
protected checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
+ if (this.emitter != null && this.full) {
+ this.emitter.emit(PoolEvents.full, this.info)
}
}
backPressure: 'backPressure'
backPressureEnd: 'backPressureEnd'
busy: 'busy'
+ busyEnd: 'busyEnd'
destroy: 'destroy'
empty: 'empty'
error: 'error'
backPressure: 'backPressure',
backPressureEnd: 'backPressureEnd',
busy: 'busy',
+ busyEnd: 'busyEnd',
destroy: 'destroy',
empty: 'empty',
error: 'error',
*
* - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
* - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
+ * - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota.
* - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
* - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
* - `'destroy'`: Emitted when the pool is destroyed.
/** @inheritDoc */
protected checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
+ if (this.emitter != null && this.full) {
+ this.emitter.emit(PoolEvents.full, this.info)
}
}
expect(pool.info.ready).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
pool.start()
expect(pool.info.started).toBe(true)
expect(pool.info.ready).toBe(true)
await waitPoolEvents(pool, PoolEvents.ready, 1)
expect(pool.readyEventEmitted).toBe(true)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(numberOfWorkers)
for (const workerNode of pool.workerNodes) {
await pool.destroy()
})
- it("Verify that pool event emitter 'busy' event can register a callback", async () => {
+ it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs'
expect(pool.emitter.eventNames()).toStrictEqual([])
const promises = new Set()
let poolBusy = 0
- let poolInfo
+ let poolBusyInfo
pool.emitter.on(PoolEvents.busy, info => {
++poolBusy
- poolInfo = info
+ poolBusyInfo = info
+ })
+ let poolBusyEnd = 0
+ let poolBusyEndInfo
+ pool.emitter.on(PoolEvents.busyEnd, info => {
+ ++poolBusyEnd
+ poolBusyEndInfo = info
})
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
+ expect(pool.emitter.eventNames()).toStrictEqual([
+ PoolEvents.busy,
+ PoolEvents.busyEnd,
+ ])
for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.add(pool.execute())
}
await Promise.all(promises)
- // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
- // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
- expect(poolBusy).toBe(numberOfWorkers + 1)
- expect(poolInfo).toStrictEqual({
+ expect(poolBusy).toBe(1)
+ expect(poolBusyInfo).toStrictEqual({
+ busyWorkerNodes: expect.any(Number),
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ maxSize: expect.any(Number),
+ minSize: expect.any(Number),
+ ready: true,
+ started: true,
+ strategyRetries: expect.any(Number),
+ type: PoolTypes.fixed,
+ version,
+ worker: WorkerTypes.thread,
+ workerNodes: expect.any(Number),
+ })
+ expect(poolBusyEnd).toBe(1)
+ expect(poolBusyEndInfo).toStrictEqual({
busyWorkerNodes: expect.any(Number),
defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
executedTasks: expect.any(Number),
expect(pool.emitter.eventNames()).toStrictEqual([])
const promises = new Set()
let poolBackPressure = 0
- let backPressurePoolInfo
+ let poolBackPressureInfo
pool.emitter.on(PoolEvents.backPressure, info => {
++poolBackPressure
- backPressurePoolInfo = info
+ poolBackPressureInfo = info
})
let poolBackPressureEnd = 0
- let backPressureEndPoolInfo
+ let poolBackPressureEndInfo
pool.emitter.on(PoolEvents.backPressureEnd, info => {
++poolBackPressureEnd
- backPressureEndPoolInfo = info
+ poolBackPressureEndInfo = info
})
expect(pool.emitter.eventNames()).toStrictEqual([
PoolEvents.backPressure,
}
await Promise.all(promises)
expect(poolBackPressure).toBe(1)
- expect(backPressurePoolInfo).toStrictEqual({
+ expect(poolBackPressureInfo).toStrictEqual({
backPressure: true,
backPressureWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
workerNodes: expect.any(Number),
})
expect(poolBackPressureEnd).toBe(1)
- expect(backPressureEndPoolInfo).toStrictEqual({
+ expect(poolBackPressureEndInfo).toStrictEqual({
backPressure: false,
backPressureWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
PoolEvents.destroy,
])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(min)
expect(pool.info.ready).toBe(false)
expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(numberOfWorkers)
PoolEvents.destroy,
])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(min)
expect(pool.info.ready).toBe(false)
expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.busyEventEmitted).toBe(false)
expect(pool.backPressureEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(numberOfThreads)