From c01733f14c19a0aabef6f310f29691912942d55c Mon Sep 17 00:00:00 2001 From: aardizio Date: Mon, 15 Feb 2021 17:00:30 +0100 Subject: [PATCH] Better handling for maxInactiveTime on dynamic pools --- src/pools/abstract-pool.ts | 19 ++++++++++++++++--- src/pools/cluster/dynamic.ts | 1 + src/pools/thread/dynamic.ts | 6 +++++- tests/pools/cluster/dynamic.test.js | 15 +++++++++++++++ tests/pools/thread/dynamic.test.js | 21 +++++++++++++++++++++ tests/worker/cluster/longRunningWorker.js | 10 ++++++++++ tests/worker/thread/longRunningWorker.js | 10 ++++++++++ 7 files changed, 78 insertions(+), 4 deletions(-) create mode 100644 tests/worker/cluster/longRunningWorker.js create mode 100644 tests/worker/thread/longRunningWorker.js diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2ffd292a..879e3e4f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -83,7 +83,7 @@ export abstract class AbstractPool< /** * - `key`: The `Worker` - * - `value`: Number of tasks that has been assigned to that worker since it started + * - `value`: Number of tasks currently in progress on the worker. */ public readonly tasks: Map = new Map() @@ -120,7 +120,6 @@ export abstract class AbstractPool< if (!this.filePath) { throw new Error('Please specify a file with a worker implementation') } - this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { @@ -199,6 +198,20 @@ export abstract class AbstractPool< } } + /** + * Increase the number of tasks that the given workers has done. + * + * @param worker Workers whose tasks are increased. + */ + protected decreaseWorkersTasks (worker: Worker): void { + const numberOfTasksTheWorkerHas = this.tasks.get(worker) + if (numberOfTasksTheWorkerHas !== undefined) { + this.tasks.set(worker, numberOfTasksTheWorkerHas - 1) + } else { + throw Error('Worker could not be found in tasks map') + } + } + /** * Removes the given worker from the pool. * @@ -254,7 +267,7 @@ export abstract class AbstractPool< const listener: (message: MessageValue) => void = message => { if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.increaseWorkersTask(worker) + this.decreaseWorkersTasks(worker) if (message.error) reject(message.error) else resolve(message.data as Response) } diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index b0bb6973..44847b61 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -61,6 +61,7 @@ export class DynamicClusterPool< // All workers are busy, create a new worker const worker = this.createAndSetupWorker() this.registerWorkerMessageListener(worker, message => { + const tasksInProgress = this.tasks.get(worker) if (message.kill) { this.sendToWorker(worker, { kill: 1 }) void this.destroyWorker(worker) diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index fddae466..92a5fdf2 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -61,7 +61,11 @@ export class DynamicThreadPool< // All workers are busy, create a new worker const worker = this.createAndSetupWorker() this.registerWorkerMessageListener(worker, message => { - if (message.kill) { + const tasksInProgress = this.tasks.get(worker) + if (message.kill && !tasksInProgress) { + // Kill received from the worker, means that no new tasks are submitted to that worker for a while( > maxInactiveTime) + // To handle the case of a long-running task we will check if the there is any active task + console.log('Here we are') this.sendToWorker(worker, { kill: 1 }) void this.destroyWorker(worker) } diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 6a82b057..024e2670 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -87,4 +87,19 @@ describe('Dynamic cluster pool test suite ', () => { const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) + it('Verify scale processes up and down is working when long running task is used', async () => { + const longRunningPool = new DynamicClusterPool( + min, + max, + './tests/worker/thread/longRunningWorker.js' + ) + expect(longRunningPool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + longRunningPool.execute({ test: 'test' }) + } + expect(longRunningPool.workers.length).toBe(max) + await new Promise(resolve => setTimeout(resolve, 1000)) + // Here we expect the workers to be at the max size since that the task is still running + expect(longRunningPool.workers.length).toBe(max) + }) }) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 98a0dd0f..d59b9858 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -53,6 +53,7 @@ describe('Dynamic thread pool test suite ', () => { await new Promise(resolve => setTimeout(resolve, 1000)) expect(pool.workers.length).toBe(min) }) + it('Shutdown test', async () => { let closedThreads = 0 pool.workers.forEach(w => { @@ -85,4 +86,24 @@ describe('Dynamic thread pool test suite ', () => { const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) + + it('Verify scale thread up and down is working when long running task is used', async () => { + const longRunningPool = new DynamicThreadPool( + min, + max, + './tests/worker/thread/longRunningWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } + ) + expect(longRunningPool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + longRunningPool.execute({ test: 'test' }) + } + expect(longRunningPool.workers.length).toBe(max) + await new Promise(resolve => setTimeout(resolve, 1000)) + // Here we expect the workers to be at the max size since that the task is still running + expect(longRunningPool.workers.length).toBe(max) + }) }) diff --git a/tests/worker/cluster/longRunningWorker.js b/tests/worker/cluster/longRunningWorker.js new file mode 100644 index 00000000..d751d351 --- /dev/null +++ b/tests/worker/cluster/longRunningWorker.js @@ -0,0 +1,10 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +async function sleep (data) { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(data), 50000) + }) +} + +module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true }) diff --git a/tests/worker/thread/longRunningWorker.js b/tests/worker/thread/longRunningWorker.js new file mode 100644 index 00000000..86891279 --- /dev/null +++ b/tests/worker/thread/longRunningWorker.js @@ -0,0 +1,10 @@ +'use strict' +const { ThreadWorker } = require('../../../lib/index') + +async function sleep (data) { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(data), 50000) + }) +} + +module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true }) -- 2.34.1