## [Unreleased]
+### Added
+
+- Add `full` event to dynamic pool.
+
## [2.4.1] - 2023-04-05
### Changed
'./yourWorker.js',
{ errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+pool.emitter.on('busy', () => console.log('Pool is busy'))
+
// or a dynamic worker-threads pool
const pool = new DynamicThreadPool(10, 100,
'./yourWorker.js',
{ errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+pool.emitter.on('full', () => console.log('Pool is full'))
pool.emitter.on('busy', () => console.log('Pool is busy'))
// the execute method signature is the same for both implementations,
const { DynamicThreadPool } = require('poolifier')
let resolved = 0
-let maxReached = 0
+let poolFull = 0
+let poolBusy = 0
const pool = new DynamicThreadPool(10, 20, './yourWorker.js', {
errorHandler: e => console.error(e),
onlineHandler: () => console.log('worker is online')
})
-pool.emitter.on('busy', () => maxReached++)
+pool.emitter.on('full', () => poolFull++)
+pool.emitter.on('busy', () => poolBusy++)
const start = Date.now()
const iterations = 1000
.then(() => {
resolved++
if (resolved === iterations) {
- console.log('Time take is ' + (Date.now() - start))
- return console.log('The pool was busy for ' + maxReached + ' times')
+ console.log('Time taken is ' + (Date.now() - start))
+ console.log('The pool was full for ' + poolFull + ' times')
+ return console.log('The pool was busy for ' + poolBusy + ' times')
}
return null
})
const { FixedThreadPool } = require('poolifier')
let resolved = 0
+let poolBusy = 0
const pool = new FixedThreadPool(15, './yourWorker.js', {
errorHandler: e => console.error(e),
onlineHandler: () => console.log('worker is online')
})
+pool.emitter.on('busy', () => poolBusy++)
const start = Date.now()
const iterations = 1000
.then(() => {
resolved++
if (resolved === iterations) {
- return console.log('Time take is ' + (Date.now() - start))
+ console.log('Time taken is ' + (Date.now() - start))
+ return console.log('The pool was busy for ' + poolBusy + ' times')
}
return null
})
this.chooseWorker.bind(this)
this.internalExecute.bind(this)
+ this.checkAndEmitFull.bind(this)
this.checkAndEmitBusy.bind(this)
this.sendToWorker.bind(this)
const [workerKey, worker] = this.chooseWorker()
const messageId = crypto.randomUUID()
const res = this.internalExecute(workerKey, worker, messageId)
+ this.checkAndEmitFull()
this.checkAndEmitBusy()
this.sendToWorker(worker, {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
}
}
+ private checkAndEmitFull (): void {
+ if (
+ this.type === PoolType.DYNAMIC &&
+ this.opts.enableEvents === true &&
+ this.full
+ ) {
+ this.emitter?.emit('full')
+ }
+ }
+
/**
* Gets worker tasks usage.
*
*
* Events that can currently be listened to:
*
- * - `'busy'`
+ * - `'full'`: Emitted when the pool is dynamic and full.
+ * - `'busy'`: Emitted when the pool is busy.
*/
readonly emitter?: PoolEmitter
/**
> {
/**
* The pool instance.
- * @readonly
*/
readonly pool: IPoolInternal<Worker, Data, Response>
/**
* Is the pool attached to the strategy dynamic?.
- * @readonly
*/
readonly isDynamicPool: boolean
/**
* Required pool tasks usage statistics.
- * @readonly
*/
readonly requiredStatistics: RequiredStatistics
/**
await pool.destroy()
})
+ it("Verify that pool event emitter 'full' event can register a callback", async () => {
+ const pool = new DynamicThreadPool(
+ numberOfWorkers,
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ const promises = []
+ let poolFull = 0
+ pool.emitter.on('full', () => ++poolFull)
+ for (let i = 0; i < numberOfWorkers * 2; i++) {
+ promises.push(pool.execute())
+ }
+ await Promise.all(promises)
+ // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
+ // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
+ expect(poolFull).toBe(numberOfWorkers + 1)
+ await pool.destroy()
+ })
+
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
)
const promises = []
let poolBusy = 0
- pool.emitter.on('busy', () => poolBusy++)
+ pool.emitter.on('busy', () => ++poolBusy)
for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.push(pool.execute())
}