From 164d950a1bcdc39b4762294c1581e8befbb344e2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 5 Apr 2023 17:49:50 +0200 Subject: [PATCH] feat: add 'full' event on dynamic pool emitter MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ README.md | 3 +++ examples/dynamicExample.js | 11 ++++++---- examples/fixedExample.js | 5 ++++- src/pools/abstract-pool.ts | 12 +++++++++++ src/pools/pool.ts | 3 ++- .../selection-strategies-types.ts | 3 --- tests/pools/abstract/abstract-pool.test.js | 21 ++++++++++++++++++- 8 files changed, 52 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94ecf704..a2cc015d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `full` event to dynamic pool. + ## [2.4.1] - 2023-04-05 ### Changed diff --git a/README.md b/README.md index 1248db3f..dc956759 100644 --- a/README.md +++ b/README.md @@ -120,11 +120,14 @@ const pool = new FixedThreadPool(15, './yourWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) +pool.emitter.on('busy', () => console.log('Pool is busy')) + // or a dynamic worker-threads pool const pool = new DynamicThreadPool(10, 100, './yourWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) +pool.emitter.on('full', () => console.log('Pool is full')) pool.emitter.on('busy', () => console.log('Pool is busy')) // the execute method signature is the same for both implementations, diff --git a/examples/dynamicExample.js b/examples/dynamicExample.js index ed1bddeb..79f73c11 100644 --- a/examples/dynamicExample.js +++ b/examples/dynamicExample.js @@ -1,11 +1,13 @@ const { DynamicThreadPool } = require('poolifier') let resolved = 0 -let maxReached = 0 +let poolFull = 0 +let poolBusy = 0 const pool = new DynamicThreadPool(10, 20, './yourWorker.js', { errorHandler: e => console.error(e), onlineHandler: () => console.log('worker is online') }) -pool.emitter.on('busy', () => maxReached++) +pool.emitter.on('full', () => poolFull++) +pool.emitter.on('busy', () => poolBusy++) const start = Date.now() const iterations = 1000 @@ -15,8 +17,9 @@ for (let i = 1; i <= iterations; i++) { .then(() => { resolved++ if (resolved === iterations) { - console.log('Time take is ' + (Date.now() - start)) - return console.log('The pool was busy for ' + maxReached + ' times') + console.log('Time taken is ' + (Date.now() - start)) + console.log('The pool was full for ' + poolFull + ' times') + return console.log('The pool was busy for ' + poolBusy + ' times') } return null }) diff --git a/examples/fixedExample.js b/examples/fixedExample.js index 2c36a703..c6ea8848 100644 --- a/examples/fixedExample.js +++ b/examples/fixedExample.js @@ -1,9 +1,11 @@ const { FixedThreadPool } = require('poolifier') let resolved = 0 +let poolBusy = 0 const pool = new FixedThreadPool(15, './yourWorker.js', { errorHandler: e => console.error(e), onlineHandler: () => console.log('worker is online') }) +pool.emitter.on('busy', () => poolBusy++) const start = Date.now() const iterations = 1000 @@ -13,7 +15,8 @@ for (let i = 1; i <= iterations; i++) { .then(() => { resolved++ if (resolved === iterations) { - return console.log('Time take is ' + (Date.now() - start)) + console.log('Time taken is ' + (Date.now() - start)) + return console.log('The pool was busy for ' + poolBusy + ' times') } return null }) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c1e6ddc0..0da7f86b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -76,6 +76,7 @@ export abstract class AbstractPool< this.chooseWorker.bind(this) this.internalExecute.bind(this) + this.checkAndEmitFull.bind(this) this.checkAndEmitBusy.bind(this) this.sendToWorker.bind(this) @@ -209,6 +210,7 @@ export abstract class AbstractPool< const [workerKey, worker] = this.chooseWorker() const messageId = crypto.randomUUID() const res = this.internalExecute(workerKey, worker, messageId) + this.checkAndEmitFull() this.checkAndEmitBusy() this.sendToWorker(worker, { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -413,6 +415,16 @@ export abstract class AbstractPool< } } + private checkAndEmitFull (): void { + if ( + this.type === PoolType.DYNAMIC && + this.opts.enableEvents === true && + this.full + ) { + this.emitter?.emit('full') + } + } + /** * Gets worker tasks usage. * diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 8a5e505f..600cb422 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -56,7 +56,8 @@ export interface IPool { * * Events that can currently be listened to: * - * - `'busy'` + * - `'full'`: Emitted when the pool is dynamic and full. + * - `'busy'`: Emitted when the pool is busy. */ readonly emitter?: PoolEmitter /** diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 4fb2f358..bf364f92 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -50,17 +50,14 @@ export interface IWorkerChoiceStrategy< > { /** * The pool instance. - * @readonly */ readonly pool: IPoolInternal /** * Is the pool attached to the strategy dynamic?. - * @readonly */ readonly isDynamicPool: boolean /** * Required pool tasks usage statistics. - * @readonly */ readonly requiredStatistics: RequiredStatistics /** diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5bb5cde5..d9b5695e 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -210,6 +210,25 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it("Verify that pool event emitter 'full' event can register a callback", async () => { + const pool = new DynamicThreadPool( + numberOfWorkers, + numberOfWorkers, + './tests/worker-files/thread/testWorker.js' + ) + const promises = [] + let poolFull = 0 + pool.emitter.on('full', () => ++poolFull) + for (let i = 0; i < numberOfWorkers * 2; i++) { + promises.push(pool.execute()) + } + await Promise.all(promises) + // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. + expect(poolFull).toBe(numberOfWorkers + 1) + await pool.destroy() + }) + it("Verify that pool event emitter 'busy' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, @@ -217,7 +236,7 @@ describe('Abstract pool test suite', () => { ) const promises = [] let poolBusy = 0 - pool.emitter.on('busy', () => poolBusy++) + pool.emitter.on('busy', () => ++poolBusy) for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute()) } -- 2.34.1