From 1a81f8af06213d08267e8e7d593c5ec7be087535 Mon Sep 17 00:00:00 2001 From: Shinigami Date: Tue, 16 Feb 2021 15:57:09 +0100 Subject: [PATCH] Improvements for #161 (#169) --- CHANGELOG.md | 3 +- package-lock.json | 12 ++--- src/index.ts | 4 +- src/pools/cluster/dynamic.ts | 8 +-- src/pools/thread/dynamic.ts | 8 +-- src/utility-types.ts | 4 +- src/worker/abstract-worker.ts | 19 +++---- src/worker/worker-options.ts | 52 ++++++++++++++----- tests/worker/cluster/asyncErrorWorker.js | 4 +- tests/worker/cluster/asyncWorker.js | 4 +- tests/worker/cluster/echoWorker.js | 4 +- tests/worker/cluster/emptyWorker.js | 4 +- tests/worker/cluster/errorWorker.js | 4 +- .../cluster/longRunningWorkerHardBehavior.js | 4 +- tests/worker/cluster/testWorker.js | 4 +- tests/worker/thread/asyncWorker.js | 4 +- tests/worker/thread/echoWorker.js | 4 +- tests/worker/thread/emptyWorker.js | 4 +- tests/worker/thread/errorWorker.js | 4 +- .../thread/longRunningWorkerHardBehavior.js | 4 +- tests/worker/thread/testWorker.js | 4 +- 21 files changed, 98 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f70cef0a..d23919ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking Changes -- maxInactiveTime default behavior is now changed, if you want to keep the old behavior set killBehavior to HARD ( Find more details on our JSDoc ). +- `maxInactiveTime` default behavior is now changed, if you want to keep the old behavior set `killBehavior` to `KillBehaviors.HARD`. + _Find more details on our JSDoc._ - We changed some internal structures, but you shouldn't be too affected by them as these are internal changes. diff --git a/package-lock.json b/package-lock.json index 94eef92e..cab635a4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1683,9 +1683,9 @@ }, "dependencies": { "globals": { - "version": "13.5.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-13.5.0.tgz", - "integrity": "sha512-TMJe2Iu/qCIEFnG7IQ62C9N/iKdgX5wSvmGOVuk75+UAGDW+Yv/hH5+Ky6d/8UMqo4WCzhFCy+pHsvv09zhBoQ==", + "version": "13.6.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-13.6.0.tgz", + "integrity": "sha512-YFKCX0SiPg7l5oKYCJ2zZGxcXprVXHcSnVuvzrT3oSENQonVLqM5pf9fN5dLGZGyCjhw8TN8Btwe/jKnZ0pjvQ==", "dev": true, "requires": { "type-fest": "^0.20.2" @@ -2192,9 +2192,9 @@ "dev": true }, "handlebars": { - "version": "4.7.6", - "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.7.6.tgz", - "integrity": "sha512-1f2BACcBfiwAfStCKZNrUCgqNZkGsAT7UM3kkYtXuLo0KnaVfjKOyf7PRzB6++aK9STyT1Pd2ZCPe3EGOXleXA==", + "version": "4.7.7", + "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.7.7.tgz", + "integrity": "sha512-aAcXm5OAfE/8IXkcZvCepKU3VzW1/39Fb5ZuqMtgI/hT8X2YgoMvBY5dLhq/cpOvw7Lk1nK/UF71aLG/ZnVYRA==", "dev": true, "requires": { "minimist": "^1.2.5", diff --git a/src/index.ts b/src/index.ts index b64a274b..6cc41500 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,5 +15,5 @@ export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed' export { AbstractWorker } from './worker/abstract-worker' export { ClusterWorker } from './worker/cluster-worker' export { ThreadWorker } from './worker/thread-worker' -export type { WorkerOptions } from './worker/worker-options' -export { killBehaviorTypes } from './worker/worker-options' +export { KillBehaviors } from './worker/worker-options' +export type { KillBehavior, WorkerOptions } from './worker/worker-options' diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index bfafd66b..e97ab4a9 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,8 +1,8 @@ import type { Worker } from 'cluster' import type { JSONValue } from '../../utility-types' +import { isKillBehavior, KillBehaviors } from '../../worker/worker-options' import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' -import { killBehaviorTypes } from '../../worker/worker-options' /** * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers. @@ -63,8 +63,10 @@ export class DynamicClusterPool< const workerCreated = this.createAndSetupWorker() this.registerWorkerMessageListener(workerCreated, message => { const tasksInProgress = this.tasks.get(workerCreated) - const isKillBehaviorOptionHard = message.kill === killBehaviorTypes.HARD - if (isKillBehaviorOptionHard || tasksInProgress === 0) { + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + tasksInProgress === 0 + ) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) this.sendToWorker(workerCreated, { kill: 1 }) void this.destroyWorker(workerCreated) diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 4a2fd5e8..86950b43 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,8 +1,8 @@ import type { JSONValue } from '../../utility-types' +import { isKillBehavior, KillBehaviors } from '../../worker/worker-options' import type { PoolOptions } from '../abstract-pool' import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' -import { killBehaviorTypes } from '../../worker/worker-options' /** * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. @@ -63,8 +63,10 @@ export class DynamicThreadPool< const workerCreated = this.createAndSetupWorker() this.registerWorkerMessageListener(workerCreated, message => { const tasksInProgress = this.tasks.get(workerCreated) - const isKillBehaviorOptionHard = message.kill === killBehaviorTypes.HARD - if (isKillBehaviorOptionHard || tasksInProgress === 0) { + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + tasksInProgress === 0 + ) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) this.sendToWorker(workerCreated, { kill: 1 }) void this.destroyWorker(workerCreated) diff --git a/src/utility-types.ts b/src/utility-types.ts index 2a2c1993..d680fa16 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,7 +1,7 @@ import type { Worker } from 'cluster' import type { MessagePort } from 'worker_threads' +import type { KillBehavior } from './worker/worker-options' -export type KillBehavior = 'HARD' | 'SOFT' /** * Make all properties in T non-readonly */ @@ -43,7 +43,7 @@ export interface MessageValue< /** * Kill code. */ - readonly kill?: KillBehavior | number + readonly kill?: KillBehavior | 1 /** * Error. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 768c8d95..6aa34e8e 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,12 +1,12 @@ import { AsyncResource } from 'async_hooks' import type { Worker } from 'cluster' import type { MessagePort } from 'worker_threads' -import type { MessageValue, KillBehavior } from '../utility-types' -import type { WorkerOptions } from './worker-options' +import type { MessageValue } from '../utility-types' +import type { KillBehavior, WorkerOptions } from './worker-options' +import { KillBehaviors } from './worker-options' -const defaultMaxInactiveTime = 1000 * 60 -// TODO fix this and avoid that SOFT/HARD words are replicated so much times into the project -const defaultKillBehavior: KillBehavior = 'SOFT' +const DEFAULT_MAX_INACTIVE_TIME = 1000 * 60 +const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT /** * Base class containing some shared logic for all poolifier workers. @@ -56,13 +56,14 @@ export abstract class AbstractWorker< fn: (data: Data) => Response, protected mainWorker?: MainWorker | null, public readonly opts: WorkerOptions = { - killBehavior: defaultKillBehavior, - maxInactiveTime: defaultMaxInactiveTime + killBehavior: DEFAULT_KILL_BEHAVIOR, + maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME } ) { super(type) - this.killBehavior = this.opts.killBehavior ?? defaultKillBehavior - this.maxInactiveTime = this.opts.maxInactiveTime ?? defaultMaxInactiveTime + this.killBehavior = this.opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR + this.maxInactiveTime = + this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME this.async = !!this.opts.async this.lastTask = Date.now() if (!fn) throw new Error('fn parameter is mandatory') diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index bdad1a8d..4473a975 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -1,12 +1,36 @@ -import type { KillBehavior } from '../utility-types' - /** - * Kill behavior enumeration + * Enumeration of kill behaviors. */ -export const killBehaviorTypes = Object.freeze({ +export const KillBehaviors = Object.freeze({ + /** + * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **wont** be deleted. + */ SOFT: 'SOFT', + /** + * If `lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted. + */ HARD: 'HARD' -}) +} as const) + +/** + * Kill behavior. + */ +export type KillBehavior = keyof typeof KillBehaviors + +/** + * Detects whether the given value is a kill behavior or not. + * + * @template KB Which specific KillBehavior to test against. + * @param killBehavior Which kind of kill behavior to detect. Default: `KillBehaviors.HARD`. + * @param value Any value. + * @returns `true` if `value` was strictly equals to `killBehavior`, otherwise `false`. + */ +export function isKillBehavior ( + killBehavior: KB, + value: unknown +): value is KB { + return value === killBehavior +} /** * Options for workers. @@ -14,11 +38,13 @@ export const killBehaviorTypes = Object.freeze({ export interface WorkerOptions { /** * Maximum waiting time in milliseconds for tasks. + * * After this time, newly created workers will be terminated. * The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task. - * If killBehavior is set to HARD this value represents also the timeout for the tasks that you submit to the pool, - * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. - * If killBehavior is set to SOFT your tasks have no timeout and your workers will not be terminated until your task is + * + * - If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, + * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. + * - If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is. * * @default 60.000 ms */ @@ -30,12 +56,14 @@ export interface WorkerOptions { */ async?: boolean /** - * killBehavior dictates if your async unit ( worker/process ) will be deleted in case that a task is active on it. - * SOFT: If current time - last active time is greater than maxInactiveTime option, but a task is still running then the worker will be not deleted. - * HARD: If last active time is greater than maxInactiveTime option, but a task is still running then the worker will be deleted. + * `killBehavior` dictates if your async unit (worker/process) will be deleted in case that a task is active on it. + * + * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **wont** be deleted. + * - HARD: If `lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted. + * * This option only apply to the newly created workers. * - * @default SOFT + * @default KillBehaviors.SOFT */ killBehavior?: KillBehavior } diff --git a/tests/worker/cluster/asyncErrorWorker.js b/tests/worker/cluster/asyncErrorWorker.js index f3e0e7c8..b9fd9016 100644 --- a/tests/worker/cluster/asyncErrorWorker.js +++ b/tests/worker/cluster/asyncErrorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') async function error (data) { return new Promise((resolve, reject) => { @@ -13,5 +13,5 @@ async function error (data) { module.exports = new ClusterWorker(error, { maxInactiveTime: 500, async: true, - killBehavior: killBehaviorTypes + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/asyncWorker.js b/tests/worker/cluster/asyncWorker.js index 106df109..b5c784df 100644 --- a/tests/worker/cluster/asyncWorker.js +++ b/tests/worker/cluster/asyncWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -10,5 +10,5 @@ async function sleep (data) { module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/echoWorker.js b/tests/worker/cluster/echoWorker.js index 77898bac..054c4bb3 100644 --- a/tests/worker/cluster/echoWorker.js +++ b/tests/worker/cluster/echoWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') function echo (data) { return data @@ -7,5 +7,5 @@ function echo (data) { module.exports = new ClusterWorker(echo, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/emptyWorker.js b/tests/worker/cluster/emptyWorker.js index 979057c2..58c55af1 100644 --- a/tests/worker/cluster/emptyWorker.js +++ b/tests/worker/cluster/emptyWorker.js @@ -1,9 +1,9 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') function test (data) {} module.exports = new ClusterWorker(test, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/errorWorker.js b/tests/worker/cluster/errorWorker.js index 02168a67..d6d9297a 100644 --- a/tests/worker/cluster/errorWorker.js +++ b/tests/worker/cluster/errorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') function error (data) { throw new Error('Error Message from ClusterWorker') @@ -8,5 +8,5 @@ function error (data) { module.exports = new ClusterWorker(error, { maxInactiveTime: 500, async: false, - killBehavior: killBehaviorTypes + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/longRunningWorkerHardBehavior.js b/tests/worker/cluster/longRunningWorkerHardBehavior.js index 4dc69525..04c78f46 100644 --- a/tests/worker/cluster/longRunningWorkerHardBehavior.js +++ b/tests/worker/cluster/longRunningWorkerHardBehavior.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -10,5 +10,5 @@ async function sleep (data) { module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/cluster/testWorker.js b/tests/worker/cluster/testWorker.js index 9b95294b..7caad947 100644 --- a/tests/worker/cluster/testWorker.js +++ b/tests/worker/cluster/testWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker, killBehaviorTypes } = require('../../../lib/index') +const { ClusterWorker, KillBehaviors } = require('../../../lib/index') const { isMaster } = require('cluster') function test (data) { @@ -14,5 +14,5 @@ function test (data) { module.exports = new ClusterWorker(test, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/asyncWorker.js b/tests/worker/thread/asyncWorker.js index a6a9590d..0bf5d244 100644 --- a/tests/worker/thread/asyncWorker.js +++ b/tests/worker/thread/asyncWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -10,5 +10,5 @@ async function sleep (data) { module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/echoWorker.js b/tests/worker/thread/echoWorker.js index 9471891e..071428c5 100644 --- a/tests/worker/thread/echoWorker.js +++ b/tests/worker/thread/echoWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') function echo (data) { return data @@ -7,5 +7,5 @@ function echo (data) { module.exports = new ThreadWorker(echo, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/emptyWorker.js b/tests/worker/thread/emptyWorker.js index 1787a79c..6a146c26 100644 --- a/tests/worker/thread/emptyWorker.js +++ b/tests/worker/thread/emptyWorker.js @@ -1,9 +1,9 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') function test (data) {} module.exports = new ThreadWorker(test, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/errorWorker.js b/tests/worker/thread/errorWorker.js index 7f7fdadd..e9f20ab8 100644 --- a/tests/worker/thread/errorWorker.js +++ b/tests/worker/thread/errorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') function error (data) { throw new Error(data) @@ -7,5 +7,5 @@ function error (data) { module.exports = new ThreadWorker(error, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/longRunningWorkerHardBehavior.js b/tests/worker/thread/longRunningWorkerHardBehavior.js index 8e3eb38a..7d9714a8 100644 --- a/tests/worker/thread/longRunningWorkerHardBehavior.js +++ b/tests/worker/thread/longRunningWorkerHardBehavior.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { @@ -10,5 +10,5 @@ async function sleep (data) { module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker/thread/testWorker.js b/tests/worker/thread/testWorker.js index 7510f00b..c70069c7 100644 --- a/tests/worker/thread/testWorker.js +++ b/tests/worker/thread/testWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker, killBehaviorTypes } = require('../../../lib/index') +const { ThreadWorker, KillBehaviors } = require('../../../lib/index') const { isMainThread } = require('worker_threads') function test (data) { @@ -14,5 +14,5 @@ function test (data) { module.exports = new ThreadWorker(test, { maxInactiveTime: 500, - killBehavior: killBehaviorTypes.HARD + killBehavior: KillBehaviors.HARD }) -- 2.34.1