From: Jérôme Benoit Date: Fri, 7 Oct 2022 20:29:20 +0000 (+0200) Subject: Report more cleanups from work in progress PRs X-Git-Tag: v2.2.1~16 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=cf597bc58029b9b5bf861c29fa9e917a6a72d859;p=poolifier.git Report more cleanups from work in progress PRs Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 484b6916..71046d49 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -280,7 +280,8 @@ export abstract class AbstractPool< const messageId = ++this.nextMessageId const res = this.internalExecute(worker, messageId) this.checkAndEmitBusy() - this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId }) + data = data ?? ({} as Data) + this.sendToWorker(worker, { data, id: messageId }) return res } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8f86fb48..b49ede8b 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -33,7 +33,6 @@ export abstract class AbstractWorker< * Options for the worker. */ public readonly opts: WorkerOptions - /** * Constructs a new poolifier worker. * @@ -75,25 +74,32 @@ export abstract class AbstractWorker< } this.mainWorker?.on('message', (value: MessageValue) => { - if (value?.data && value.id) { - // Here you will receive messages - if (this.opts.async) { - this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) - } else { - this.runInAsyncScope(this.run.bind(this), this, fn, value) - } - } else if (value.parent) { - // Save a reference of the main worker to communicate with it - // This will be received once - this.mainWorker = value.parent - } else if (value.kill) { - // Here is time to kill this worker, just clearing the interval - if (this.aliveInterval) clearInterval(this.aliveInterval) - this.emitDestroy() - } + this.messageListener(value, fn) }) } + protected messageListener ( + value: MessageValue, + fn: (data: Data) => Response + ): void { + if (value.data !== undefined && value.id !== undefined) { + // Here you will receive messages + if (this.opts.async) { + this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this.run.bind(this), this, fn, value) + } + } else if (value.parent !== undefined) { + // Save a reference of the main worker to communicate with it + // This will be received once + this.mainWorker = value.parent + } else if (value.kill !== undefined) { + // Here is time to kill this worker, just clearing the interval + if (this.aliveInterval) clearInterval(this.aliveInterval) + this.emitDestroy() + } + } + private checkWorkerOptions (opts: WorkerOptions) { this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR this.opts.maxInactiveTime = diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index d4690b4f..d9ab30fe 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -139,7 +139,7 @@ describe('Abstract pool test suite', () => { pool.destroy() }) - it("Verify that pool event emitter 'busy' event can register a callback", () => { + it("Verify that pool event emitter 'busy' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, './tests/worker-files/thread/testWorker.js' @@ -150,6 +150,7 @@ describe('Abstract pool test suite', () => { for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute({ test: 'test' })) } + await Promise.all(promises) // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. expect(poolBusy).toBe(numberOfWorkers + 1) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 3ae35ac9..92fbd9b8 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -51,14 +51,10 @@ describe('Dynamic thread pool test suite', () => { }) it('Shutdown test', async () => { - let closedThreads = 0 - pool.workers.forEach(w => { - w.on('exit', () => { - closedThreads++ - }) - }) + const exitPromise = TestUtils.waitExits(pool, min) await pool.destroy() - expect(closedThreads).toBe(min) + const numberOfExitEvents = await exitPromise + expect(numberOfExitEvents).toBe(min) }) it('Validation of inputs test', () => { diff --git a/tests/test-utils.js b/tests/test-utils.js index 4668e072..8565c750 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -18,7 +18,7 @@ class TestUtils { } static async workerSleepFunction (data, ms) { - return new Promise((resolve, reject) => { + return new Promise(resolve => { setTimeout(() => resolve(data), ms) }) } diff --git a/tests/worker-files/cluster/asyncWorker.js b/tests/worker-files/cluster/asyncWorker.js index bceaffae..70db6a87 100644 --- a/tests/worker-files/cluster/asyncWorker.js +++ b/tests/worker-files/cluster/asyncWorker.js @@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index') const TestUtils = require('../../test-utils') async function sleep (data) { - return TestUtils.workerSleepFunction(data, 2000) + return await TestUtils.workerSleepFunction(data, 2000) } module.exports = new ClusterWorker(sleep, { diff --git a/tests/worker-files/cluster/longRunningWorkerHardBehavior.js b/tests/worker-files/cluster/longRunningWorkerHardBehavior.js index 73fdad01..5c0b6202 100644 --- a/tests/worker-files/cluster/longRunningWorkerHardBehavior.js +++ b/tests/worker-files/cluster/longRunningWorkerHardBehavior.js @@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index') const TestUtils = require('../../test-utils') async function sleep (data) { - return TestUtils.workerSleepFunction(data, 50000) + return await TestUtils.workerSleepFunction(data, 50000) } module.exports = new ClusterWorker(sleep, { diff --git a/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js b/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js index 5498752f..ea53bbea 100644 --- a/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js +++ b/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js @@ -3,7 +3,7 @@ const { ClusterWorker } = require('../../../lib/index') const TestUtils = require('../../test-utils') async function sleep (data) { - return TestUtils.workerSleepFunction(data, 50000) + return await TestUtils.workerSleepFunction(data, 50000) } module.exports = new ClusterWorker(sleep, { diff --git a/tests/worker-files/thread/asyncWorker.js b/tests/worker-files/thread/asyncWorker.js index 6508d6da..730d10ce 100644 --- a/tests/worker-files/thread/asyncWorker.js +++ b/tests/worker-files/thread/asyncWorker.js @@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index') const TestUtils = require('../../test-utils') async function sleep (data) { - return TestUtils.workerSleepFunction(data, 2000) + return await TestUtils.workerSleepFunction(data, 2000) } module.exports = new ThreadWorker(sleep, { diff --git a/tests/worker-files/thread/longRunningWorkerHardBehavior.js b/tests/worker-files/thread/longRunningWorkerHardBehavior.js index 3c707eb5..120e8e5f 100644 --- a/tests/worker-files/thread/longRunningWorkerHardBehavior.js +++ b/tests/worker-files/thread/longRunningWorkerHardBehavior.js @@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index') const TestUtils = require('../../test-utils') async function sleep (data) { - return TestUtils.workerSleepFunction(data, 50000) + return await TestUtils.workerSleepFunction(data, 50000) } module.exports = new ThreadWorker(sleep, { diff --git a/tests/worker-files/thread/longRunningWorkerSoftBehavior.js b/tests/worker-files/thread/longRunningWorkerSoftBehavior.js index eed0586f..12048093 100644 --- a/tests/worker-files/thread/longRunningWorkerSoftBehavior.js +++ b/tests/worker-files/thread/longRunningWorkerSoftBehavior.js @@ -1,10 +1,9 @@ 'use strict' const { ThreadWorker } = require('../../../lib/index') +const TestUtils = require('../../test-utils') async function sleep (data) { - return new Promise((resolve, reject) => { - setTimeout(() => resolve(data), 50000) - }) + return await TestUtils.workerSleepFunction(data, 50000) } module.exports = new ThreadWorker(sleep, { diff --git a/tests/worker-files/thread/testWorker.js b/tests/worker-files/thread/testWorker.js index 369dbdba..ccf55b56 100644 --- a/tests/worker-files/thread/testWorker.js +++ b/tests/worker-files/thread/testWorker.js @@ -1,14 +1,10 @@ 'use strict' const { ThreadWorker, KillBehaviors } = require('../../../lib/index') const { isMainThread } = require('worker_threads') +const TestUtils = require('../../test-utils') function test (data) { - for (let i = 0; i < 50; i++) { - const o = { - a: i - } - JSON.stringify(o) - } + TestUtils.jsonIntegerSerialization(50) return isMainThread }