From: Shinigami Date: Fri, 12 Feb 2021 07:17:48 +0000 (+0100) Subject: Encapsulate logic of cluster and thread worker/pool (#116) X-Git-Tag: v2.0.0-beta.2~52 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=c97c7edb14ea0699cd82bce5d0ffe50ae26af667;p=poolifier.git Encapsulate logic of cluster and thread worker/pool (#116) Co-authored-by: aardizio Co-authored-by: Jérôme Benoit --- diff --git a/.vscode/launch.json b/.vscode/launch.json index 8a86588a..c2c4c33f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,13 +11,8 @@ "cwd": "${workspaceFolder}", "preLaunchTask": "Development build", "runtimeExecutable": "npm", - "runtimeArgs": [ - "run-script", - "test:debug:vscode" - ], - "skipFiles": [ - "/**" - ], + "runtimeArgs": ["run-script", "test:debug:vscode"], + "skipFiles": ["/**"], "stopOnEntry": true }, { @@ -27,13 +22,8 @@ "cwd": "${workspaceFolder}", "preLaunchTask": "Development build", "runtimeExecutable": "npm", - "runtimeArgs": [ - "run-script", - "benchmark:debug:vscode" - ], - "skipFiles": [ - "/**" - ], + "runtimeArgs": ["run-script", "benchmark:debug:vscode"], + "skipFiles": ["/**"], "stopOnEntry": true } ] diff --git a/package-lock.json b/package-lock.json index 68599a1c..929cc486 100644 --- a/package-lock.json +++ b/package-lock.json @@ -48,32 +48,11 @@ "source-map": "^0.5.0" }, "dependencies": { - "@babel/parser": { - "version": "7.12.15", - "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz", - "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==", - "dev": true - }, - "json5": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz", - "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==", - "dev": true, - "requires": { - "minimist": "^1.2.5" - } - }, "semver": { "version": "5.7.1", "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", "dev": true - }, - "source-map": { - "version": "0.5.7", - "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", - "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=", - "dev": true } } }, @@ -86,14 +65,6 @@ "@babel/types": "^7.12.13", "jsesc": "^2.5.1", "source-map": "^0.5.0" - }, - "dependencies": { - "source-map": { - "version": "0.5.7", - "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", - "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=", - "dev": true - } } }, "@babel/helper-annotate-as-pure": { @@ -356,9 +327,9 @@ } }, "@babel/parser": { - "version": "7.12.11", - "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz", - "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==", + "version": "7.12.15", + "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz", + "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==", "dev": true }, "@babel/plugin-proposal-async-generator-functions": { @@ -653,14 +624,6 @@ "@babel/helper-replace-supers": "^7.12.13", "@babel/helper-split-export-declaration": "^7.12.13", "globals": "^11.1.0" - }, - "dependencies": { - "globals": { - "version": "11.12.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", - "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", - "dev": true - } } }, "@babel/plugin-transform-computed-properties": { @@ -1054,14 +1017,6 @@ "@babel/code-frame": "^7.12.13", "@babel/parser": "^7.12.13", "@babel/types": "^7.12.13" - }, - "dependencies": { - "@babel/parser": { - "version": "7.12.15", - "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz", - "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==", - "dev": true - } } }, "@babel/traverse": { @@ -1079,20 +1034,6 @@ "debug": "^4.1.0", "globals": "^11.1.0", "lodash": "^4.17.19" - }, - "dependencies": { - "@babel/parser": { - "version": "7.12.15", - "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz", - "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==", - "dev": true - }, - "globals": { - "version": "11.12.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", - "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", - "dev": true - } } }, "@babel/types": { @@ -1124,6 +1065,15 @@ "strip-json-comments": "^3.1.1" }, "dependencies": { + "globals": { + "version": "12.4.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz", + "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==", + "dev": true, + "requires": { + "type-fest": "^0.8.1" + } + }, "ignore": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz", @@ -2198,14 +2148,6 @@ "dev": true, "requires": { "safe-buffer": "~5.1.1" - }, - "dependencies": { - "safe-buffer": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", - "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", - "dev": true - } } }, "core-js-compat": { @@ -2605,6 +2547,15 @@ "v8-compile-cache": "^2.0.3" }, "dependencies": { + "globals": { + "version": "12.4.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz", + "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==", + "dev": true, + "requires": { + "type-fest": "^0.8.1" + } + }, "ignore": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz", @@ -3249,13 +3200,10 @@ } }, "globals": { - "version": "12.4.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz", - "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==", - "dev": true, - "requires": { - "type-fest": "^0.8.1" - } + "version": "11.12.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", + "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", + "dev": true }, "globby": { "version": "11.0.2", @@ -3272,9 +3220,9 @@ } }, "graceful-fs": { - "version": "4.2.5", - "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz", - "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==", + "version": "4.2.6", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz", + "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==", "dev": true }, "graphql": { @@ -3300,6 +3248,14 @@ "source-map": "^0.6.1", "uglify-js": "^3.1.4", "wordwrap": "^1.0.0" + }, + "dependencies": { + "source-map": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", + "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", + "dev": true + } } }, "har-schema": { @@ -3733,6 +3689,14 @@ "debug": "^4.1.1", "istanbul-lib-coverage": "^3.0.0", "source-map": "^0.6.1" + }, + "dependencies": { + "source-map": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", + "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", + "dev": true + } } }, "istanbul-reports": { @@ -3875,12 +3839,12 @@ "dev": true }, "json5": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz", - "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==", + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz", + "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==", "dev": true, "requires": { - "minimist": "^1.2.0" + "minimist": "^1.2.5" } }, "jsonify": { @@ -4729,6 +4693,12 @@ } } }, + "source-map": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", + "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", + "dev": true + }, "supports-color": { "version": "6.1.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-6.1.0.tgz", @@ -4889,6 +4859,12 @@ "@babel/highlight": "^7.10.4" } }, + "@babel/parser": { + "version": "7.12.11", + "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz", + "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==", + "dev": true + }, "@typescript-eslint/typescript-estree": { "version": "2.34.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-2.34.0.tgz", @@ -4921,6 +4897,16 @@ "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz", "integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==", "dev": true + }, + "resolve": { + "version": "1.19.0", + "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz", + "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==", + "dev": true, + "requires": { + "is-core-module": "^2.1.0", + "path-parse": "^1.0.6" + } } } }, @@ -5001,6 +4987,12 @@ "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==", "dev": true }, + "queue-microtask": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz", + "integrity": "sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==", + "dev": true + }, "randombytes": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz", @@ -5233,12 +5225,12 @@ "dev": true }, "resolve": { - "version": "1.19.0", - "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz", - "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==", + "version": "1.20.0", + "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.20.0.tgz", + "integrity": "sha512-wENBPt4ySzg4ybFQW2TT1zMQucPK95HSh/nq2CFTZVOGut2+pQvSsgtda4d26YrYcr067wjbmzOG8byDPBX63A==", "dev": true, "requires": { - "is-core-module": "^2.1.0", + "is-core-module": "^2.2.0", "path-parse": "^1.0.6" } }, @@ -5291,15 +5283,18 @@ } }, "run-parallel": { - "version": "1.1.10", - "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz", - "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==", - "dev": true + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz", + "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==", + "dev": true, + "requires": { + "queue-microtask": "^1.2.2" + } }, "safe-buffer": { - "version": "5.2.1", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", - "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", "dev": true }, "safer-buffer": { @@ -5415,9 +5410,9 @@ } }, "source-map": { - "version": "0.6.1", - "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", - "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", + "version": "0.5.7", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", + "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=", "dev": true }, "source-map-support": { @@ -5428,6 +5423,14 @@ "requires": { "buffer-from": "^1.0.0", "source-map": "^0.6.0" + }, + "dependencies": { + "source-map": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", + "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==", + "dev": true + } } }, "sourcemap-codec": { @@ -5608,9 +5611,9 @@ }, "dependencies": { "ajv": { - "version": "7.0.4", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.0.4.tgz", - "integrity": "sha512-xzzzaqgEQfmuhbhAoqjJ8T/1okb6gAzXn/eQRNpAN1AEUoHJTNF9xCDRTtf/s3SKldtZfa+RJeTs+BQq+eZ/sw==", + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.1.0.tgz", + "integrity": "sha512-svS9uILze/cXbH0z2myCK2Brqprx/+JJYK5pHicT/GQiBfzzhUVAIT6MwqJg8y4xV/zoGsUeuPuwtoiKSGE15g==", "dev": true, "requires": { "fast-deep-equal": "^3.1.1", @@ -5697,6 +5700,17 @@ "json5": "^1.0.1", "minimist": "^1.2.0", "strip-bom": "^3.0.0" + }, + "dependencies": { + "json5": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz", + "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==", + "dev": true, + "requires": { + "minimist": "^1.2.0" + } + } } }, "tslib": { diff --git a/src/index.ts b/src/index.ts index 95ef8255..7242dd5e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,26 +1,18 @@ -import { DynamicClusterPool } from './pools/cluster/dynamic' -import { FixedClusterPool } from './pools/cluster/fixed' -import { DynamicThreadPool } from './pools/thread/dynamic' -import { FixedThreadPool } from './pools/thread/fixed' -import { ClusterWorker } from './worker/cluster-worker' -import { ThreadWorker } from './worker/thread-worker' - -export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic' export type { - FixedClusterPoolOptions, - WorkerWithMessageChannel as ClusterWorkerWithMessageChannel -} from './pools/cluster/fixed' -export type { DynamicThreadPoolOptions } from './pools/thread/dynamic' -export type { - FixedThreadPoolOptions, - WorkerWithMessageChannel as ThreadWorkerWithMessageChannel -} from './pools/thread/fixed' + ErrorHandler, + ExitHandler, + IWorker, + OnlineHandler, + PoolOptions +} from './pools/abstract-pool' +export { DynamicClusterPool } from './pools/cluster/dynamic' +export { FixedClusterPool } from './pools/cluster/fixed' +export type { ClusterPoolOptions } from './pools/cluster/fixed' +export type { IPool } from './pools/pool' +export { DynamicThreadPool } from './pools/thread/dynamic' +export { FixedThreadPool } from './pools/thread/fixed' +export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed' +export { AbstractWorker } from './worker/abstract-worker' +export { ClusterWorker } from './worker/cluster-worker' +export { ThreadWorker } from './worker/thread-worker' export type { WorkerOptions } from './worker/worker-options' -export { - FixedThreadPool, - FixedClusterPool, - DynamicClusterPool, - DynamicThreadPool, - ThreadWorker, - ClusterWorker -} diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts new file mode 100644 index 00000000..d169afad --- /dev/null +++ b/src/pools/abstract-pool.ts @@ -0,0 +1,173 @@ +import EventEmitter from 'events' +import type { MessageValue } from '../utility-types' +import type { IPool } from './pool' + +export type ErrorHandler = (this: Worker, e: Error) => void +export type OnlineHandler = (this: Worker) => void +export type ExitHandler = (this: Worker, code: number) => void + +export interface IWorker { + on(event: 'error', handler: ErrorHandler): void + on(event: 'online', handler: OnlineHandler): void + on(event: 'exit', handler: ExitHandler): void +} + +export interface PoolOptions { + /** + * A function that will listen for error event on each worker. + */ + errorHandler?: ErrorHandler + /** + * A function that will listen for online event on each worker. + */ + onlineHandler?: OnlineHandler + /** + * A function that will listen for exit event on each worker. + */ + exitHandler?: ExitHandler + /** + * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). + * + * @default 1000 + */ + maxTasks?: number +} + +class PoolEmitter extends EventEmitter {} + +export abstract class AbstractPool< + Worker extends IWorker, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Data = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Response = any +> implements IPool { + public readonly workers: Worker[] = [] + public nextWorker: number = 0 + + /** + * `workerId` as key and an integer value + */ + public readonly tasks: Map = new Map() + + public readonly emitter: PoolEmitter + + protected id: number = 0 + + public constructor ( + public readonly numWorkers: number, + public readonly filePath: string, + public readonly opts: PoolOptions = { maxTasks: 1000 } + ) { + if (!this.isMain()) { + throw new Error('Cannot start a pool from a worker!') + } + // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check + if (!this.filePath) { + throw new Error('Please specify a file with a worker implementation') + } + + this.setupHook() + + for (let i = 1; i <= this.numWorkers; i++) { + this.internalNewWorker() + } + + this.emitter = new PoolEmitter() + } + + protected setupHook (): void { + // Can be overridden + } + + protected abstract isMain (): boolean + + public async destroy (): Promise { + for (const worker of this.workers) { + await this.destroyWorker(worker) + } + } + + protected abstract destroyWorker (worker: Worker): void | Promise + + protected abstract sendToWorker ( + worker: Worker, + message: MessageValue + ): void + + protected addWorker (worker: Worker): void { + const previousWorkerIndex = this.tasks.get(worker) + if (previousWorkerIndex !== undefined) { + this.tasks.set(worker, previousWorkerIndex + 1) + } else { + throw Error('Worker could not be found in tasks map') + } + } + + /** + * Execute the task specified into the constructor with the data parameter. + * + * @param data The input for the task specified. + * @returns Promise that is resolved when the task is done. + */ + public execute (data: Data): Promise { + // configure worker to handle message with the specified task + const worker = this.chooseWorker() + this.addWorker(worker) + const id = ++this.id + const res = this.internalExecute(worker, id) + this.sendToWorker(worker, { data: data || ({} as Data), id: id }) + return res + } + + protected abstract registerWorkerMessageListener ( + port: Worker, + listener: (message: MessageValue) => void + ): void + + protected abstract unregisterWorkerMessageListener ( + port: Worker, + listener: (message: MessageValue) => void + ): void + + protected internalExecute (worker: Worker, id: number): Promise { + return new Promise((resolve, reject) => { + const listener: (message: MessageValue) => void = message => { + if (message.id === id) { + this.unregisterWorkerMessageListener(worker, listener) + this.addWorker(worker) + if (message.error) reject(message.error) + else resolve(message.data as Response) + } + } + this.registerWorkerMessageListener(worker, listener) + }) + } + + protected chooseWorker (): Worker { + if (this.workers.length - 1 === this.nextWorker) { + this.nextWorker = 0 + return this.workers[this.nextWorker] + } else { + this.nextWorker++ + return this.workers[this.nextWorker] + } + } + + protected abstract newWorker (): Worker + + protected abstract afterNewWorkerPushed (worker: Worker): void + + protected internalNewWorker (): Worker { + const worker: Worker = this.newWorker() + worker.on('error', this.opts.errorHandler ?? (() => {})) + worker.on('online', this.opts.onlineHandler ?? (() => {})) + // TODO handle properly when a worker exit + worker.on('exit', this.opts.exitHandler ?? (() => {})) + this.workers.push(worker) + this.afterNewWorkerPushed(worker) + // init tasks map + this.tasks.set(worker, 0) + return worker + } +} diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index d4bf30f3..f2375a02 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,11 +1,8 @@ -import { EventEmitter } from 'events' -import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed' +import type { Worker } from 'cluster' +import type { MessageValue } from '../../utility-types' +import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' -class MyEmitter extends EventEmitter {} - -export type DynamicClusterPoolOptions = FixedClusterPoolOptions - /** * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer. * @@ -21,8 +18,6 @@ export class DynamicClusterPool< // eslint-disable-next-line @typescript-eslint/no-explicit-any Response = any > extends FixedClusterPool { - public readonly emitter: MyEmitter - /** * @param min Min number of workers that will be always active * @param max Max number of workers that will be active @@ -30,18 +25,16 @@ export class DynamicClusterPool< * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` */ public constructor ( - public readonly min: number, + min: number, public readonly max: number, - public readonly filename: string, - public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 } + filename: string, + opts: ClusterPoolOptions = { maxTasks: 1000 } ) { super(min, filename, opts) - - this.emitter = new MyEmitter() } - protected chooseWorker (): WorkerWithMessageChannel { - let worker: WorkerWithMessageChannel | undefined + protected chooseWorker (): Worker { + let worker: Worker | undefined for (const entry of this.tasks) { if (entry[1] === 0) { worker = entry[0] @@ -58,11 +51,11 @@ export class DynamicClusterPool< return super.chooseWorker() } // all workers are busy create a new worker - const worker = this.newWorker() - worker.on('message', (message: { kill?: number }) => { + const worker = this.internalNewWorker() + worker.on('message', (message: MessageValue) => { if (message.kill) { - worker.send({ kill: 1 }) - worker.kill() + this.sendToWorker(worker, { kill: 1 }) + void this.destroyWorker(worker) // clean workers from data structures const workerIndex = this.workers.indexOf(worker) this.workers.splice(workerIndex, 1) diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 332daa4d..00205449 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -1,28 +1,9 @@ -import type { SendHandle } from 'child_process' import { fork, isMaster, setupMaster, Worker } from 'cluster' import type { MessageValue } from '../../utility-types' +import type { PoolOptions } from '../abstract-pool' +import { AbstractPool } from '../abstract-pool' -export type WorkerWithMessageChannel = Worker // & Draft - -export interface FixedClusterPoolOptions { - /** - * A function that will listen for error event on each worker. - */ - errorHandler?: (this: Worker, e: Error) => void - /** - * A function that will listen for online event on each worker. - */ - onlineHandler?: (this: Worker) => void - /** - * A function that will listen for exit event on each worker. - */ - exitHandler?: (this: Worker, code: number) => void - /** - * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). - * - * @default 1000 - */ - maxTasks?: number +export interface ClusterPoolOptions extends PoolOptions { /** * Key/value pairs to add to worker process environment. * @@ -41,122 +22,63 @@ export interface FixedClusterPoolOptions { * @since 2.0.0 */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -export class FixedClusterPool { - public readonly workers: WorkerWithMessageChannel[] = [] - public nextWorker: number = 0 - - // workerId as key and an integer value - public readonly tasks: Map = new Map< - WorkerWithMessageChannel, - number - >() - - protected id: number = 0 - +export class FixedClusterPool extends AbstractPool< + Worker, + Data, + Response +> { /** * @param numWorkers Number of workers for this pool. * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine. * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` */ public constructor ( - public readonly numWorkers: number, - public readonly filePath: string, - public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 } + numWorkers: number, + filePath: string, + public readonly opts: ClusterPoolOptions = { maxTasks: 1000 } ) { - if (!isMaster) { - throw new Error('Cannot start a cluster pool from a worker!') - } - // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check - if (!this.filePath) { - throw new Error('Please specify a file with a worker implementation') - } + super(numWorkers, filePath, opts) + } + protected setupHook (): void { setupMaster({ exec: this.filePath }) + } - for (let i = 1; i <= this.numWorkers; i++) { - this.newWorker() - } + protected isMain (): boolean { + return isMaster } - public destroy (): void { - for (const worker of this.workers) { - worker.kill() - } + protected destroyWorker (worker: Worker): void { + worker.kill() } - /** - * Execute the task specified into the constructor with the data parameter. - * - * @param data The input for the task specified. - * @returns Promise that is resolved when the task is done. - */ - public execute (data: Data): Promise { - // configure worker to handle message with the specified task - const worker: WorkerWithMessageChannel = this.chooseWorker() - // console.log('FixedClusterPool#execute choosen worker:', worker) - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) - } else { - throw Error('Worker could not be found in tasks map') - } - const id: number = ++this.id - const res: Promise = this.internalExecute(worker, id) - // console.log('FixedClusterPool#execute send data to worker:', worker) - worker.send({ data: data || {}, id: id }) - return res + protected sendToWorker (worker: Worker, message: MessageValue): void { + worker.send(message) } - protected internalExecute ( - worker: WorkerWithMessageChannel, - id: number - ): Promise { - return new Promise((resolve, reject) => { - const listener: ( - message: MessageValue, - handle: SendHandle - ) => void = message => { - // console.log('FixedClusterPool#internalExecute listener:', message) - if (message.id === id) { - worker.removeListener('message', listener) - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) - } else { - throw Error('Worker could not be found in tasks map') - } - if (message.error) reject(message.error) - else resolve(message.data as Response) - } - } - worker.on('message', listener) - }) + protected registerWorkerMessageListener ( + port: Worker, + listener: (message: MessageValue) => void + ): void { + port.on('message', listener) + } + + protected unregisterWorkerMessageListener ( + port: Worker, + listener: (message: MessageValue) => void + ): void { + port.removeListener('message', listener) } - protected chooseWorker (): WorkerWithMessageChannel { - if (this.workers.length - 1 === this.nextWorker) { - this.nextWorker = 0 - return this.workers[this.nextWorker] - } else { - this.nextWorker++ - return this.workers[this.nextWorker] - } + protected newWorker (): Worker { + return fork(this.opts.env) } - protected newWorker (): WorkerWithMessageChannel { - const worker: WorkerWithMessageChannel = fork(this.opts.env) - worker.on('error', this.opts.errorHandler ?? (() => {})) - worker.on('online', this.opts.onlineHandler ?? (() => {})) - // TODO handle properly when a worker exit - worker.on('exit', this.opts.exitHandler ?? (() => {})) - this.workers.push(worker) + protected afterNewWorkerPushed (worker: Worker): void { // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.setMaxListeners(this.opts.maxTasks ?? 1000) - // init tasks map - this.tasks.set(worker, 0) - return worker } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts new file mode 100644 index 00000000..0c9b232f --- /dev/null +++ b/src/pools/pool.ts @@ -0,0 +1,9 @@ +export interface IPool< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Data = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Response = any +> { + destroy(): Promise + execute(data: Data): Promise +} diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index e80276d9..d6a6e1ba 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,11 +1,8 @@ -import { EventEmitter } from 'events' -import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed' +import type { MessageValue } from '../../utility-types' +import type { PoolOptions } from '../abstract-pool' +import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' -class MyEmitter extends EventEmitter {} - -export type DynamicThreadPoolOptions = FixedThreadPoolOptions - /** * A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer. * @@ -21,8 +18,6 @@ export class DynamicThreadPool< // eslint-disable-next-line @typescript-eslint/no-explicit-any Response = any > extends FixedThreadPool { - public readonly emitter: MyEmitter - /** * @param min Min number of threads that will be always active * @param max Max number of threads that will be active @@ -30,18 +25,16 @@ export class DynamicThreadPool< * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` */ public constructor ( - public readonly min: number, + min: number, public readonly max: number, - public readonly filename: string, - public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 } + filename: string, + opts: PoolOptions = { maxTasks: 1000 } ) { super(min, filename, opts) - - this.emitter = new MyEmitter() } - protected chooseWorker (): WorkerWithMessageChannel { - let worker: WorkerWithMessageChannel | undefined + protected chooseWorker (): ThreadWorkerWithMessageChannel { + let worker: ThreadWorkerWithMessageChannel | undefined for (const entry of this.tasks) { if (entry[1] === 0) { worker = entry[0] @@ -58,11 +51,11 @@ export class DynamicThreadPool< return super.chooseWorker() } // all workers are busy create a new worker - const worker = this.newWorker() - worker.port2?.on('message', (message: { kill?: number }) => { + const worker = this.internalNewWorker() + worker.port2?.on('message', (message: MessageValue) => { if (message.kill) { - worker.postMessage({ kill: 1 }) - void worker.terminate() + this.sendToWorker(worker, { kill: 1 }) + void this.destroyWorker(worker) // clean workers from data structures const workerIndex = this.workers.indexOf(worker) this.workers.splice(workerIndex, 1) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 36eddace..bed8f3a7 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -1,28 +1,9 @@ import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' import type { Draft, MessageValue } from '../../utility-types' +import type { PoolOptions } from '../abstract-pool' +import { AbstractPool } from '../abstract-pool' -export type WorkerWithMessageChannel = Worker & Draft - -export interface FixedThreadPoolOptions { - /** - * A function that will listen for error event on each worker thread. - */ - errorHandler?: (this: Worker, e: Error) => void - /** - * A function that will listen for online event on each worker thread. - */ - onlineHandler?: (this: Worker) => void - /** - * A function that will listen for exit event on each worker thread. - */ - exitHandler?: (this: Worker, code: number) => void - /** - * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). - * - * @default 1000 - */ - maxTasks?: number -} +export type ThreadWorkerWithMessageChannel = Worker & Draft /** * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer. @@ -33,109 +14,64 @@ export interface FixedThreadPoolOptions { * @since 0.0.1 */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -export class FixedThreadPool { - public readonly workers: WorkerWithMessageChannel[] = [] - public nextWorker: number = 0 - - // threadId as key and an integer value - public readonly tasks: Map = new Map< - WorkerWithMessageChannel, - number - >() - - protected id: number = 0 - +export class FixedThreadPool extends AbstractPool< + ThreadWorkerWithMessageChannel, + Data, + Response +> { /** * @param numThreads Num of threads for this worker pool. * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine. * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` */ public constructor ( - public readonly numThreads: number, - public readonly filePath: string, - public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 } + numThreads: number, + filePath: string, + opts: PoolOptions = { maxTasks: 1000 } ) { - if (!isMainThread) { - throw new Error('Cannot start a thread pool from a worker thread !!!') - } - // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check - if (!this.filePath) { - throw new Error('Please specify a file with a worker implementation') - } + super(numThreads, filePath, opts) + } - for (let i = 1; i <= this.numThreads; i++) { - this.newWorker() - } + protected isMain (): boolean { + return isMainThread } - public async destroy (): Promise { - for (const worker of this.workers) { - await worker.terminate() - } + protected async destroyWorker ( + worker: ThreadWorkerWithMessageChannel + ): Promise { + await worker.terminate() } - /** - * Execute the task specified into the constructor with the data parameter. - * - * @param data The input for the task specified. - * @returns Promise that is resolved when the task is done. - */ - public execute (data: Data): Promise { - // configure worker to handle message with the specified task - const worker = this.chooseWorker() - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) - } else { - throw Error('Worker could not be found in tasks map') - } - const id = ++this.id - const res = this.internalExecute(worker, id) - worker.postMessage({ data: data || {}, id: id }) - return res + protected sendToWorker ( + worker: ThreadWorkerWithMessageChannel, + message: MessageValue + ): void { + worker.postMessage(message) } - protected internalExecute ( - worker: WorkerWithMessageChannel, - id: number - ): Promise { - return new Promise((resolve, reject) => { - const listener: (message: MessageValue) => void = message => { - if (message.id === id) { - worker.port2?.removeListener('message', listener) - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) - } else { - throw Error('Worker could not be found in tasks map') - } - if (message.error) reject(message.error) - else resolve(message.data as Response) - } - } - worker.port2?.on('message', listener) - }) + protected registerWorkerMessageListener ( + port: ThreadWorkerWithMessageChannel, + listener: (message: MessageValue) => void + ): void { + port.port2?.on('message', listener) } - protected chooseWorker (): WorkerWithMessageChannel { - if (this.workers.length - 1 === this.nextWorker) { - this.nextWorker = 0 - return this.workers[this.nextWorker] - } else { - this.nextWorker++ - return this.workers[this.nextWorker] - } + protected unregisterWorkerMessageListener ( + port: ThreadWorkerWithMessageChannel, + listener: (message: MessageValue) => void + ): void { + port.port2?.removeListener('message', listener) } - protected newWorker (): WorkerWithMessageChannel { - const worker: WorkerWithMessageChannel = new Worker(this.filePath, { + protected newWorker (): ThreadWorkerWithMessageChannel { + return new Worker(this.filePath, { env: SHARE_ENV }) - worker.on('error', this.opts.errorHandler ?? (() => {})) - worker.on('online', this.opts.onlineHandler ?? (() => {})) - // TODO handle properly when a thread exit - worker.on('exit', this.opts.exitHandler ?? (() => {})) - this.workers.push(worker) + } + + protected afterNewWorkerPushed ( + worker: ThreadWorkerWithMessageChannel + ): void { const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) worker.port1 = port1 @@ -143,8 +79,5 @@ export class FixedThreadPool { // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) - // init tasks map - this.tasks.set(worker, 0) - return worker } } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts new file mode 100644 index 00000000..5e223623 --- /dev/null +++ b/src/worker/abstract-worker.ts @@ -0,0 +1,91 @@ +import { AsyncResource } from 'async_hooks' +import type { MessageValue } from '../utility-types' +import type { WorkerOptions } from './worker-options' + +export abstract class AbstractWorker< + MainWorker, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Data = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Response = any +> extends AsyncResource { + protected readonly maxInactiveTime: number + protected readonly async: boolean + protected lastTask: number + protected readonly interval?: NodeJS.Timeout + + /** + * + * @param type The type of async event. + * @param isMain + * @param fn + * @param opts + */ + public constructor ( + type: string, + isMain: boolean, + fn: (data: Data) => Response, + public readonly opts: WorkerOptions = {} + ) { + super(type) + + this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 + this.async = !!this.opts.async + this.lastTask = Date.now() + if (!fn) throw new Error('Fn parameter is mandatory') + // keep the worker active + if (!isMain) { + this.interval = setInterval( + this.checkAlive.bind(this), + this.maxInactiveTime / 2 + ) + this.checkAlive.bind(this)() + } + } + + protected abstract getMainWorker (): MainWorker + + protected abstract sendToMainWorker (message: MessageValue): void + + protected checkAlive (): void { + if (Date.now() - this.lastTask > this.maxInactiveTime) { + this.sendToMainWorker({ kill: 1 }) + } + } + + protected handleError (e: Error | string): string { + return (e as unknown) as string + } + + protected run ( + fn: (data?: Data) => Response, + value: MessageValue + ): void { + try { + const res = fn(value.data) + this.sendToMainWorker({ data: res, id: value.id }) + this.lastTask = Date.now() + } catch (e) { + const err = this.handleError(e) + this.sendToMainWorker({ error: err, id: value.id }) + this.lastTask = Date.now() + } + } + + protected runAsync ( + fn: (data?: Data) => Promise, + value: MessageValue + ): void { + fn(value.data) + .then(res => { + this.sendToMainWorker({ data: res, id: value.id }) + this.lastTask = Date.now() + return null + }) + .catch(e => { + const err = this.handleError(e) + this.sendToMainWorker({ error: err, id: value.id }) + this.lastTask = Date.now() + }) + } +} diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index e750126b..092dd511 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -1,6 +1,7 @@ -import { AsyncResource } from 'async_hooks' +import type { Worker } from 'cluster' import { isMaster, worker } from 'cluster' import type { MessageValue } from '../utility-types' +import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' /** @@ -13,36 +14,18 @@ import type { WorkerOptions } from './worker-options' * @since 2.0.0 */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -export class ClusterWorker extends AsyncResource { - protected readonly maxInactiveTime: number - protected readonly async: boolean - protected lastTask: number - protected readonly interval?: NodeJS.Timeout +export class ClusterWorker extends AbstractWorker< + Worker, + Data, + Response +> { + public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { + super('worker-cluster-pool:pioardi', isMaster, fn, opts) - public constructor ( - fn: (data: Data) => Response, - public readonly opts: WorkerOptions = {} - ) { - super('worker-cluster-pool:pioardi') - - this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 - this.async = !!this.opts.async - this.lastTask = Date.now() - if (!fn) throw new Error('Fn parameter is mandatory') - // keep the worker active - if (!isMaster) { - // console.log('ClusterWorker#constructor', 'is not master') - this.interval = setInterval( - this.checkAlive.bind(this), - this.maxInactiveTime / 2 - ) - this.checkAlive.bind(this)() - } worker.on('message', (value: MessageValue) => { - // console.log("cluster.on('message', value)", value) if (value?.data && value.id) { // here you will receive messages - // console.log('This is the main worker ' + isMaster) + // console.log('This is the main worker ' + isMain) if (this.async) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) } else { @@ -56,41 +39,15 @@ export class ClusterWorker extends AsyncResource { }) } - protected checkAlive (): void { - if (Date.now() - this.lastTask > this.maxInactiveTime) { - worker.send({ kill: 1 }) - } + protected getMainWorker (): Worker { + return worker } - protected run ( - fn: (data?: Data) => Response, - value: MessageValue - ): void { - try { - const res = fn(value.data as Data) - worker.send({ data: res, id: value.id }) - this.lastTask = Date.now() - } catch (e) { - const err = e instanceof Error ? e.message : e - worker.send({ error: err, id: value.id }) - this.lastTask = Date.now() - } + protected sendToMainWorker (message: MessageValue): void { + this.getMainWorker().send(message) } - protected runAsync ( - fn: (data?: Data) => Promise, - value: MessageValue - ): void { - fn(value.data) - .then(res => { - worker.send({ data: res, id: value.id }) - this.lastTask = Date.now() - return null - }) - .catch(e => { - const err = e instanceof Error ? e.message : e - worker.send({ error: err, id: value.id }) - this.lastTask = Date.now() - }) + protected handleError (e: Error | string): string { + return e instanceof Error ? e.message : e } } diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index d0af904b..5a16026c 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,6 +1,6 @@ -import { AsyncResource } from 'async_hooks' import { isMainThread, parentPort } from 'worker_threads' import type { MessageValue } from '../utility-types' +import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' /** @@ -13,35 +13,20 @@ import type { WorkerOptions } from './worker-options' * @since 0.0.1 */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -export class ThreadWorker extends AsyncResource { - protected readonly maxInactiveTime: number - protected readonly async: boolean - protected lastTask: number - protected readonly interval?: NodeJS.Timeout +export class ThreadWorker extends AbstractWorker< + MessagePort, + Data, + Response +> { protected parent?: MessagePort - public constructor ( - fn: (data: Data) => Response, - public readonly opts: WorkerOptions = {} - ) { - super('worker-thread-pool:pioardi') + public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { + super('worker-thread-pool:pioardi', isMainThread, fn, opts) - this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 - this.async = !!this.opts.async - this.lastTask = Date.now() - if (!fn) throw new Error('Fn parameter is mandatory') - // keep the worker active - if (!isMainThread) { - this.interval = setInterval( - this.checkAlive.bind(this), - this.maxInactiveTime / 2 - ) - this.checkAlive.bind(this)() - } parentPort?.on('message', (value: MessageValue) => { if (value?.data && value.id) { // here you will receive messages - // console.log('This is the main thread ' + isMainThread) + // console.log('This is the main worker ' + isMain) if (this.async) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) } else { @@ -52,46 +37,21 @@ export class ThreadWorker extends AsyncResource { // this will be received once this.parent = value.parent } else if (value.kill) { - // here is time to kill this thread, just clearing the interval + // here is time to kill this worker, just clearing the interval if (this.interval) clearInterval(this.interval) this.emitDestroy() } }) } - protected checkAlive (): void { - if (Date.now() - this.lastTask > this.maxInactiveTime) { - this.parent?.postMessage({ kill: 1 }) - } - } - - protected run ( - fn: (data?: Data) => Response, - value: MessageValue - ): void { - try { - const res = fn(value.data) - this.parent?.postMessage({ data: res, id: value.id }) - this.lastTask = Date.now() - } catch (e) { - this.parent?.postMessage({ error: e, id: value.id }) - this.lastTask = Date.now() + protected getMainWorker (): MessagePort { + if (!this.parent) { + throw new Error('Parent was not set') } + return this.parent } - protected runAsync ( - fn: (data?: Data) => Promise, - value: MessageValue - ): void { - fn(value.data) - .then(res => { - this.parent?.postMessage({ data: res, id: value.id }) - this.lastTask = Date.now() - return null - }) - .catch(e => { - this.parent?.postMessage({ error: e, id: value.id }) - this.lastTask = Date.now() - }) + protected sendToMainWorker (message: MessageValue): void { + this.getMainWorker().postMessage(message) } }