Dedupe worker attributes (#364)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 16 May 2021 18:17:08 +0000 (20:17 +0200)
committerGitHub <noreply@github.com>
Sun, 16 May 2021 18:17:08 +0000 (20:17 +0200)
.eslintrc.js
.vscode/settings.json
CHANGELOG.md
src/pools/abstract-pool.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/worker/abstract-worker.test.js
tests/worker/cluster-worker.test.js
tests/worker/thread-worker.test.js

index e35a48df6bc3e96fbcf3ebfb247e53ae875ad030..15fcb940ae0e8025076ceebc05196192d900e010 100644 (file)
@@ -59,7 +59,6 @@ module.exports = defineConfig({
           'enum',
           'inheritdoc',
           'jsdoc',
-          'pioardi',
           'poolifier',
           'readonly',
           'serializable',
index 886a5af31d2e7b7f33f9920f9d3239edae6e0fc8..4f0283b2e95ba8080078489da1adedda1e639a50 100644 (file)
@@ -11,7 +11,6 @@
     "loglevel",
     "markdownlint",
     "piment",
-    "pioardi",
     "poolifier",
     "prettierx",
     "serializable",
index b2525a5eae39d5f417e1890f7e517c74357c7c6b..20b83d5bea6703dfd57ef509564fc86402aacfcc 100644 (file)
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.1.0] - 2021-dd-mm
+
+### Breaking Changes
+
+- `AbstractWorker` class `maxInactiveTime`, `killBehavior` and `async` attributes have been removed in favour of the same ones in the worker options `opts` public attribute.
+- `AbstractWorker` class `lastTask` attribute have been renamed to `lastTaskTimestamp`.
+- `AbstractWorker` class `interval` attribute have been renamed to `aliveInterval`.
+
 ## [2.0.2] - 2021-12-05
 
 ### Bug fixes
index dc49dc8f6edada76925ddb66ae27b0224c9cce8c..e0aa1810c4d839f39d29eee350ae010a6945ffab 100644 (file)
@@ -116,7 +116,7 @@ export abstract class AbstractPool<
   /**
    * The promise map.
    *
-   * - `key`: This is the message ID of each submitted task.
+   * - `key`: This is the message Id of each submitted task.
    * - `value`: An object that contains the worker, the resolve function and the reject function.
    *
    * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
@@ -127,7 +127,7 @@ export abstract class AbstractPool<
   > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
 
   /**
-   * ID of the next message.
+   * Id of the next message.
    */
   protected nextMessageId: number = 0
 
index 9441fec4e420292905a23150d73580e73b3411cd..9c8d9650b15ccc31007a19d199117860a650ac58 100644 (file)
@@ -20,7 +20,7 @@ export interface MessageValue<
    */
   readonly data?: Data
   /**
-   * ID of the message.
+   * Id of the message.
    */
   readonly id?: number
   /**
index 26468aeb96ae0d5f838518e2e9308b2e31c16c0e..4a6241a8ccfdb1c7426504568989c3c293983405 100644 (file)
@@ -21,26 +21,14 @@ export abstract class AbstractWorker<
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
-  /**
-   * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires.
-   */
-  protected readonly maxInactiveTime: number
-  /**
-   * The kill behavior set as option on the Worker constructor or a default value.
-   */
-  protected readonly killBehavior: KillBehavior
-  /**
-   * Whether the worker is working asynchronously or not.
-   */
-  protected readonly async: boolean
   /**
    * Timestamp of the last task processed by this worker.
    */
-  protected lastTask: number
+  protected lastTaskTimestamp: number
   /**
-   * Handler ID of the `interval` alive check.
+   * Handler Id of the `aliveInterval` worker alive check.
    */
-  protected readonly interval?: NodeJS.Timeout
+  protected readonly aliveInterval?: NodeJS.Timeout
 
   /**
    * Constructs a new poolifier worker.
@@ -57,22 +45,26 @@ export abstract class AbstractWorker<
     fn: (data: Data) => Response,
     protected mainWorker?: MainWorker | null,
     public readonly opts: WorkerOptions = {
+      /**
+       * The kill behavior option on this Worker or its default value.
+       */
       killBehavior: DEFAULT_KILL_BEHAVIOR,
+      /**
+       * The maximum time to keep this worker alive while idle.
+       * The pool automatically checks and terminates this worker when the time expires.
+       */
       maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME
     }
   ) {
     super(type)
-    this.killBehavior = this.opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
-    this.maxInactiveTime =
-      this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
-    this.async = !!this.opts.async
-    this.lastTask = Date.now()
     this.checkFunctionInput(fn)
+    this.checkWorkerOptions(this.opts)
+    this.lastTaskTimestamp = Date.now()
     // Keep the worker active
     if (!isMain) {
-      this.interval = setInterval(
+      this.aliveInterval = setInterval(
         this.checkAlive.bind(this),
-        this.maxInactiveTime / 2
+        (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
       )
       this.checkAlive.bind(this)()
     }
@@ -80,7 +72,7 @@ export abstract class AbstractWorker<
     this.mainWorker?.on('message', (value: MessageValue<Data, MainWorker>) => {
       if (value?.data && value.id) {
         // Here you will receive messages
-        if (this.async) {
+        if (this.opts.async) {
           this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
         } else {
           this.runInAsyncScope(this.run.bind(this), this, fn, value)
@@ -91,12 +83,22 @@ export abstract class AbstractWorker<
         this.mainWorker = value.parent
       } else if (value.kill) {
         // Here is time to kill this worker, just clearing the interval
-        if (this.interval) clearInterval(this.interval)
+        if (this.aliveInterval) clearInterval(this.aliveInterval)
         this.emitDestroy()
       }
     })
   }
 
+  private checkWorkerOptions (opts: WorkerOptions) {
+    this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
+    this.opts.maxInactiveTime =
+      opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
+    /**
+     * Whether the worker is working asynchronously or not.
+     */
+    this.opts.async = !!opts.async
+  }
+
   /**
    * Check if the `fn` parameter is passed to the constructor.
    *
@@ -129,8 +131,11 @@ export abstract class AbstractWorker<
    * Check to see if the worker should be terminated, because its living too long.
    */
   protected checkAlive (): void {
-    if (Date.now() - this.lastTask > this.maxInactiveTime) {
-      this.sendToMainWorker({ kill: this.killBehavior })
+    if (
+      Date.now() - this.lastTaskTimestamp >
+      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
+    ) {
+      this.sendToMainWorker({ kill: this.opts.killBehavior })
     }
   }
 
@@ -161,7 +166,7 @@ export abstract class AbstractWorker<
       const err = this.handleError(e)
       this.sendToMainWorker({ error: err, id: value.id })
     } finally {
-      this.lastTask = Date.now()
+      this.lastTaskTimestamp = Date.now()
     }
   }
 
@@ -185,7 +190,7 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({ error: err, id: value.id })
       })
       .finally(() => {
-        this.lastTask = Date.now()
+        this.lastTaskTimestamp = Date.now()
       })
       .catch(EMPTY_FUNCTION)
   }
index 2c326a67380e96589083a8bb451e4353180e232b..aa6aa92f6f1471839530b07e718df4e1f3bcdd56 100644 (file)
@@ -30,7 +30,7 @@ export class ClusterWorker<
    * @param opts Options for the worker.
    */
   public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
-    super('worker-cluster-pool:pioardi', isMaster, fn, worker, opts)
+    super('worker-cluster-pool:poolifier', isMaster, fn, worker, opts)
   }
 
   /** @inheritdoc */
index 4c458aa19d0f5ed47091689deb3727037663100d..f1e38076698622d1d4f010e821453842d16113a3 100644 (file)
@@ -30,7 +30,7 @@ export class ThreadWorker<
    * @param opts Options for the worker.
    */
   public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
-    super('worker-thread-pool:pioardi', isMainThread, fn, parentPort, opts)
+    super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
   }
 
   /** @inheritdoc */
index 9b4864c77dda745d3d162b833875b6709a38b6d9..4788e5c6964916873f53ab34bd15b45b0c3dc966 100644 (file)
@@ -15,11 +15,11 @@ describe('Abstract worker test suite', () => {
     )
   })
 
-  it('Verify worker default values', () => {
+  it('Verify worker options default values', () => {
     const worker = new ThreadWorker(() => {})
-    expect(worker.maxInactiveTime).toBe(1000 * 60)
-    expect(worker.killBehavior).toBe(KillBehaviors.SOFT)
-    expect(worker.async).toBe(false)
+    expect(worker.opts.maxInactiveTime).toBe(1000 * 60)
+    expect(worker.opts.killBehavior).toBe(KillBehaviors.SOFT)
+    expect(worker.opts.async).toBe(false)
   })
 
   it('Verify that worker options are set at worker creation', () => {
@@ -28,9 +28,9 @@ describe('Abstract worker test suite', () => {
       async: true,
       killBehavior: KillBehaviors.HARD
     })
-    expect(worker.maxInactiveTime).toBe(6000)
-    expect(worker.killBehavior).toBe(KillBehaviors.HARD)
-    expect(worker.async).toBe(true)
+    expect(worker.opts.maxInactiveTime).toBe(6000)
+    expect(worker.opts.killBehavior).toBe(KillBehaviors.HARD)
+    expect(worker.opts.async).toBe(true)
   })
 
   it('Verify that handleError function is working properly', () => {
index 196564a48c51aee9e32a2f4af7cd91208d9b92de..fccf3c78c80e59e049d8f9bc4a131354621468ec 100644 (file)
@@ -4,7 +4,7 @@ const { ClusterWorker } = require('../../lib')
 describe('Cluster worker test suite', () => {
   it('Verify worker has default maxInactiveTime', () => {
     const worker = new ClusterWorker(() => {})
-    expect(worker.maxInactiveTime).toEqual(60_000)
+    expect(worker.opts.maxInactiveTime).toEqual(60_000)
   })
 
   it('Verify that handleError function works properly', () => {
index fd136a73a4f349e4c032398236bd426ed09f1747..0024fe0a212726834b97b0d39679d5b86f8a5030 100644 (file)
@@ -14,7 +14,7 @@ class SpyWorker extends ThreadWorker {
 describe('Thread worker test suite', () => {
   it('Verify worker has default maxInactiveTime', () => {
     const worker = new ThreadWorker(() => {})
-    expect(worker.maxInactiveTime).toEqual(60_000)
+    expect(worker.opts.maxInactiveTime).toEqual(60_000)
   })
 
   it('Verify worker invoke the getMainWorker and postMessage methods', () => {