From e088a00c285c320395d883d57d1db51d42300b10 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 16 May 2021 20:17:08 +0200 Subject: [PATCH] Dedupe worker attributes (#364) --- .eslintrc.js | 1 - .vscode/settings.json | 1 - CHANGELOG.md | 8 ++++ src/pools/abstract-pool.ts | 4 +- src/utility-types.ts | 2 +- src/worker/abstract-worker.ts | 61 +++++++++++++++------------- src/worker/cluster-worker.ts | 2 +- src/worker/thread-worker.ts | 2 +- tests/worker/abstract-worker.test.js | 14 +++---- tests/worker/cluster-worker.test.js | 2 +- tests/worker/thread-worker.test.js | 2 +- 11 files changed, 55 insertions(+), 44 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index e35a48df..15fcb940 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -59,7 +59,6 @@ module.exports = defineConfig({ 'enum', 'inheritdoc', 'jsdoc', - 'pioardi', 'poolifier', 'readonly', 'serializable', diff --git a/.vscode/settings.json b/.vscode/settings.json index 886a5af3..4f0283b2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,7 +11,6 @@ "loglevel", "markdownlint", "piment", - "pioardi", "poolifier", "prettierx", "serializable", diff --git a/CHANGELOG.md b/CHANGELOG.md index b2525a5e..20b83d5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.1.0] - 2021-dd-mm + +### Breaking Changes + +- `AbstractWorker` class `maxInactiveTime`, `killBehavior` and `async` attributes have been removed in favour of the same ones in the worker options `opts` public attribute. +- `AbstractWorker` class `lastTask` attribute have been renamed to `lastTaskTimestamp`. +- `AbstractWorker` class `interval` attribute have been renamed to `aliveInterval`. + ## [2.0.2] - 2021-12-05 ### Bug fixes diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index dc49dc8f..e0aa1810 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -116,7 +116,7 @@ export abstract class AbstractPool< /** * The promise map. * - * - `key`: This is the message ID of each submitted task. + * - `key`: This is the message Id of each submitted task. * - `value`: An object that contains the worker, the resolve function and the reject function. * * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message. @@ -127,7 +127,7 @@ export abstract class AbstractPool< > = new Map>() /** - * ID of the next message. + * Id of the next message. */ protected nextMessageId: number = 0 diff --git a/src/utility-types.ts b/src/utility-types.ts index 9441fec4..9c8d9650 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -20,7 +20,7 @@ export interface MessageValue< */ readonly data?: Data /** - * ID of the message. + * Id of the message. */ readonly id?: number /** diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 26468aeb..4a6241a8 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -21,26 +21,14 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { - /** - * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires. - */ - protected readonly maxInactiveTime: number - /** - * The kill behavior set as option on the Worker constructor or a default value. - */ - protected readonly killBehavior: KillBehavior - /** - * Whether the worker is working asynchronously or not. - */ - protected readonly async: boolean /** * Timestamp of the last task processed by this worker. */ - protected lastTask: number + protected lastTaskTimestamp: number /** - * Handler ID of the `interval` alive check. + * Handler Id of the `aliveInterval` worker alive check. */ - protected readonly interval?: NodeJS.Timeout + protected readonly aliveInterval?: NodeJS.Timeout /** * Constructs a new poolifier worker. @@ -57,22 +45,26 @@ export abstract class AbstractWorker< fn: (data: Data) => Response, protected mainWorker?: MainWorker | null, public readonly opts: WorkerOptions = { + /** + * The kill behavior option on this Worker or its default value. + */ killBehavior: DEFAULT_KILL_BEHAVIOR, + /** + * The maximum time to keep this worker alive while idle. + * The pool automatically checks and terminates this worker when the time expires. + */ maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME } ) { super(type) - this.killBehavior = this.opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR - this.maxInactiveTime = - this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME - this.async = !!this.opts.async - this.lastTask = Date.now() this.checkFunctionInput(fn) + this.checkWorkerOptions(this.opts) + this.lastTaskTimestamp = Date.now() // Keep the worker active if (!isMain) { - this.interval = setInterval( + this.aliveInterval = setInterval( this.checkAlive.bind(this), - this.maxInactiveTime / 2 + (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 ) this.checkAlive.bind(this)() } @@ -80,7 +72,7 @@ export abstract class AbstractWorker< this.mainWorker?.on('message', (value: MessageValue) => { if (value?.data && value.id) { // Here you will receive messages - if (this.async) { + if (this.opts.async) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) } else { this.runInAsyncScope(this.run.bind(this), this, fn, value) @@ -91,12 +83,22 @@ export abstract class AbstractWorker< this.mainWorker = value.parent } else if (value.kill) { // Here is time to kill this worker, just clearing the interval - if (this.interval) clearInterval(this.interval) + if (this.aliveInterval) clearInterval(this.aliveInterval) this.emitDestroy() } }) } + private checkWorkerOptions (opts: WorkerOptions) { + this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR + this.opts.maxInactiveTime = + opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME + /** + * Whether the worker is working asynchronously or not. + */ + this.opts.async = !!opts.async + } + /** * Check if the `fn` parameter is passed to the constructor. * @@ -129,8 +131,11 @@ export abstract class AbstractWorker< * Check to see if the worker should be terminated, because its living too long. */ protected checkAlive (): void { - if (Date.now() - this.lastTask > this.maxInactiveTime) { - this.sendToMainWorker({ kill: this.killBehavior }) + if ( + Date.now() - this.lastTaskTimestamp > + (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) + ) { + this.sendToMainWorker({ kill: this.opts.killBehavior }) } } @@ -161,7 +166,7 @@ export abstract class AbstractWorker< const err = this.handleError(e) this.sendToMainWorker({ error: err, id: value.id }) } finally { - this.lastTask = Date.now() + this.lastTaskTimestamp = Date.now() } } @@ -185,7 +190,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ error: err, id: value.id }) }) .finally(() => { - this.lastTask = Date.now() + this.lastTaskTimestamp = Date.now() }) .catch(EMPTY_FUNCTION) } diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 2c326a67..aa6aa92f 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -30,7 +30,7 @@ export class ClusterWorker< * @param opts Options for the worker. */ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { - super('worker-cluster-pool:pioardi', isMaster, fn, worker, opts) + super('worker-cluster-pool:poolifier', isMaster, fn, worker, opts) } /** @inheritdoc */ diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 4c458aa1..f1e38076 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -30,7 +30,7 @@ export class ThreadWorker< * @param opts Options for the worker. */ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { - super('worker-thread-pool:pioardi', isMainThread, fn, parentPort, opts) + super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts) } /** @inheritdoc */ diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 9b4864c7..4788e5c6 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -15,11 +15,11 @@ describe('Abstract worker test suite', () => { ) }) - it('Verify worker default values', () => { + it('Verify worker options default values', () => { const worker = new ThreadWorker(() => {}) - expect(worker.maxInactiveTime).toBe(1000 * 60) - expect(worker.killBehavior).toBe(KillBehaviors.SOFT) - expect(worker.async).toBe(false) + expect(worker.opts.maxInactiveTime).toBe(1000 * 60) + expect(worker.opts.killBehavior).toBe(KillBehaviors.SOFT) + expect(worker.opts.async).toBe(false) }) it('Verify that worker options are set at worker creation', () => { @@ -28,9 +28,9 @@ describe('Abstract worker test suite', () => { async: true, killBehavior: KillBehaviors.HARD }) - expect(worker.maxInactiveTime).toBe(6000) - expect(worker.killBehavior).toBe(KillBehaviors.HARD) - expect(worker.async).toBe(true) + expect(worker.opts.maxInactiveTime).toBe(6000) + expect(worker.opts.killBehavior).toBe(KillBehaviors.HARD) + expect(worker.opts.async).toBe(true) }) it('Verify that handleError function is working properly', () => { diff --git a/tests/worker/cluster-worker.test.js b/tests/worker/cluster-worker.test.js index 196564a4..fccf3c78 100644 --- a/tests/worker/cluster-worker.test.js +++ b/tests/worker/cluster-worker.test.js @@ -4,7 +4,7 @@ const { ClusterWorker } = require('../../lib') describe('Cluster worker test suite', () => { it('Verify worker has default maxInactiveTime', () => { const worker = new ClusterWorker(() => {}) - expect(worker.maxInactiveTime).toEqual(60_000) + expect(worker.opts.maxInactiveTime).toEqual(60_000) }) it('Verify that handleError function works properly', () => { diff --git a/tests/worker/thread-worker.test.js b/tests/worker/thread-worker.test.js index fd136a73..0024fe0a 100644 --- a/tests/worker/thread-worker.test.js +++ b/tests/worker/thread-worker.test.js @@ -14,7 +14,7 @@ class SpyWorker extends ThreadWorker { describe('Thread worker test suite', () => { it('Verify worker has default maxInactiveTime', () => { const worker = new ThreadWorker(() => {}) - expect(worker.maxInactiveTime).toEqual(60_000) + expect(worker.opts.maxInactiveTime).toEqual(60_000) }) it('Verify worker invoke the getMainWorker and postMessage methods', () => { -- 2.34.1