refactor: cleanup internal pool messaging code
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 26 Jun 2023 21:43:13 +0000 (23:43 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 26 Jun 2023 21:43:13 +0000 (23:43 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
19 files changed:
.eslintrc.js
.vscode/settings.json
CHANGELOG.md
README.md
examples/typescript/worker.ts
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
tests/worker-files/cluster/asyncErrorWorker.js
tests/worker-files/cluster/asyncWorker.js
tests/worker-files/cluster/longRunningWorkerHardBehavior.js
tests/worker-files/cluster/longRunningWorkerSoftBehavior.js
tests/worker-files/thread/asyncErrorWorker.js
tests/worker-files/thread/asyncWorker.js
tests/worker-files/thread/longRunningWorkerHardBehavior.js
tests/worker-files/thread/longRunningWorkerSoftBehavior.js

index e160b189d2394e46a51c9572026d1fd0aa14a1ef..e421cd282cf758417b1609b6a887f4335619b708 100644 (file)
@@ -72,7 +72,6 @@ module.exports = defineConfig({
           'poolify',
           'readonly',
           'resize',
-          'serializable',
           'sinon',
           'threadjs',
           'threadwork',
index 1004671cb8283803b1f6fe26fcddd69cab2a59a0..20e48e2dfbfd2f4d3a1650c5d588a01086f94985 100644 (file)
@@ -8,6 +8,7 @@
     "autobuild",
     "Benoit",
     "caffeinate",
+    "cloneable",
     "codeql",
     "commitlint",
     "Dependabot",
@@ -34,7 +35,6 @@
     "poolify",
     "preinstall",
     "Quadflieg",
-    "serializable",
     "Shinigami",
     "sonarsource",
     "suchmokuo",
index 3b544676be9239f8d78f889b190bb782884f972f..3c9ab37fa4459f6c8b2734c5845db605e6d5c420 100644 (file)
@@ -548,7 +548,7 @@ const { DynamicThreadPool } = require('poolifier')
 
 #### New type definitions for input data and response
 
-For cluster worker and worker-thread pools, you can now only send and receive serializable data.  
+For cluster worker and worker-thread pools, you can now only send and receive structured-cloneable data.  
 _This is not a limitation by poolifier but NodeJS._
 
 #### Public property replacements
index cb7a1ad994933295e35a53dbdeca21a8bf2e387a..f6bd0f1b3f031bd31284fee08b49dcbd97adbdc4 100644 (file)
--- a/README.md
+++ b/README.md
@@ -142,7 +142,7 @@ You can do the same with the classes ClusterWorker, FixedClusterPool and Dynamic
 
 **See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
 
-Remember that workers can only send and receive serializable data.
+Remember that workers can only send and receive structured-cloneable data.
 
 ## Node versions
 
index adc8c4345ca1ead79eec6d31507e7fc07e53f7af..81d39df3c9eb5c34019dfeeca6e2ab495f6fc982 100644 (file)
@@ -13,8 +13,7 @@ class MyThreadWorker extends ThreadWorker<MyData, Promise<MyResponse>> {
   constructor () {
     // eslint-disable-next-line @typescript-eslint/promise-function-async
     super((data: MyData) => this.process(data), {
-      maxInactiveTime: 60000,
-      async: true
+      maxInactiveTime: 60000
     })
   }
 
index da8a5eb69c9f69b7744dfe23525877e88680290a..da0f2e77358525336a5936a42841edce9016f44b 100644 (file)
@@ -466,8 +466,8 @@ export abstract class AbstractPool<
   protected abstract destroyWorker (worker: Worker): void | Promise<void>
 
   /**
-   * Setup hook to execute code before worker node are created in the abstract constructor.
-   * Can be overridden
+   * Setup hook to execute code before worker nodes are created in the abstract constructor.
+   * Can be overridden.
    *
    * @virtual
    */
@@ -692,10 +692,11 @@ export abstract class AbstractPool<
 
   /**
    * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+   * Can be overridden.
    *
    * @param worker - The newly created worker.
    */
-  private afterWorkerSetup (worker: Worker): void {
+  protected afterWorkerSetup (worker: Worker): void {
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
   }
index 7a3b630b8dd8bf261dac5a207542100f423f0c42..a88197f715bdd2c2a129638d3960dce9d9ac8420 100644 (file)
@@ -30,8 +30,6 @@ export interface ClusterPoolOptions extends PoolOptions<Worker> {
 /**
  * A cluster pool with a fixed number of workers.
  *
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
  * @author [Christopher Quadflieg](https://github.com/Shinigami92)
index 881e37f636eb3a5ba4770c0c7b90cbb994ab029e..ac629e1fb0ff7f363be0a3fe5107f2b02748c8d8 100644 (file)
@@ -29,8 +29,6 @@ export interface ThreadPoolOptions extends PoolOptions<Worker> {
 /**
  * A thread pool with a fixed number of threads.
  *
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
  * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
index da2fa22300c58c99998a637c31f531dba8c982c0..f330d192296dcc62a5b86a4209126d21798acff6 100644 (file)
@@ -49,14 +49,10 @@ export interface WorkerStatistics {
  *
  * @typeParam Data - Type of data sent to the worker or execution response. This can only be structured-cloneable data.
  * @typeParam ErrorData - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
- * @typeParam MainWorker - Type of main worker.
  * @internal
  */
-export interface MessageValue<
-  Data = unknown,
-  ErrorData = unknown,
-  MainWorker = NodeJS.Process | MessagePort
-> extends Task<Data> {
+export interface MessageValue<Data = unknown, ErrorData = unknown>
+  extends Task<Data> {
   /**
    * Kill code.
    */
@@ -69,10 +65,6 @@ export interface MessageValue<
    * Task performance.
    */
   readonly taskPerformance?: TaskPerformance
-  /**
-   * Reference to main worker.
-   */
-  readonly parent?: MainWorker
   /**
    * Whether to compute the given statistics or not.
    */
index 8df09ee02f1ed5078f10393d48c4c84d3428683a..8d82ed61d0f239a156ada271b5967ce31317d459 100644 (file)
@@ -1,4 +1,5 @@
 import { AsyncResource } from 'node:async_hooks'
+import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import { performance } from 'node:perf_hooks'
 import type {
@@ -31,7 +32,7 @@ const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
  * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
  */
 export abstract class AbstractWorker<
-  MainWorker extends NodeJS.Process | MessagePort,
+  MainWorker extends Worker | MessagePort,
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
@@ -145,9 +146,7 @@ export abstract class AbstractWorker<
    *
    * @param message - Message received.
    */
-  protected messageListener (
-    message: MessageValue<Data, Data, MainWorker>
-  ): void {
+  protected messageListener (message: MessageValue<Data, Data>): void {
     if (message.id != null && message.data != null) {
       // Task message received
       const fn = this.getTaskFunction(message.name)
@@ -156,9 +155,6 @@ export abstract class AbstractWorker<
       } else {
         this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
       }
-    } else if (message.parent != null) {
-      // Main worker reference message received
-      this.mainWorker = message.parent
     } else if (message.statistics != null) {
       // Statistics message received
       this.statistics = message.statistics
index efc17acf0b9205f05ff621e399b448af2cfbbb47..13735b1d697c076c547981e9d5aad8bd22633f84 100644 (file)
@@ -1,4 +1,4 @@
-import cluster from 'node:cluster'
+import cluster, { type Worker } from 'node:cluster'
 import type { MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
@@ -21,7 +21,7 @@ import type { TaskFunctions, WorkerFunction } from './worker-functions'
 export class ClusterWorker<
   Data = unknown,
   Response = unknown
-> extends AbstractWorker<NodeJS.Process, Data, Response> {
+> extends AbstractWorker<Worker, Data, Response> {
   /**
    * Constructs a new poolifier cluster worker.
    *
@@ -38,18 +38,14 @@ export class ClusterWorker<
       'worker-cluster-pool:poolifier',
       cluster.isPrimary,
       taskFunctions,
-      process,
+      cluster.worker as Worker,
       opts
     )
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
-    const mainWorker = this.getMainWorker()
-    if (mainWorker.send == null) {
-      throw new Error('Main worker does not support IPC communication')
-    }
-    mainWorker.send(message)
+    this.getMainWorker().send(message)
   }
 
   /** @inheritDoc */
index 09278851c72e31070e31784447ad8cda0e637a56..fba8423f4f46dec5e7c169d5316b82e5b3b2056e 100644 (file)
@@ -13,6 +13,5 @@ async function error (data) {
 
 module.exports = new ClusterWorker(error, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index e8866877c0ec2ac6d8f5aed1ce70666ddac85b13..c9bcc8921474dc1f0af630a0268097ad24a8ee95 100644 (file)
@@ -8,6 +8,5 @@ async function sleep (data) {
 
 module.exports = new ClusterWorker(sleep, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index 005699e92195beafe6c24bcbfeb1938e823f8efe..cb92fcaf3a3bb093e928d0f3ac891052ad87c9af 100644 (file)
@@ -8,6 +8,5 @@ async function sleep (data) {
 
 module.exports = new ClusterWorker(sleep, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index 5c6fdf42679b72a7e213ce098240363016cf366c..c9515a33adfbcdfcbfae0dc5df072fe06d7996f3 100644 (file)
@@ -7,6 +7,5 @@ async function sleep (data) {
 }
 
 module.exports = new ClusterWorker(sleep, {
-  maxInactiveTime: 500,
-  async: true
+  maxInactiveTime: 500
 })
index ac9663304c101eb49504fe8ce989c7610228afe8..1078982305231aa813708cda87f1d44c19fbe36d 100644 (file)
@@ -13,6 +13,5 @@ async function error (data) {
 
 module.exports = new ThreadWorker(error, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index 5af97b0734e6da16643b763a785d4069e6f48522..d5d9b3101ba862f06b12882b2a896aa3ec32c1f3 100644 (file)
@@ -8,6 +8,5 @@ async function sleep (data) {
 
 module.exports = new ThreadWorker(sleep, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index 29c633bbb2d2bb3f12163a70cc2adfe4c7481306..791853e368a62b24f4da5520fb8b0b81885ea316 100644 (file)
@@ -8,6 +8,5 @@ async function sleep (data) {
 
 module.exports = new ThreadWorker(sleep, {
   maxInactiveTime: 500,
-  async: true,
   killBehavior: KillBehaviors.HARD
 })
index c6fa5393f44830bd9e8af28dabc473ad6198d8b3..8adb43a7e322eccdbd7ba70e847310eb2ee63a9c 100644 (file)
@@ -7,6 +7,5 @@ async function sleep (data) {
 }
 
 module.exports = new ThreadWorker(sleep, {
-  maxInactiveTime: 500,
-  async: true
+  maxInactiveTime: 500
 })