From 4c35177b63cac8a87aa6de389e1232e94c59b8c9 Mon Sep 17 00:00:00 2001 From: aardizio Date: Tue, 16 Feb 2021 11:07:12 +0100 Subject: [PATCH] Implementation for killBehavior based on the last feedbacks --- src/index.ts | 1 + src/pools/cluster/dynamic.ts | 6 +++-- src/pools/thread/dynamic.ts | 6 +++-- src/utility-types.ts | 2 +- src/worker/abstract-worker.ts | 19 +++++++++++---- src/worker/worker-options.ts | 22 +++++++++++++++++- tests/pools/cluster/dynamic.test.js | 20 ++++++++++++++-- tests/pools/thread/dynamic.test.js | 23 +++++++++++++++++-- tests/worker/cluster/asyncErrorWorker.js | 5 ++-- tests/worker/cluster/asyncWorker.js | 8 +++++-- tests/worker/cluster/echoWorker.js | 7 ++++-- tests/worker/cluster/emptyWorker.js | 7 ++++-- tests/worker/cluster/errorWorker.js | 5 ++-- .../cluster/longRunningWorkerHardBehavior.js | 14 +++++++++++ ...er.js => longRunningWorkerSoftBehavior.js} | 5 +++- tests/worker/cluster/testWorker.js | 7 ++++-- tests/worker/thread/asyncWorker.js | 8 +++++-- tests/worker/thread/echoWorker.js | 7 ++++-- tests/worker/thread/emptyWorker.js | 7 ++++-- tests/worker/thread/errorWorker.js | 7 ++++-- .../thread/longRunningWorkerHardBehavior.js | 14 +++++++++++ ...er.js => longRunningWorkerSoftBehavior.js} | 5 +++- tests/worker/thread/testWorker.js | 7 ++++-- 23 files changed, 174 insertions(+), 38 deletions(-) create mode 100644 tests/worker/cluster/longRunningWorkerHardBehavior.js rename tests/worker/cluster/{longRunningWorker.js => longRunningWorkerSoftBehavior.js} (69%) create mode 100644 tests/worker/thread/longRunningWorkerHardBehavior.js rename tests/worker/thread/{longRunningWorker.js => longRunningWorkerSoftBehavior.js} (69%) diff --git a/src/index.ts b/src/index.ts index 7242dd5e..60536f99 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,3 +16,4 @@ export { AbstractWorker } from './worker/abstract-worker' export { ClusterWorker } from './worker/cluster-worker' export { ThreadWorker } from './worker/thread-worker' export type { WorkerOptions } from './worker/worker-options' +export { killBehaviorEnumeration } from './worker/worker-options' diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 5eba843a..58e2b712 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -2,6 +2,7 @@ import type { Worker } from 'cluster' import type { JSONValue } from '../../utility-types' import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' +import { killBehaviorEnumeration } from '../../worker/worker-options' /** * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers. @@ -62,9 +63,10 @@ export class DynamicClusterPool< const worker = this.createAndSetupWorker() this.registerWorkerMessageListener(worker, message => { const tasksInProgress = this.tasks.get(worker) - if (message.kill && tasksInProgress === 0) { + const isKillBehaviorOptionHard = + message.kill === killBehaviorEnumeration.HARD + if (isKillBehaviorOptionHard || tasksInProgress === 0) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - // To handle the case of a long-running task we will check if there is any active task this.sendToWorker(worker, { kill: 1 }) void this.destroyWorker(worker) } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 26d81d2f..7a0d4ffd 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -2,6 +2,7 @@ import type { JSONValue } from '../../utility-types' import type { PoolOptions } from '../abstract-pool' import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' +import { killBehaviorEnumeration } from '../../worker/worker-options' /** * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. @@ -62,9 +63,10 @@ export class DynamicThreadPool< const worker = this.createAndSetupWorker() this.registerWorkerMessageListener(worker, message => { const tasksInProgress = this.tasks.get(worker) - if (message.kill && tasksInProgress === 0) { + const isKillBehaviorOptionHard = + message.kill === killBehaviorEnumeration.HARD + if (isKillBehaviorOptionHard || tasksInProgress === 0) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - // To handle the case of a long-running task we will check if there is any active task this.sendToWorker(worker, { kill: 1 }) void this.destroyWorker(worker) } diff --git a/src/utility-types.ts b/src/utility-types.ts index eb3f9727..d7f9d6a2 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -42,7 +42,7 @@ export interface MessageValue< /** * Kill code. */ - readonly kill?: number + readonly kill?: string | number /** * Error. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 34ab964c..c50cfbb0 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -3,6 +3,10 @@ import type { Worker } from 'cluster' import type { MessagePort } from 'worker_threads' import type { MessageValue } from '../utility-types' import type { WorkerOptions } from './worker-options' +import { killBehaviorEnumeration } from './worker-options' + +const defaultMaxInactiveTime = 1000 * 60 +const defaultKillBehavior = killBehaviorEnumeration.SOFT /** * Base class containing some shared logic for all poolifier workers. @@ -20,6 +24,10 @@ export abstract class AbstractWorker< * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires. */ protected readonly maxInactiveTime: number + /** + * The kill behavior set as option on the Worker constructor or a default value. + */ + protected readonly killBehavior: string /** * Whether the worker is working asynchronously or not. */ @@ -47,11 +55,14 @@ export abstract class AbstractWorker< isMain: boolean, fn: (data: Data) => Response, protected mainWorker?: MainWorker | null, - public readonly opts: WorkerOptions = {} + public readonly opts: WorkerOptions = { + killBehavior: defaultKillBehavior, + maxInactiveTime: defaultMaxInactiveTime + } ) { super(type) - - this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 + this.killBehavior = this.opts.killBehavior ?? defaultKillBehavior + this.maxInactiveTime = this.opts.maxInactiveTime ?? defaultMaxInactiveTime this.async = !!this.opts.async this.lastTask = Date.now() if (!fn) throw new Error('fn parameter is mandatory') @@ -108,7 +119,7 @@ export abstract class AbstractWorker< */ protected checkAlive (): void { if (Date.now() - this.lastTask > this.maxInactiveTime) { - this.sendToMainWorker({ kill: 1 }) + this.sendToMainWorker({ kill: this.killBehavior }) } } diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 0236164b..8bb9cc50 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -1,11 +1,22 @@ +/** + * Kill behavior enumeration + */ +export const killBehaviorEnumeration = Object.freeze({ + SOFT: 'SOFT', + HARD: 'HARD' +}) + /** * Options for workers. */ export interface WorkerOptions { /** * Maximum waiting time in milliseconds for tasks. - * * After this time, newly created workers will be terminated. + * The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task. + * If killBehavior is set to HARD this value represents also the timeout for the tasks that you submit to the pool, + * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. + * If killBehavior is set to SOFT your tasks have no timeout and your workers will not be terminated until your task is finished. * * @default 60.000 ms */ @@ -16,4 +27,13 @@ export interface WorkerOptions { * @default false */ async?: boolean + /** + * killBehavior dictates if your async unit ( worker/process ) will be deleted in case that a task is active on it. + * SOFT: If current time - last active time is greater than maxInactiveTime option, but a task is still running then the worker will be not deleted. + * HARD: If last active time is greater than maxInactiveTime option, but a task is still running then the worker will be deleted. + * This option only apply to the newly created workers. + * + * @default SOFT + */ + killBehavior?: string } diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index cd946cda..b45b7cca 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -87,11 +87,27 @@ describe('Dynamic cluster pool test suite ', () => { expect(res).toBeFalsy() }) - it('Verify scale processes up and down is working when long running task is used', async () => { + it('Verify scale processes up and down is working when long running task is used:hard', async () => { const longRunningPool = new DynamicClusterPool( min, max, - './tests/worker/cluster/longRunningWorker.js' + './tests/worker/cluster/longRunningWorkerHardBehavior.js' + ) + expect(longRunningPool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + longRunningPool.execute({ test: 'test' }) + } + expect(longRunningPool.workers.length).toBe(max) + await new Promise(resolve => setTimeout(resolve, 3000)) + // Here we expect the workers to be at the max size since that the task is still running + expect(longRunningPool.workers.length).toBe(min) + }) + + it('Verify scale processes up and down is working when long running task is used:soft', async () => { + const longRunningPool = new DynamicClusterPool( + min, + max, + './tests/worker/cluster/longRunningWorkerSoftBehavior.js' ) expect(longRunningPool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 2f89b2ff..836e6591 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -86,11 +86,30 @@ describe('Dynamic thread pool test suite ', () => { expect(res).toBeFalsy() }) - it('Verify scale thread up and down is working when long running task is used', async () => { + it('Verify scale thread up and down is working when long running task is used:hard', async () => { const longRunningPool = new DynamicThreadPool( min, max, - './tests/worker/thread/longRunningWorker.js', + './tests/worker/thread/longRunningWorkerHardBehavior.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } + ) + expect(longRunningPool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + longRunningPool.execute({ test: 'test' }) + } + expect(longRunningPool.workers.length).toBe(max) + await new Promise(resolve => setTimeout(resolve, 1000)) + expect(longRunningPool.workers.length).toBe(min) + }) + + it('Verify scale thread up and down is working when long running task is used:soft', async () => { + const longRunningPool = new DynamicThreadPool( + min, + max, + './tests/worker/thread/longRunningWorkerSoftBehavior.js', { errorHandler: e => console.error(e), onlineHandler: () => console.log('worker is online') diff --git a/tests/worker/cluster/asyncErrorWorker.js b/tests/worker/cluster/asyncErrorWorker.js index 2476853b..be675e11 100644 --- a/tests/worker/cluster/asyncErrorWorker.js +++ b/tests/worker/cluster/asyncErrorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') async function error (data) { return new Promise((resolve, reject) => { @@ -12,5 +12,6 @@ async function error (data) { module.exports = new ClusterWorker(error, { maxInactiveTime: 500, - async: true + async: true, + killBehavior: killBehaviorEnumeration }) diff --git a/tests/worker/cluster/asyncWorker.js b/tests/worker/cluster/asyncWorker.js index 975b0b36..b47adc91 100644 --- a/tests/worker/cluster/asyncWorker.js +++ b/tests/worker/cluster/asyncWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -7,4 +7,8 @@ async function sleep (data) { }) } -module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true }) +module.exports = new ClusterWorker(sleep, { + maxInactiveTime: 500, + async: true, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/cluster/echoWorker.js b/tests/worker/cluster/echoWorker.js index 6c77bcce..8d4477ee 100644 --- a/tests/worker/cluster/echoWorker.js +++ b/tests/worker/cluster/echoWorker.js @@ -1,8 +1,11 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') function echo (data) { return data } -module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 }) +module.exports = new ClusterWorker(echo, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/cluster/emptyWorker.js b/tests/worker/cluster/emptyWorker.js index 62c8e2bb..4e5eeca7 100644 --- a/tests/worker/cluster/emptyWorker.js +++ b/tests/worker/cluster/emptyWorker.js @@ -1,6 +1,9 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') function test (data) {} -module.exports = new ClusterWorker(test, { maxInactiveTime: 500 }) +module.exports = new ClusterWorker(test, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/cluster/errorWorker.js b/tests/worker/cluster/errorWorker.js index 87df9254..bbcb78be 100644 --- a/tests/worker/cluster/errorWorker.js +++ b/tests/worker/cluster/errorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') function error (data) { throw new Error('Error Message from ClusterWorker') @@ -7,5 +7,6 @@ function error (data) { module.exports = new ClusterWorker(error, { maxInactiveTime: 500, - async: false + async: false, + killBehavior: killBehaviorEnumeration }) diff --git a/tests/worker/cluster/longRunningWorkerHardBehavior.js b/tests/worker/cluster/longRunningWorkerHardBehavior.js new file mode 100644 index 00000000..9fe9d3f8 --- /dev/null +++ b/tests/worker/cluster/longRunningWorkerHardBehavior.js @@ -0,0 +1,14 @@ +'use strict' +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') + +async function sleep (data) { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(data), 50000) + }) +} + +module.exports = new ClusterWorker(sleep, { + maxInactiveTime: 500, + async: true, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/cluster/longRunningWorker.js b/tests/worker/cluster/longRunningWorkerSoftBehavior.js similarity index 69% rename from tests/worker/cluster/longRunningWorker.js rename to tests/worker/cluster/longRunningWorkerSoftBehavior.js index d751d351..c4c00f6a 100644 --- a/tests/worker/cluster/longRunningWorker.js +++ b/tests/worker/cluster/longRunningWorkerSoftBehavior.js @@ -7,4 +7,7 @@ async function sleep (data) { }) } -module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true }) +module.exports = new ClusterWorker(sleep, { + maxInactiveTime: 500, + async: true +}) diff --git a/tests/worker/cluster/testWorker.js b/tests/worker/cluster/testWorker.js index a8a6bb9e..3e1ed0d1 100644 --- a/tests/worker/cluster/testWorker.js +++ b/tests/worker/cluster/testWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker } = require('../../../lib/index') +const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index') const { isMaster } = require('cluster') function test (data) { @@ -12,4 +12,7 @@ function test (data) { return isMaster } -module.exports = new ClusterWorker(test, { maxInactiveTime: 500 }) +module.exports = new ClusterWorker(test, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/asyncWorker.js b/tests/worker/thread/asyncWorker.js index 25401cfb..59a900e9 100644 --- a/tests/worker/thread/asyncWorker.js +++ b/tests/worker/thread/asyncWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../../lib/index') +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -7,4 +7,8 @@ async function sleep (data) { }) } -module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true }) +module.exports = new ThreadWorker(sleep, { + maxInactiveTime: 500, + async: true, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/echoWorker.js b/tests/worker/thread/echoWorker.js index 006bf97c..3b554ccc 100644 --- a/tests/worker/thread/echoWorker.js +++ b/tests/worker/thread/echoWorker.js @@ -1,8 +1,11 @@ 'use strict' -const { ThreadWorker } = require('../../../lib/index') +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') function echo (data) { return data } -module.exports = new ThreadWorker(echo, { maxInactiveTime: 500 }) +module.exports = new ThreadWorker(echo, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/emptyWorker.js b/tests/worker/thread/emptyWorker.js index 69a83a77..c7034caa 100644 --- a/tests/worker/thread/emptyWorker.js +++ b/tests/worker/thread/emptyWorker.js @@ -1,6 +1,9 @@ 'use strict' -const { ThreadWorker } = require('../../../lib/index') +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') function test (data) {} -module.exports = new ThreadWorker(test, { maxInactiveTime: 500 }) +module.exports = new ThreadWorker(test, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/errorWorker.js b/tests/worker/thread/errorWorker.js index 63a27513..6b7ee3c9 100644 --- a/tests/worker/thread/errorWorker.js +++ b/tests/worker/thread/errorWorker.js @@ -1,8 +1,11 @@ 'use strict' -const { ThreadWorker } = require('../../../lib/index') +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') function error (data) { throw new Error(data) } -module.exports = new ThreadWorker(error, { maxInactiveTime: 500 }) +module.exports = new ThreadWorker(error, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/longRunningWorkerHardBehavior.js b/tests/worker/thread/longRunningWorkerHardBehavior.js new file mode 100644 index 00000000..ec745794 --- /dev/null +++ b/tests/worker/thread/longRunningWorkerHardBehavior.js @@ -0,0 +1,14 @@ +'use strict' +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') + +async function sleep (data) { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(data), 50000) + }) +} + +module.exports = new ThreadWorker(sleep, { + maxInactiveTime: 500, + async: true, + killBehavior: killBehaviorEnumeration.HARD +}) diff --git a/tests/worker/thread/longRunningWorker.js b/tests/worker/thread/longRunningWorkerSoftBehavior.js similarity index 69% rename from tests/worker/thread/longRunningWorker.js rename to tests/worker/thread/longRunningWorkerSoftBehavior.js index 86891279..eed0586f 100644 --- a/tests/worker/thread/longRunningWorker.js +++ b/tests/worker/thread/longRunningWorkerSoftBehavior.js @@ -7,4 +7,7 @@ async function sleep (data) { }) } -module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true }) +module.exports = new ThreadWorker(sleep, { + maxInactiveTime: 500, + async: true +}) diff --git a/tests/worker/thread/testWorker.js b/tests/worker/thread/testWorker.js index 3556da01..77dfdc8e 100644 --- a/tests/worker/thread/testWorker.js +++ b/tests/worker/thread/testWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../../lib/index') +const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index') const { isMainThread } = require('worker_threads') function test (data) { @@ -12,4 +12,7 @@ function test (data) { return isMainThread } -module.exports = new ThreadWorker(test, { maxInactiveTime: 500 }) +module.exports = new ThreadWorker(test, { + maxInactiveTime: 500, + killBehavior: killBehaviorEnumeration.HARD +}) -- 2.34.1