"cwd": "${workspaceFolder}",
"preLaunchTask": "Development build",
"runtimeExecutable": "npm",
- "runtimeArgs": [
- "run-script",
- "test:debug:vscode"
- ],
- "skipFiles": [
- "<node_internals>/**"
- ],
+ "runtimeArgs": ["run-script", "test:debug:vscode"],
+ "skipFiles": ["<node_internals>/**"],
"stopOnEntry": true
},
{
"cwd": "${workspaceFolder}",
"preLaunchTask": "Development build",
"runtimeExecutable": "npm",
- "runtimeArgs": [
- "run-script",
- "benchmark:debug:vscode"
- ],
- "skipFiles": [
- "<node_internals>/**"
- ],
+ "runtimeArgs": ["run-script", "benchmark:debug:vscode"],
+ "skipFiles": ["<node_internals>/**"],
"stopOnEntry": true
}
]
"source-map": "^0.5.0"
},
"dependencies": {
- "@babel/parser": {
- "version": "7.12.15",
- "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
- "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
- "dev": true
- },
- "json5": {
- "version": "2.2.0",
- "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz",
- "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==",
- "dev": true,
- "requires": {
- "minimist": "^1.2.5"
- }
- },
"semver": {
"version": "5.7.1",
"resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz",
"integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==",
"dev": true
- },
- "source-map": {
- "version": "0.5.7",
- "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
- "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
- "dev": true
}
}
},
"@babel/types": "^7.12.13",
"jsesc": "^2.5.1",
"source-map": "^0.5.0"
- },
- "dependencies": {
- "source-map": {
- "version": "0.5.7",
- "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
- "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
- "dev": true
- }
}
},
"@babel/helper-annotate-as-pure": {
}
},
"@babel/parser": {
- "version": "7.12.11",
- "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz",
- "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==",
+ "version": "7.12.15",
+ "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
+ "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
"dev": true
},
"@babel/plugin-proposal-async-generator-functions": {
"@babel/helper-replace-supers": "^7.12.13",
"@babel/helper-split-export-declaration": "^7.12.13",
"globals": "^11.1.0"
- },
- "dependencies": {
- "globals": {
- "version": "11.12.0",
- "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
- "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
- "dev": true
- }
}
},
"@babel/plugin-transform-computed-properties": {
"@babel/code-frame": "^7.12.13",
"@babel/parser": "^7.12.13",
"@babel/types": "^7.12.13"
- },
- "dependencies": {
- "@babel/parser": {
- "version": "7.12.15",
- "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
- "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
- "dev": true
- }
}
},
"@babel/traverse": {
"debug": "^4.1.0",
"globals": "^11.1.0",
"lodash": "^4.17.19"
- },
- "dependencies": {
- "@babel/parser": {
- "version": "7.12.15",
- "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
- "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
- "dev": true
- },
- "globals": {
- "version": "11.12.0",
- "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
- "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
- "dev": true
- }
}
},
"@babel/types": {
"strip-json-comments": "^3.1.1"
},
"dependencies": {
+ "globals": {
+ "version": "12.4.0",
+ "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
+ "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
+ "dev": true,
+ "requires": {
+ "type-fest": "^0.8.1"
+ }
+ },
"ignore": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
"dev": true,
"requires": {
"safe-buffer": "~5.1.1"
- },
- "dependencies": {
- "safe-buffer": {
- "version": "5.1.2",
- "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
- "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==",
- "dev": true
- }
}
},
"core-js-compat": {
"v8-compile-cache": "^2.0.3"
},
"dependencies": {
+ "globals": {
+ "version": "12.4.0",
+ "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
+ "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
+ "dev": true,
+ "requires": {
+ "type-fest": "^0.8.1"
+ }
+ },
"ignore": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
}
},
"globals": {
- "version": "12.4.0",
- "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
- "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
- "dev": true,
- "requires": {
- "type-fest": "^0.8.1"
- }
+ "version": "11.12.0",
+ "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
+ "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
+ "dev": true
},
"globby": {
"version": "11.0.2",
}
},
"graceful-fs": {
- "version": "4.2.5",
- "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz",
- "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==",
+ "version": "4.2.6",
+ "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
+ "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==",
"dev": true
},
"graphql": {
"source-map": "^0.6.1",
"uglify-js": "^3.1.4",
"wordwrap": "^1.0.0"
+ },
+ "dependencies": {
+ "source-map": {
+ "version": "0.6.1",
+ "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+ "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+ "dev": true
+ }
}
},
"har-schema": {
"debug": "^4.1.1",
"istanbul-lib-coverage": "^3.0.0",
"source-map": "^0.6.1"
+ },
+ "dependencies": {
+ "source-map": {
+ "version": "0.6.1",
+ "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+ "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+ "dev": true
+ }
}
},
"istanbul-reports": {
"dev": true
},
"json5": {
- "version": "1.0.1",
- "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz",
- "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==",
+ "version": "2.2.0",
+ "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz",
+ "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==",
"dev": true,
"requires": {
- "minimist": "^1.2.0"
+ "minimist": "^1.2.5"
}
},
"jsonify": {
}
}
},
+ "source-map": {
+ "version": "0.6.1",
+ "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+ "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+ "dev": true
+ },
"supports-color": {
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-6.1.0.tgz",
"@babel/highlight": "^7.10.4"
}
},
+ "@babel/parser": {
+ "version": "7.12.11",
+ "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz",
+ "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==",
+ "dev": true
+ },
"@typescript-eslint/typescript-estree": {
"version": "2.34.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-2.34.0.tgz",
"resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
"integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==",
"dev": true
+ },
+ "resolve": {
+ "version": "1.19.0",
+ "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
+ "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==",
+ "dev": true,
+ "requires": {
+ "is-core-module": "^2.1.0",
+ "path-parse": "^1.0.6"
+ }
}
}
},
"integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==",
"dev": true
},
+ "queue-microtask": {
+ "version": "1.2.2",
+ "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz",
+ "integrity": "sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==",
+ "dev": true
+ },
"randombytes": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz",
"dev": true
},
"resolve": {
- "version": "1.19.0",
- "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
- "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==",
+ "version": "1.20.0",
+ "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.20.0.tgz",
+ "integrity": "sha512-wENBPt4ySzg4ybFQW2TT1zMQucPK95HSh/nq2CFTZVOGut2+pQvSsgtda4d26YrYcr067wjbmzOG8byDPBX63A==",
"dev": true,
"requires": {
- "is-core-module": "^2.1.0",
+ "is-core-module": "^2.2.0",
"path-parse": "^1.0.6"
}
},
}
},
"run-parallel": {
- "version": "1.1.10",
- "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz",
- "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==",
- "dev": true
+ "version": "1.2.0",
+ "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
+ "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
+ "dev": true,
+ "requires": {
+ "queue-microtask": "^1.2.2"
+ }
},
"safe-buffer": {
- "version": "5.2.1",
- "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
- "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==",
+ "version": "5.1.2",
+ "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
+ "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==",
"dev": true
},
"safer-buffer": {
}
},
"source-map": {
- "version": "0.6.1",
- "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
- "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+ "version": "0.5.7",
+ "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
+ "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
"dev": true
},
"source-map-support": {
"requires": {
"buffer-from": "^1.0.0",
"source-map": "^0.6.0"
+ },
+ "dependencies": {
+ "source-map": {
+ "version": "0.6.1",
+ "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+ "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+ "dev": true
+ }
}
},
"sourcemap-codec": {
},
"dependencies": {
"ajv": {
- "version": "7.0.4",
- "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.0.4.tgz",
- "integrity": "sha512-xzzzaqgEQfmuhbhAoqjJ8T/1okb6gAzXn/eQRNpAN1AEUoHJTNF9xCDRTtf/s3SKldtZfa+RJeTs+BQq+eZ/sw==",
+ "version": "7.1.0",
+ "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.1.0.tgz",
+ "integrity": "sha512-svS9uILze/cXbH0z2myCK2Brqprx/+JJYK5pHicT/GQiBfzzhUVAIT6MwqJg8y4xV/zoGsUeuPuwtoiKSGE15g==",
"dev": true,
"requires": {
"fast-deep-equal": "^3.1.1",
"json5": "^1.0.1",
"minimist": "^1.2.0",
"strip-bom": "^3.0.0"
+ },
+ "dependencies": {
+ "json5": {
+ "version": "1.0.1",
+ "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz",
+ "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==",
+ "dev": true,
+ "requires": {
+ "minimist": "^1.2.0"
+ }
+ }
}
},
"tslib": {
-import { DynamicClusterPool } from './pools/cluster/dynamic'
-import { FixedClusterPool } from './pools/cluster/fixed'
-import { DynamicThreadPool } from './pools/thread/dynamic'
-import { FixedThreadPool } from './pools/thread/fixed'
-import { ClusterWorker } from './worker/cluster-worker'
-import { ThreadWorker } from './worker/thread-worker'
-
-export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic'
export type {
- FixedClusterPoolOptions,
- WorkerWithMessageChannel as ClusterWorkerWithMessageChannel
-} from './pools/cluster/fixed'
-export type { DynamicThreadPoolOptions } from './pools/thread/dynamic'
-export type {
- FixedThreadPoolOptions,
- WorkerWithMessageChannel as ThreadWorkerWithMessageChannel
-} from './pools/thread/fixed'
+ ErrorHandler,
+ ExitHandler,
+ IWorker,
+ OnlineHandler,
+ PoolOptions
+} from './pools/abstract-pool'
+export { DynamicClusterPool } from './pools/cluster/dynamic'
+export { FixedClusterPool } from './pools/cluster/fixed'
+export type { ClusterPoolOptions } from './pools/cluster/fixed'
+export type { IPool } from './pools/pool'
+export { DynamicThreadPool } from './pools/thread/dynamic'
+export { FixedThreadPool } from './pools/thread/fixed'
+export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
+export { AbstractWorker } from './worker/abstract-worker'
+export { ClusterWorker } from './worker/cluster-worker'
+export { ThreadWorker } from './worker/thread-worker'
export type { WorkerOptions } from './worker/worker-options'
-export {
- FixedThreadPool,
- FixedClusterPool,
- DynamicClusterPool,
- DynamicThreadPool,
- ThreadWorker,
- ClusterWorker
-}
--- /dev/null
+import EventEmitter from 'events'
+import type { MessageValue } from '../utility-types'
+import type { IPool } from './pool'
+
+export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
+export type OnlineHandler<Worker> = (this: Worker) => void
+export type ExitHandler<Worker> = (this: Worker, code: number) => void
+
+export interface IWorker {
+ on(event: 'error', handler: ErrorHandler<this>): void
+ on(event: 'online', handler: OnlineHandler<this>): void
+ on(event: 'exit', handler: ExitHandler<this>): void
+}
+
+export interface PoolOptions<Worker> {
+ /**
+ * A function that will listen for error event on each worker.
+ */
+ errorHandler?: ErrorHandler<Worker>
+ /**
+ * A function that will listen for online event on each worker.
+ */
+ onlineHandler?: OnlineHandler<Worker>
+ /**
+ * A function that will listen for exit event on each worker.
+ */
+ exitHandler?: ExitHandler<Worker>
+ /**
+ * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
+ *
+ * @default 1000
+ */
+ maxTasks?: number
+}
+
+class PoolEmitter extends EventEmitter {}
+
+export abstract class AbstractPool<
+ Worker extends IWorker,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Data = any,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Response = any
+> implements IPool<Data, Response> {
+ public readonly workers: Worker[] = []
+ public nextWorker: number = 0
+
+ /**
+ * `workerId` as key and an integer value
+ */
+ public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
+
+ public readonly emitter: PoolEmitter
+
+ protected id: number = 0
+
+ public constructor (
+ public readonly numWorkers: number,
+ public readonly filePath: string,
+ public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
+ ) {
+ if (!this.isMain()) {
+ throw new Error('Cannot start a pool from a worker!')
+ }
+ // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
+ if (!this.filePath) {
+ throw new Error('Please specify a file with a worker implementation')
+ }
+
+ this.setupHook()
+
+ for (let i = 1; i <= this.numWorkers; i++) {
+ this.internalNewWorker()
+ }
+
+ this.emitter = new PoolEmitter()
+ }
+
+ protected setupHook (): void {
+ // Can be overridden
+ }
+
+ protected abstract isMain (): boolean
+
+ public async destroy (): Promise<void> {
+ for (const worker of this.workers) {
+ await this.destroyWorker(worker)
+ }
+ }
+
+ protected abstract destroyWorker (worker: Worker): void | Promise<void>
+
+ protected abstract sendToWorker (
+ worker: Worker,
+ message: MessageValue<Data>
+ ): void
+
+ protected addWorker (worker: Worker): void {
+ const previousWorkerIndex = this.tasks.get(worker)
+ if (previousWorkerIndex !== undefined) {
+ this.tasks.set(worker, previousWorkerIndex + 1)
+ } else {
+ throw Error('Worker could not be found in tasks map')
+ }
+ }
+
+ /**
+ * Execute the task specified into the constructor with the data parameter.
+ *
+ * @param data The input for the task specified.
+ * @returns Promise that is resolved when the task is done.
+ */
+ public execute (data: Data): Promise<Response> {
+ // configure worker to handle message with the specified task
+ const worker = this.chooseWorker()
+ this.addWorker(worker)
+ const id = ++this.id
+ const res = this.internalExecute(worker, id)
+ this.sendToWorker(worker, { data: data || ({} as Data), id: id })
+ return res
+ }
+
+ protected abstract registerWorkerMessageListener (
+ port: Worker,
+ listener: (message: MessageValue<Response>) => void
+ ): void
+
+ protected abstract unregisterWorkerMessageListener (
+ port: Worker,
+ listener: (message: MessageValue<Response>) => void
+ ): void
+
+ protected internalExecute (worker: Worker, id: number): Promise<Response> {
+ return new Promise((resolve, reject) => {
+ const listener: (message: MessageValue<Response>) => void = message => {
+ if (message.id === id) {
+ this.unregisterWorkerMessageListener(worker, listener)
+ this.addWorker(worker)
+ if (message.error) reject(message.error)
+ else resolve(message.data as Response)
+ }
+ }
+ this.registerWorkerMessageListener(worker, listener)
+ })
+ }
+
+ protected chooseWorker (): Worker {
+ if (this.workers.length - 1 === this.nextWorker) {
+ this.nextWorker = 0
+ return this.workers[this.nextWorker]
+ } else {
+ this.nextWorker++
+ return this.workers[this.nextWorker]
+ }
+ }
+
+ protected abstract newWorker (): Worker
+
+ protected abstract afterNewWorkerPushed (worker: Worker): void
+
+ protected internalNewWorker (): Worker {
+ const worker: Worker = this.newWorker()
+ worker.on('error', this.opts.errorHandler ?? (() => {}))
+ worker.on('online', this.opts.onlineHandler ?? (() => {}))
+ // TODO handle properly when a worker exit
+ worker.on('exit', this.opts.exitHandler ?? (() => {}))
+ this.workers.push(worker)
+ this.afterNewWorkerPushed(worker)
+ // init tasks map
+ this.tasks.set(worker, 0)
+ return worker
+ }
+}
-import { EventEmitter } from 'events'
-import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed'
+import type { Worker } from 'cluster'
+import type { MessageValue } from '../../utility-types'
+import type { ClusterPoolOptions } from './fixed'
import { FixedClusterPool } from './fixed'
-class MyEmitter extends EventEmitter {}
-
-export type DynamicClusterPoolOptions = FixedClusterPoolOptions
-
/**
* A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
*
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Response = any
> extends FixedClusterPool<Data, Response> {
- public readonly emitter: MyEmitter
-
/**
* @param min Min number of workers that will be always active
* @param max Max number of workers that will be active
* @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
*/
public constructor (
- public readonly min: number,
+ min: number,
public readonly max: number,
- public readonly filename: string,
- public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 }
+ filename: string,
+ opts: ClusterPoolOptions = { maxTasks: 1000 }
) {
super(min, filename, opts)
-
- this.emitter = new MyEmitter()
}
- protected chooseWorker (): WorkerWithMessageChannel {
- let worker: WorkerWithMessageChannel | undefined
+ protected chooseWorker (): Worker {
+ let worker: Worker | undefined
for (const entry of this.tasks) {
if (entry[1] === 0) {
worker = entry[0]
return super.chooseWorker()
}
// all workers are busy create a new worker
- const worker = this.newWorker()
- worker.on('message', (message: { kill?: number }) => {
+ const worker = this.internalNewWorker()
+ worker.on('message', (message: MessageValue<Data>) => {
if (message.kill) {
- worker.send({ kill: 1 })
- worker.kill()
+ this.sendToWorker(worker, { kill: 1 })
+ void this.destroyWorker(worker)
// clean workers from data structures
const workerIndex = this.workers.indexOf(worker)
this.workers.splice(workerIndex, 1)
-import type { SendHandle } from 'child_process'
import { fork, isMaster, setupMaster, Worker } from 'cluster'
import type { MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import { AbstractPool } from '../abstract-pool'
-export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
-
-export interface FixedClusterPoolOptions {
- /**
- * A function that will listen for error event on each worker.
- */
- errorHandler?: (this: Worker, e: Error) => void
- /**
- * A function that will listen for online event on each worker.
- */
- onlineHandler?: (this: Worker) => void
- /**
- * A function that will listen for exit event on each worker.
- */
- exitHandler?: (this: Worker, code: number) => void
- /**
- * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
- *
- * @default 1000
- */
- maxTasks?: number
+export interface ClusterPoolOptions extends PoolOptions<Worker> {
/**
* Key/value pairs to add to worker process environment.
*
* @since 2.0.0
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class FixedClusterPool<Data = any, Response = any> {
- public readonly workers: WorkerWithMessageChannel[] = []
- public nextWorker: number = 0
-
- // workerId as key and an integer value
- public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
- WorkerWithMessageChannel,
- number
- >()
-
- protected id: number = 0
-
+export class FixedClusterPool<Data = any, Response = any> extends AbstractPool<
+ Worker,
+ Data,
+ Response
+> {
/**
* @param numWorkers Number of workers for this pool.
* @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
* @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
*/
public constructor (
- public readonly numWorkers: number,
- public readonly filePath: string,
- public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
+ numWorkers: number,
+ filePath: string,
+ public readonly opts: ClusterPoolOptions = { maxTasks: 1000 }
) {
- if (!isMaster) {
- throw new Error('Cannot start a cluster pool from a worker!')
- }
- // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
- if (!this.filePath) {
- throw new Error('Please specify a file with a worker implementation')
- }
+ super(numWorkers, filePath, opts)
+ }
+ protected setupHook (): void {
setupMaster({
exec: this.filePath
})
+ }
- for (let i = 1; i <= this.numWorkers; i++) {
- this.newWorker()
- }
+ protected isMain (): boolean {
+ return isMaster
}
- public destroy (): void {
- for (const worker of this.workers) {
- worker.kill()
- }
+ protected destroyWorker (worker: Worker): void {
+ worker.kill()
}
- /**
- * Execute the task specified into the constructor with the data parameter.
- *
- * @param data The input for the task specified.
- * @returns Promise that is resolved when the task is done.
- */
- public execute (data: Data): Promise<Response> {
- // configure worker to handle message with the specified task
- const worker: WorkerWithMessageChannel = this.chooseWorker()
- // console.log('FixedClusterPool#execute choosen worker:', worker)
- const previousWorkerIndex = this.tasks.get(worker)
- if (previousWorkerIndex !== undefined) {
- this.tasks.set(worker, previousWorkerIndex + 1)
- } else {
- throw Error('Worker could not be found in tasks map')
- }
- const id: number = ++this.id
- const res: Promise<Response> = this.internalExecute(worker, id)
- // console.log('FixedClusterPool#execute send data to worker:', worker)
- worker.send({ data: data || {}, id: id })
- return res
+ protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
+ worker.send(message)
}
- protected internalExecute (
- worker: WorkerWithMessageChannel,
- id: number
- ): Promise<Response> {
- return new Promise((resolve, reject) => {
- const listener: (
- message: MessageValue<Response>,
- handle: SendHandle
- ) => void = message => {
- // console.log('FixedClusterPool#internalExecute listener:', message)
- if (message.id === id) {
- worker.removeListener('message', listener)
- const previousWorkerIndex = this.tasks.get(worker)
- if (previousWorkerIndex !== undefined) {
- this.tasks.set(worker, previousWorkerIndex + 1)
- } else {
- throw Error('Worker could not be found in tasks map')
- }
- if (message.error) reject(message.error)
- else resolve(message.data as Response)
- }
- }
- worker.on('message', listener)
- })
+ protected registerWorkerMessageListener (
+ port: Worker,
+ listener: (message: MessageValue<Response>) => void
+ ): void {
+ port.on('message', listener)
+ }
+
+ protected unregisterWorkerMessageListener (
+ port: Worker,
+ listener: (message: MessageValue<Response>) => void
+ ): void {
+ port.removeListener('message', listener)
}
- protected chooseWorker (): WorkerWithMessageChannel {
- if (this.workers.length - 1 === this.nextWorker) {
- this.nextWorker = 0
- return this.workers[this.nextWorker]
- } else {
- this.nextWorker++
- return this.workers[this.nextWorker]
- }
+ protected newWorker (): Worker {
+ return fork(this.opts.env)
}
- protected newWorker (): WorkerWithMessageChannel {
- const worker: WorkerWithMessageChannel = fork(this.opts.env)
- worker.on('error', this.opts.errorHandler ?? (() => {}))
- worker.on('online', this.opts.onlineHandler ?? (() => {}))
- // TODO handle properly when a worker exit
- worker.on('exit', this.opts.exitHandler ?? (() => {}))
- this.workers.push(worker)
+ protected afterNewWorkerPushed (worker: Worker): void {
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
worker.setMaxListeners(this.opts.maxTasks ?? 1000)
- // init tasks map
- this.tasks.set(worker, 0)
- return worker
}
}
--- /dev/null
+export interface IPool<
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Data = any,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Response = any
+> {
+ destroy(): Promise<void>
+ execute(data: Data): Promise<Response>
+}
-import { EventEmitter } from 'events'
-import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed'
+import type { MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import type { ThreadWorkerWithMessageChannel } from './fixed'
import { FixedThreadPool } from './fixed'
-class MyEmitter extends EventEmitter {}
-
-export type DynamicThreadPoolOptions = FixedThreadPoolOptions
-
/**
* A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer.
*
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Response = any
> extends FixedThreadPool<Data, Response> {
- public readonly emitter: MyEmitter
-
/**
* @param min Min number of threads that will be always active
* @param max Max number of threads that will be active
* @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
*/
public constructor (
- public readonly min: number,
+ min: number,
public readonly max: number,
- public readonly filename: string,
- public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
+ filename: string,
+ opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
) {
super(min, filename, opts)
-
- this.emitter = new MyEmitter()
}
- protected chooseWorker (): WorkerWithMessageChannel {
- let worker: WorkerWithMessageChannel | undefined
+ protected chooseWorker (): ThreadWorkerWithMessageChannel {
+ let worker: ThreadWorkerWithMessageChannel | undefined
for (const entry of this.tasks) {
if (entry[1] === 0) {
worker = entry[0]
return super.chooseWorker()
}
// all workers are busy create a new worker
- const worker = this.newWorker()
- worker.port2?.on('message', (message: { kill?: number }) => {
+ const worker = this.internalNewWorker()
+ worker.port2?.on('message', (message: MessageValue<Data>) => {
if (message.kill) {
- worker.postMessage({ kill: 1 })
- void worker.terminate()
+ this.sendToWorker(worker, { kill: 1 })
+ void this.destroyWorker(worker)
// clean workers from data structures
const workerIndex = this.workers.indexOf(worker)
this.workers.splice(workerIndex, 1)
import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
import type { Draft, MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import { AbstractPool } from '../abstract-pool'
-export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
-
-export interface FixedThreadPoolOptions {
- /**
- * A function that will listen for error event on each worker thread.
- */
- errorHandler?: (this: Worker, e: Error) => void
- /**
- * A function that will listen for online event on each worker thread.
- */
- onlineHandler?: (this: Worker) => void
- /**
- * A function that will listen for exit event on each worker thread.
- */
- exitHandler?: (this: Worker, code: number) => void
- /**
- * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
- *
- * @default 1000
- */
- maxTasks?: number
-}
+export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
/**
* A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
* @since 0.0.1
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class FixedThreadPool<Data = any, Response = any> {
- public readonly workers: WorkerWithMessageChannel[] = []
- public nextWorker: number = 0
-
- // threadId as key and an integer value
- public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
- WorkerWithMessageChannel,
- number
- >()
-
- protected id: number = 0
-
+export class FixedThreadPool<Data = any, Response = any> extends AbstractPool<
+ ThreadWorkerWithMessageChannel,
+ Data,
+ Response
+> {
/**
* @param numThreads Num of threads for this worker pool.
* @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
* @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
*/
public constructor (
- public readonly numThreads: number,
- public readonly filePath: string,
- public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
+ numThreads: number,
+ filePath: string,
+ opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
) {
- if (!isMainThread) {
- throw new Error('Cannot start a thread pool from a worker thread !!!')
- }
- // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
- if (!this.filePath) {
- throw new Error('Please specify a file with a worker implementation')
- }
+ super(numThreads, filePath, opts)
+ }
- for (let i = 1; i <= this.numThreads; i++) {
- this.newWorker()
- }
+ protected isMain (): boolean {
+ return isMainThread
}
- public async destroy (): Promise<void> {
- for (const worker of this.workers) {
- await worker.terminate()
- }
+ protected async destroyWorker (
+ worker: ThreadWorkerWithMessageChannel
+ ): Promise<void> {
+ await worker.terminate()
}
- /**
- * Execute the task specified into the constructor with the data parameter.
- *
- * @param data The input for the task specified.
- * @returns Promise that is resolved when the task is done.
- */
- public execute (data: Data): Promise<Response> {
- // configure worker to handle message with the specified task
- const worker = this.chooseWorker()
- const previousWorkerIndex = this.tasks.get(worker)
- if (previousWorkerIndex !== undefined) {
- this.tasks.set(worker, previousWorkerIndex + 1)
- } else {
- throw Error('Worker could not be found in tasks map')
- }
- const id = ++this.id
- const res = this.internalExecute(worker, id)
- worker.postMessage({ data: data || {}, id: id })
- return res
+ protected sendToWorker (
+ worker: ThreadWorkerWithMessageChannel,
+ message: MessageValue<Data>
+ ): void {
+ worker.postMessage(message)
}
- protected internalExecute (
- worker: WorkerWithMessageChannel,
- id: number
- ): Promise<Response> {
- return new Promise((resolve, reject) => {
- const listener: (message: MessageValue<Response>) => void = message => {
- if (message.id === id) {
- worker.port2?.removeListener('message', listener)
- const previousWorkerIndex = this.tasks.get(worker)
- if (previousWorkerIndex !== undefined) {
- this.tasks.set(worker, previousWorkerIndex + 1)
- } else {
- throw Error('Worker could not be found in tasks map')
- }
- if (message.error) reject(message.error)
- else resolve(message.data as Response)
- }
- }
- worker.port2?.on('message', listener)
- })
+ protected registerWorkerMessageListener (
+ port: ThreadWorkerWithMessageChannel,
+ listener: (message: MessageValue<Response>) => void
+ ): void {
+ port.port2?.on('message', listener)
}
- protected chooseWorker (): WorkerWithMessageChannel {
- if (this.workers.length - 1 === this.nextWorker) {
- this.nextWorker = 0
- return this.workers[this.nextWorker]
- } else {
- this.nextWorker++
- return this.workers[this.nextWorker]
- }
+ protected unregisterWorkerMessageListener (
+ port: ThreadWorkerWithMessageChannel,
+ listener: (message: MessageValue<Response>) => void
+ ): void {
+ port.port2?.removeListener('message', listener)
}
- protected newWorker (): WorkerWithMessageChannel {
- const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
+ protected newWorker (): ThreadWorkerWithMessageChannel {
+ return new Worker(this.filePath, {
env: SHARE_ENV
})
- worker.on('error', this.opts.errorHandler ?? (() => {}))
- worker.on('online', this.opts.onlineHandler ?? (() => {}))
- // TODO handle properly when a thread exit
- worker.on('exit', this.opts.exitHandler ?? (() => {}))
- this.workers.push(worker)
+ }
+
+ protected afterNewWorkerPushed (
+ worker: ThreadWorkerWithMessageChannel
+ ): void {
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
worker.port1 = port1
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
- // init tasks map
- this.tasks.set(worker, 0)
- return worker
}
}
--- /dev/null
+import { AsyncResource } from 'async_hooks'
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
+
+export abstract class AbstractWorker<
+ MainWorker,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Data = any,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Response = any
+> extends AsyncResource {
+ protected readonly maxInactiveTime: number
+ protected readonly async: boolean
+ protected lastTask: number
+ protected readonly interval?: NodeJS.Timeout
+
+ /**
+ *
+ * @param type The type of async event.
+ * @param isMain
+ * @param fn
+ * @param opts
+ */
+ public constructor (
+ type: string,
+ isMain: boolean,
+ fn: (data: Data) => Response,
+ public readonly opts: WorkerOptions = {}
+ ) {
+ super(type)
+
+ this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+ this.async = !!this.opts.async
+ this.lastTask = Date.now()
+ if (!fn) throw new Error('Fn parameter is mandatory')
+ // keep the worker active
+ if (!isMain) {
+ this.interval = setInterval(
+ this.checkAlive.bind(this),
+ this.maxInactiveTime / 2
+ )
+ this.checkAlive.bind(this)()
+ }
+ }
+
+ protected abstract getMainWorker (): MainWorker
+
+ protected abstract sendToMainWorker (message: MessageValue<Response>): void
+
+ protected checkAlive (): void {
+ if (Date.now() - this.lastTask > this.maxInactiveTime) {
+ this.sendToMainWorker({ kill: 1 })
+ }
+ }
+
+ protected handleError (e: Error | string): string {
+ return (e as unknown) as string
+ }
+
+ protected run (
+ fn: (data?: Data) => Response,
+ value: MessageValue<Data>
+ ): void {
+ try {
+ const res = fn(value.data)
+ this.sendToMainWorker({ data: res, id: value.id })
+ this.lastTask = Date.now()
+ } catch (e) {
+ const err = this.handleError(e)
+ this.sendToMainWorker({ error: err, id: value.id })
+ this.lastTask = Date.now()
+ }
+ }
+
+ protected runAsync (
+ fn: (data?: Data) => Promise<Response>,
+ value: MessageValue<Data>
+ ): void {
+ fn(value.data)
+ .then(res => {
+ this.sendToMainWorker({ data: res, id: value.id })
+ this.lastTask = Date.now()
+ return null
+ })
+ .catch(e => {
+ const err = this.handleError(e)
+ this.sendToMainWorker({ error: err, id: value.id })
+ this.lastTask = Date.now()
+ })
+ }
+}
-import { AsyncResource } from 'async_hooks'
+import type { Worker } from 'cluster'
import { isMaster, worker } from 'cluster'
import type { MessageValue } from '../utility-types'
+import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
/**
* @since 2.0.0
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
- protected readonly maxInactiveTime: number
- protected readonly async: boolean
- protected lastTask: number
- protected readonly interval?: NodeJS.Timeout
+export class ClusterWorker<Data = any, Response = any> extends AbstractWorker<
+ Worker,
+ Data,
+ Response
+> {
+ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+ super('worker-cluster-pool:pioardi', isMaster, fn, opts)
- public constructor (
- fn: (data: Data) => Response,
- public readonly opts: WorkerOptions = {}
- ) {
- super('worker-cluster-pool:pioardi')
-
- this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
- this.async = !!this.opts.async
- this.lastTask = Date.now()
- if (!fn) throw new Error('Fn parameter is mandatory')
- // keep the worker active
- if (!isMaster) {
- // console.log('ClusterWorker#constructor', 'is not master')
- this.interval = setInterval(
- this.checkAlive.bind(this),
- this.maxInactiveTime / 2
- )
- this.checkAlive.bind(this)()
- }
worker.on('message', (value: MessageValue<Data>) => {
- // console.log("cluster.on('message', value)", value)
if (value?.data && value.id) {
// here you will receive messages
- // console.log('This is the main worker ' + isMaster)
+ // console.log('This is the main worker ' + isMain)
if (this.async) {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
} else {
})
}
- protected checkAlive (): void {
- if (Date.now() - this.lastTask > this.maxInactiveTime) {
- worker.send({ kill: 1 })
- }
+ protected getMainWorker (): Worker {
+ return worker
}
- protected run (
- fn: (data?: Data) => Response,
- value: MessageValue<Data>
- ): void {
- try {
- const res = fn(value.data as Data)
- worker.send({ data: res, id: value.id })
- this.lastTask = Date.now()
- } catch (e) {
- const err = e instanceof Error ? e.message : e
- worker.send({ error: err, id: value.id })
- this.lastTask = Date.now()
- }
+ protected sendToMainWorker (message: MessageValue<Response>): void {
+ this.getMainWorker().send(message)
}
- protected runAsync (
- fn: (data?: Data) => Promise<Response>,
- value: MessageValue<Data>
- ): void {
- fn(value.data)
- .then(res => {
- worker.send({ data: res, id: value.id })
- this.lastTask = Date.now()
- return null
- })
- .catch(e => {
- const err = e instanceof Error ? e.message : e
- worker.send({ error: err, id: value.id })
- this.lastTask = Date.now()
- })
+ protected handleError (e: Error | string): string {
+ return e instanceof Error ? e.message : e
}
}
-import { AsyncResource } from 'async_hooks'
import { isMainThread, parentPort } from 'worker_threads'
import type { MessageValue } from '../utility-types'
+import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
/**
* @since 0.0.1
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
- protected readonly maxInactiveTime: number
- protected readonly async: boolean
- protected lastTask: number
- protected readonly interval?: NodeJS.Timeout
+export class ThreadWorker<Data = any, Response = any> extends AbstractWorker<
+ MessagePort,
+ Data,
+ Response
+> {
protected parent?: MessagePort
- public constructor (
- fn: (data: Data) => Response,
- public readonly opts: WorkerOptions = {}
- ) {
- super('worker-thread-pool:pioardi')
+ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+ super('worker-thread-pool:pioardi', isMainThread, fn, opts)
- this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
- this.async = !!this.opts.async
- this.lastTask = Date.now()
- if (!fn) throw new Error('Fn parameter is mandatory')
- // keep the worker active
- if (!isMainThread) {
- this.interval = setInterval(
- this.checkAlive.bind(this),
- this.maxInactiveTime / 2
- )
- this.checkAlive.bind(this)()
- }
parentPort?.on('message', (value: MessageValue<Data>) => {
if (value?.data && value.id) {
// here you will receive messages
- // console.log('This is the main thread ' + isMainThread)
+ // console.log('This is the main worker ' + isMain)
if (this.async) {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
} else {
// this will be received once
this.parent = value.parent
} else if (value.kill) {
- // here is time to kill this thread, just clearing the interval
+ // here is time to kill this worker, just clearing the interval
if (this.interval) clearInterval(this.interval)
this.emitDestroy()
}
})
}
- protected checkAlive (): void {
- if (Date.now() - this.lastTask > this.maxInactiveTime) {
- this.parent?.postMessage({ kill: 1 })
- }
- }
-
- protected run (
- fn: (data?: Data) => Response,
- value: MessageValue<Data>
- ): void {
- try {
- const res = fn(value.data)
- this.parent?.postMessage({ data: res, id: value.id })
- this.lastTask = Date.now()
- } catch (e) {
- this.parent?.postMessage({ error: e, id: value.id })
- this.lastTask = Date.now()
+ protected getMainWorker (): MessagePort {
+ if (!this.parent) {
+ throw new Error('Parent was not set')
}
+ return this.parent
}
- protected runAsync (
- fn: (data?: Data) => Promise<Response>,
- value: MessageValue<Data>
- ): void {
- fn(value.data)
- .then(res => {
- this.parent?.postMessage({ data: res, id: value.id })
- this.lastTask = Date.now()
- return null
- })
- .catch(e => {
- this.parent?.postMessage({ error: e, id: value.id })
- this.lastTask = Date.now()
- })
+ protected sendToMainWorker (message: MessageValue<Response>): void {
+ this.getMainWorker().postMessage(message)
}
}