feat: add 'full' event on dynamic pool emitter
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 5 Apr 2023 15:49:50 +0000 (17:49 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 5 Apr 2023 15:49:50 +0000 (17:49 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
README.md
examples/dynamicExample.js
examples/fixedExample.js
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/selection-strategies/selection-strategies-types.ts
tests/pools/abstract/abstract-pool.test.js

index 94ecf704f7e138aa6e90164b7ee16882957fcbb5..a2cc015da1f2bb7ec43e843eaa909fd963abb38e 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add `full` event to dynamic pool.
+
 ## [2.4.1] - 2023-04-05
 
 ### Changed
index 1248db3f814f1dc273841b77d9171a7d0893940d..dc956759aef4cfa76ae4a409e4a4cd0a1fa1dd2c 100644 (file)
--- a/README.md
+++ b/README.md
@@ -120,11 +120,14 @@ const pool = new FixedThreadPool(15,
   './yourWorker.js',
   { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
+pool.emitter.on('busy', () => console.log('Pool is busy'))
+
 // or a dynamic worker-threads pool
 const pool = new DynamicThreadPool(10, 100,
   './yourWorker.js',
   { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
+pool.emitter.on('full', () => console.log('Pool is full'))
 pool.emitter.on('busy', () => console.log('Pool is busy'))
 
 // the execute method signature is the same for both implementations,
index ed1bddebe0e17914992b26d982e477bf95e8e7bc..79f73c11c53b0f296cd077ecca35fc53a18cab1c 100644 (file)
@@ -1,11 +1,13 @@
 const { DynamicThreadPool } = require('poolifier')
 let resolved = 0
-let maxReached = 0
+let poolFull = 0
+let poolBusy = 0
 const pool = new DynamicThreadPool(10, 20, './yourWorker.js', {
   errorHandler: e => console.error(e),
   onlineHandler: () => console.log('worker is online')
 })
-pool.emitter.on('busy', () => maxReached++)
+pool.emitter.on('full', () => poolFull++)
+pool.emitter.on('busy', () => poolBusy++)
 
 const start = Date.now()
 const iterations = 1000
@@ -15,8 +17,9 @@ for (let i = 1; i <= iterations; i++) {
     .then(() => {
       resolved++
       if (resolved === iterations) {
-        console.log('Time take is ' + (Date.now() - start))
-        return console.log('The pool was busy for ' + maxReached + ' times')
+        console.log('Time taken is ' + (Date.now() - start))
+        console.log('The pool was full for ' + poolFull + ' times')
+        return console.log('The pool was busy for ' + poolBusy + ' times')
       }
       return null
     })
index 2c36a7038695e4e4a8a10dae15bc0f748a4b3849..c6ea884897191956ddf2491608bfac175e2c923c 100644 (file)
@@ -1,9 +1,11 @@
 const { FixedThreadPool } = require('poolifier')
 let resolved = 0
+let poolBusy = 0
 const pool = new FixedThreadPool(15, './yourWorker.js', {
   errorHandler: e => console.error(e),
   onlineHandler: () => console.log('worker is online')
 })
+pool.emitter.on('busy', () => poolBusy++)
 
 const start = Date.now()
 const iterations = 1000
@@ -13,7 +15,8 @@ for (let i = 1; i <= iterations; i++) {
     .then(() => {
       resolved++
       if (resolved === iterations) {
-        return console.log('Time take is ' + (Date.now() - start))
+        console.log('Time taken is ' + (Date.now() - start))
+        return console.log('The pool was busy for ' + poolBusy + ' times')
       }
       return null
     })
index c1e6ddc09a9a41ce18d34ced63a162225f66818f..0da7f86bfd7870ae1f1efd020dee765ec9c142f9 100644 (file)
@@ -76,6 +76,7 @@ export abstract class AbstractPool<
 
     this.chooseWorker.bind(this)
     this.internalExecute.bind(this)
+    this.checkAndEmitFull.bind(this)
     this.checkAndEmitBusy.bind(this)
     this.sendToWorker.bind(this)
 
@@ -209,6 +210,7 @@ export abstract class AbstractPool<
     const [workerKey, worker] = this.chooseWorker()
     const messageId = crypto.randomUUID()
     const res = this.internalExecute(workerKey, worker, messageId)
+    this.checkAndEmitFull()
     this.checkAndEmitBusy()
     this.sendToWorker(worker, {
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -413,6 +415,16 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkAndEmitFull (): void {
+    if (
+      this.type === PoolType.DYNAMIC &&
+      this.opts.enableEvents === true &&
+      this.full
+    ) {
+      this.emitter?.emit('full')
+    }
+  }
+
   /**
    * Gets worker tasks usage.
    *
index 8a5e505f4f2d23909db36ef3d66e987eee9e9afd..600cb4228b0b33da08a6cea5c13093ca790f1835 100644 (file)
@@ -56,7 +56,8 @@ export interface IPool<Data = unknown, Response = unknown> {
    *
    * Events that can currently be listened to:
    *
-   * - `'busy'`
+   * - `'full'`: Emitted when the pool is dynamic and full.
+   * - `'busy'`: Emitted when the pool is busy.
    */
   readonly emitter?: PoolEmitter
   /**
index 4fb2f3584aadffcc0f5224fdfb8ce6521a2979a8..bf364f923d033e24223afed35b07f3bdaaf1517b 100644 (file)
@@ -50,17 +50,14 @@ export interface IWorkerChoiceStrategy<
 > {
   /**
    * The pool instance.
-   * @readonly
    */
   readonly pool: IPoolInternal<Worker, Data, Response>
   /**
    * Is the pool attached to the strategy dynamic?.
-   * @readonly
    */
   readonly isDynamicPool: boolean
   /**
    * Required pool tasks usage statistics.
-   * @readonly
    */
   readonly requiredStatistics: RequiredStatistics
   /**
index 5bb5cde50cfbc16bbb439e101cb20ad2a8f23add..d9b5695e53f369bd45f889a28c4a4846d6ab351f 100644 (file)
@@ -210,6 +210,25 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it("Verify that pool event emitter 'full' event can register a callback", async () => {
+    const pool = new DynamicThreadPool(
+      numberOfWorkers,
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    const promises = []
+    let poolFull = 0
+    pool.emitter.on('full', () => ++poolFull)
+    for (let i = 0; i < numberOfWorkers * 2; i++) {
+      promises.push(pool.execute())
+    }
+    await Promise.all(promises)
+    // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
+    expect(poolFull).toBe(numberOfWorkers + 1)
+    await pool.destroy()
+  })
+
   it("Verify that pool event emitter 'busy' event can register a callback", async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
@@ -217,7 +236,7 @@ describe('Abstract pool test suite', () => {
     )
     const promises = []
     let poolBusy = 0
-    pool.emitter.on('busy', () => poolBusy++)
+    pool.emitter.on('busy', () => ++poolBusy)
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute())
     }