From 6677a3d36e9e7241c54db7cd69daa40f52fcbcb3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 26 Jun 2023 23:43:13 +0200 Subject: [PATCH] refactor: cleanup internal pool messaging code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .eslintrc.js | 1 - .vscode/settings.json | 2 +- CHANGELOG.md | 2 +- README.md | 2 +- examples/typescript/worker.ts | 3 +-- src/pools/abstract-pool.ts | 7 ++++--- src/pools/cluster/fixed.ts | 2 -- src/pools/thread/fixed.ts | 2 -- src/utility-types.ts | 12 ++---------- src/worker/abstract-worker.ts | 10 +++------- src/worker/cluster-worker.ts | 12 ++++-------- tests/worker-files/cluster/asyncErrorWorker.js | 1 - tests/worker-files/cluster/asyncWorker.js | 1 - .../cluster/longRunningWorkerHardBehavior.js | 1 - .../cluster/longRunningWorkerSoftBehavior.js | 3 +-- tests/worker-files/thread/asyncErrorWorker.js | 1 - tests/worker-files/thread/asyncWorker.js | 1 - .../thread/longRunningWorkerHardBehavior.js | 1 - .../thread/longRunningWorkerSoftBehavior.js | 3 +-- 19 files changed, 19 insertions(+), 48 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index e160b189..e421cd28 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -72,7 +72,6 @@ module.exports = defineConfig({ 'poolify', 'readonly', 'resize', - 'serializable', 'sinon', 'threadjs', 'threadwork', diff --git a/.vscode/settings.json b/.vscode/settings.json index 1004671c..20e48e2d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ "autobuild", "Benoit", "caffeinate", + "cloneable", "codeql", "commitlint", "Dependabot", @@ -34,7 +35,6 @@ "poolify", "preinstall", "Quadflieg", - "serializable", "Shinigami", "sonarsource", "suchmokuo", diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b544676..3c9ab37f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -548,7 +548,7 @@ const { DynamicThreadPool } = require('poolifier') #### New type definitions for input data and response -For cluster worker and worker-thread pools, you can now only send and receive serializable data. +For cluster worker and worker-thread pools, you can now only send and receive structured-cloneable data. _This is not a limitation by poolifier but NodeJS._ #### Public property replacements diff --git a/README.md b/README.md index cb7a1ad9..f6bd0f1b 100644 --- a/README.md +++ b/README.md @@ -142,7 +142,7 @@ You can do the same with the classes ClusterWorker, FixedClusterPool and Dynamic **See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**. -Remember that workers can only send and receive serializable data. +Remember that workers can only send and receive structured-cloneable data. ## Node versions diff --git a/examples/typescript/worker.ts b/examples/typescript/worker.ts index adc8c434..81d39df3 100644 --- a/examples/typescript/worker.ts +++ b/examples/typescript/worker.ts @@ -13,8 +13,7 @@ class MyThreadWorker extends ThreadWorker> { constructor () { // eslint-disable-next-line @typescript-eslint/promise-function-async super((data: MyData) => this.process(data), { - maxInactiveTime: 60000, - async: true + maxInactiveTime: 60000 }) } diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index da8a5eb6..da0f2e77 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -466,8 +466,8 @@ export abstract class AbstractPool< protected abstract destroyWorker (worker: Worker): void | Promise /** - * Setup hook to execute code before worker node are created in the abstract constructor. - * Can be overridden + * Setup hook to execute code before worker nodes are created in the abstract constructor. + * Can be overridden. * * @virtual */ @@ -692,10 +692,11 @@ export abstract class AbstractPool< /** * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Can be overridden. * * @param worker - The newly created worker. */ - private afterWorkerSetup (worker: Worker): void { + protected afterWorkerSetup (worker: Worker): void { // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 7a3b630b..a88197f7 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -30,8 +30,6 @@ export interface ClusterPoolOptions extends PoolOptions { /** * A cluster pool with a fixed number of workers. * - * It is possible to perform tasks in sync or asynchronous mode as you prefer. - * * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @typeParam Response - Type of execution response. This can only be structured-cloneable data. * @author [Christopher Quadflieg](https://github.com/Shinigami92) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 881e37f6..ac629e1f 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -29,8 +29,6 @@ export interface ThreadPoolOptions extends PoolOptions { /** * A thread pool with a fixed number of threads. * - * It is possible to perform tasks in sync or asynchronous mode as you prefer. - * * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @typeParam Response - Type of execution response. This can only be structured-cloneable data. * @author [Alessandro Pio Ardizio](https://github.com/pioardi) diff --git a/src/utility-types.ts b/src/utility-types.ts index da2fa223..f330d192 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -49,14 +49,10 @@ export interface WorkerStatistics { * * @typeParam Data - Type of data sent to the worker or execution response. This can only be structured-cloneable data. * @typeParam ErrorData - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. - * @typeParam MainWorker - Type of main worker. * @internal */ -export interface MessageValue< - Data = unknown, - ErrorData = unknown, - MainWorker = NodeJS.Process | MessagePort -> extends Task { +export interface MessageValue + extends Task { /** * Kill code. */ @@ -69,10 +65,6 @@ export interface MessageValue< * Task performance. */ readonly taskPerformance?: TaskPerformance - /** - * Reference to main worker. - */ - readonly parent?: MainWorker /** * Whether to compute the given statistics or not. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8df09ee0..8d82ed61 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,4 +1,5 @@ import { AsyncResource } from 'node:async_hooks' +import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import { performance } from 'node:perf_hooks' import type { @@ -31,7 +32,7 @@ const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data. */ export abstract class AbstractWorker< - MainWorker extends NodeJS.Process | MessagePort, + MainWorker extends Worker | MessagePort, Data = unknown, Response = unknown > extends AsyncResource { @@ -145,9 +146,7 @@ export abstract class AbstractWorker< * * @param message - Message received. */ - protected messageListener ( - message: MessageValue - ): void { + protected messageListener (message: MessageValue): void { if (message.id != null && message.data != null) { // Task message received const fn = this.getTaskFunction(message.name) @@ -156,9 +155,6 @@ export abstract class AbstractWorker< } else { this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } - } else if (message.parent != null) { - // Main worker reference message received - this.mainWorker = message.parent } else if (message.statistics != null) { // Statistics message received this.statistics = message.statistics diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index efc17acf..13735b1d 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -1,4 +1,4 @@ -import cluster from 'node:cluster' +import cluster, { type Worker } from 'node:cluster' import type { MessageValue } from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -21,7 +21,7 @@ import type { TaskFunctions, WorkerFunction } from './worker-functions' export class ClusterWorker< Data = unknown, Response = unknown -> extends AbstractWorker { +> extends AbstractWorker { /** * Constructs a new poolifier cluster worker. * @@ -38,18 +38,14 @@ export class ClusterWorker< 'worker-cluster-pool:poolifier', cluster.isPrimary, taskFunctions, - process, + cluster.worker as Worker, opts ) } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - const mainWorker = this.getMainWorker() - if (mainWorker.send == null) { - throw new Error('Main worker does not support IPC communication') - } - mainWorker.send(message) + this.getMainWorker().send(message) } /** @inheritDoc */ diff --git a/tests/worker-files/cluster/asyncErrorWorker.js b/tests/worker-files/cluster/asyncErrorWorker.js index 09278851..fba8423f 100644 --- a/tests/worker-files/cluster/asyncErrorWorker.js +++ b/tests/worker-files/cluster/asyncErrorWorker.js @@ -13,6 +13,5 @@ async function error (data) { module.exports = new ClusterWorker(error, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/cluster/asyncWorker.js b/tests/worker-files/cluster/asyncWorker.js index e8866877..c9bcc892 100644 --- a/tests/worker-files/cluster/asyncWorker.js +++ b/tests/worker-files/cluster/asyncWorker.js @@ -8,6 +8,5 @@ async function sleep (data) { module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/cluster/longRunningWorkerHardBehavior.js b/tests/worker-files/cluster/longRunningWorkerHardBehavior.js index 005699e9..cb92fcaf 100644 --- a/tests/worker-files/cluster/longRunningWorkerHardBehavior.js +++ b/tests/worker-files/cluster/longRunningWorkerHardBehavior.js @@ -8,6 +8,5 @@ async function sleep (data) { module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js b/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js index 5c6fdf42..c9515a33 100644 --- a/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js +++ b/tests/worker-files/cluster/longRunningWorkerSoftBehavior.js @@ -7,6 +7,5 @@ async function sleep (data) { } module.exports = new ClusterWorker(sleep, { - maxInactiveTime: 500, - async: true + maxInactiveTime: 500 }) diff --git a/tests/worker-files/thread/asyncErrorWorker.js b/tests/worker-files/thread/asyncErrorWorker.js index ac966330..10789823 100644 --- a/tests/worker-files/thread/asyncErrorWorker.js +++ b/tests/worker-files/thread/asyncErrorWorker.js @@ -13,6 +13,5 @@ async function error (data) { module.exports = new ThreadWorker(error, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/thread/asyncWorker.js b/tests/worker-files/thread/asyncWorker.js index 5af97b07..d5d9b310 100644 --- a/tests/worker-files/thread/asyncWorker.js +++ b/tests/worker-files/thread/asyncWorker.js @@ -8,6 +8,5 @@ async function sleep (data) { module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/thread/longRunningWorkerHardBehavior.js b/tests/worker-files/thread/longRunningWorkerHardBehavior.js index 29c633bb..791853e3 100644 --- a/tests/worker-files/thread/longRunningWorkerHardBehavior.js +++ b/tests/worker-files/thread/longRunningWorkerHardBehavior.js @@ -8,6 +8,5 @@ async function sleep (data) { module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, - async: true, killBehavior: KillBehaviors.HARD }) diff --git a/tests/worker-files/thread/longRunningWorkerSoftBehavior.js b/tests/worker-files/thread/longRunningWorkerSoftBehavior.js index c6fa5393..8adb43a7 100644 --- a/tests/worker-files/thread/longRunningWorkerSoftBehavior.js +++ b/tests/worker-files/thread/longRunningWorkerSoftBehavior.js @@ -7,6 +7,5 @@ async function sleep (data) { } module.exports = new ThreadWorker(sleep, { - maxInactiveTime: 500, - async: true + maxInactiveTime: 500 }) -- 2.34.1