Encapsulate logic of cluster and thread worker/pool (#116)
authorShinigami <chrissi92@hotmail.de>
Fri, 12 Feb 2021 07:17:48 +0000 (08:17 +0100)
committerGitHub <noreply@github.com>
Fri, 12 Feb 2021 07:17:48 +0000 (08:17 +0100)
Co-authored-by: aardizio <alessandroardizio94@gmail.com>
Co-authored-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
.vscode/launch.json
package-lock.json
src/index.ts
src/pools/abstract-pool.ts [new file with mode: 0644]
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts [new file with mode: 0644]
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/worker/abstract-worker.ts [new file with mode: 0644]
src/worker/cluster-worker.ts
src/worker/thread-worker.ts

index 8a86588aa11d31c6977020fed716ee643f5e7b2f..c2c4c33f19bc39cdc7a5dc966f14967c8e0dea0a 100644 (file)
       "cwd": "${workspaceFolder}",
       "preLaunchTask": "Development build",
       "runtimeExecutable": "npm",
-      "runtimeArgs": [
-        "run-script",
-        "test:debug:vscode"
-      ],
-      "skipFiles": [
-        "<node_internals>/**"
-      ],
+      "runtimeArgs": ["run-script", "test:debug:vscode"],
+      "skipFiles": ["<node_internals>/**"],
       "stopOnEntry": true
     },
     {
       "cwd": "${workspaceFolder}",
       "preLaunchTask": "Development build",
       "runtimeExecutable": "npm",
-      "runtimeArgs": [
-        "run-script",
-        "benchmark:debug:vscode"
-      ],
-      "skipFiles": [
-        "<node_internals>/**"
-      ],
+      "runtimeArgs": ["run-script", "benchmark:debug:vscode"],
+      "skipFiles": ["<node_internals>/**"],
       "stopOnEntry": true
     }
   ]
index 68599a1ce13c9417d9e45b0c8595f07ce739ec8e..929cc4864464809577465cda860e5570c2dace8d 100644 (file)
         "source-map": "^0.5.0"
       },
       "dependencies": {
-        "@babel/parser": {
-          "version": "7.12.15",
-          "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
-          "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
-          "dev": true
-        },
-        "json5": {
-          "version": "2.2.0",
-          "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz",
-          "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==",
-          "dev": true,
-          "requires": {
-            "minimist": "^1.2.5"
-          }
-        },
         "semver": {
           "version": "5.7.1",
           "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz",
           "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==",
           "dev": true
-        },
-        "source-map": {
-          "version": "0.5.7",
-          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
-          "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
-          "dev": true
         }
       }
     },
         "@babel/types": "^7.12.13",
         "jsesc": "^2.5.1",
         "source-map": "^0.5.0"
-      },
-      "dependencies": {
-        "source-map": {
-          "version": "0.5.7",
-          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
-          "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
-          "dev": true
-        }
       }
     },
     "@babel/helper-annotate-as-pure": {
       }
     },
     "@babel/parser": {
-      "version": "7.12.11",
-      "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz",
-      "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==",
+      "version": "7.12.15",
+      "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
+      "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
       "dev": true
     },
     "@babel/plugin-proposal-async-generator-functions": {
         "@babel/helper-replace-supers": "^7.12.13",
         "@babel/helper-split-export-declaration": "^7.12.13",
         "globals": "^11.1.0"
-      },
-      "dependencies": {
-        "globals": {
-          "version": "11.12.0",
-          "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
-          "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
-          "dev": true
-        }
       }
     },
     "@babel/plugin-transform-computed-properties": {
         "@babel/code-frame": "^7.12.13",
         "@babel/parser": "^7.12.13",
         "@babel/types": "^7.12.13"
-      },
-      "dependencies": {
-        "@babel/parser": {
-          "version": "7.12.15",
-          "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
-          "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
-          "dev": true
-        }
       }
     },
     "@babel/traverse": {
         "debug": "^4.1.0",
         "globals": "^11.1.0",
         "lodash": "^4.17.19"
-      },
-      "dependencies": {
-        "@babel/parser": {
-          "version": "7.12.15",
-          "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.15.tgz",
-          "integrity": "sha512-AQBOU2Z9kWwSZMd6lNjCX0GUgFonL1wAM1db8L8PMk9UDaGsRCArBkU4Sc+UCM3AE4hjbXx+h58Lb3QT4oRmrA==",
-          "dev": true
-        },
-        "globals": {
-          "version": "11.12.0",
-          "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
-          "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
-          "dev": true
-        }
       }
     },
     "@babel/types": {
         "strip-json-comments": "^3.1.1"
       },
       "dependencies": {
+        "globals": {
+          "version": "12.4.0",
+          "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
+          "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
+          "dev": true,
+          "requires": {
+            "type-fest": "^0.8.1"
+          }
+        },
         "ignore": {
           "version": "4.0.6",
           "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
       "dev": true,
       "requires": {
         "safe-buffer": "~5.1.1"
-      },
-      "dependencies": {
-        "safe-buffer": {
-          "version": "5.1.2",
-          "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
-          "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==",
-          "dev": true
-        }
       }
     },
     "core-js-compat": {
         "v8-compile-cache": "^2.0.3"
       },
       "dependencies": {
+        "globals": {
+          "version": "12.4.0",
+          "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
+          "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
+          "dev": true,
+          "requires": {
+            "type-fest": "^0.8.1"
+          }
+        },
         "ignore": {
           "version": "4.0.6",
           "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
       }
     },
     "globals": {
-      "version": "12.4.0",
-      "resolved": "https://registry.npmjs.org/globals/-/globals-12.4.0.tgz",
-      "integrity": "sha512-BWICuzzDvDoH54NHKCseDanAhE3CeDorgDL5MT6LMXXj2WCnd9UC2szdk4AWLfjdgNBCXLUanXYcpBBKOSWGwg==",
-      "dev": true,
-      "requires": {
-        "type-fest": "^0.8.1"
-      }
+      "version": "11.12.0",
+      "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz",
+      "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==",
+      "dev": true
     },
     "globby": {
       "version": "11.0.2",
       }
     },
     "graceful-fs": {
-      "version": "4.2.5",
-      "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz",
-      "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==",
+      "version": "4.2.6",
+      "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
+      "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==",
       "dev": true
     },
     "graphql": {
         "source-map": "^0.6.1",
         "uglify-js": "^3.1.4",
         "wordwrap": "^1.0.0"
+      },
+      "dependencies": {
+        "source-map": {
+          "version": "0.6.1",
+          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+          "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+          "dev": true
+        }
       }
     },
     "har-schema": {
         "debug": "^4.1.1",
         "istanbul-lib-coverage": "^3.0.0",
         "source-map": "^0.6.1"
+      },
+      "dependencies": {
+        "source-map": {
+          "version": "0.6.1",
+          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+          "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+          "dev": true
+        }
       }
     },
     "istanbul-reports": {
       "dev": true
     },
     "json5": {
-      "version": "1.0.1",
-      "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz",
-      "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==",
+      "version": "2.2.0",
+      "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz",
+      "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==",
       "dev": true,
       "requires": {
-        "minimist": "^1.2.0"
+        "minimist": "^1.2.5"
       }
     },
     "jsonify": {
             }
           }
         },
+        "source-map": {
+          "version": "0.6.1",
+          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+          "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+          "dev": true
+        },
         "supports-color": {
           "version": "6.1.0",
           "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-6.1.0.tgz",
             "@babel/highlight": "^7.10.4"
           }
         },
+        "@babel/parser": {
+          "version": "7.12.11",
+          "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.12.11.tgz",
+          "integrity": "sha512-N3UxG+uuF4CMYoNj8AhnbAcJF0PiuJ9KHuy1lQmkYsxTer/MAH9UBNHsBoAX/4s6NvlDD047No8mYVGGzLL4hg==",
+          "dev": true
+        },
         "@typescript-eslint/typescript-estree": {
           "version": "2.34.0",
           "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-2.34.0.tgz",
           "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz",
           "integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==",
           "dev": true
+        },
+        "resolve": {
+          "version": "1.19.0",
+          "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
+          "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==",
+          "dev": true,
+          "requires": {
+            "is-core-module": "^2.1.0",
+            "path-parse": "^1.0.6"
+          }
         }
       }
     },
       "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==",
       "dev": true
     },
+    "queue-microtask": {
+      "version": "1.2.2",
+      "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz",
+      "integrity": "sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==",
+      "dev": true
+    },
     "randombytes": {
       "version": "2.1.0",
       "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz",
       "dev": true
     },
     "resolve": {
-      "version": "1.19.0",
-      "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
-      "integrity": "sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==",
+      "version": "1.20.0",
+      "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.20.0.tgz",
+      "integrity": "sha512-wENBPt4ySzg4ybFQW2TT1zMQucPK95HSh/nq2CFTZVOGut2+pQvSsgtda4d26YrYcr067wjbmzOG8byDPBX63A==",
       "dev": true,
       "requires": {
-        "is-core-module": "^2.1.0",
+        "is-core-module": "^2.2.0",
         "path-parse": "^1.0.6"
       }
     },
       }
     },
     "run-parallel": {
-      "version": "1.1.10",
-      "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz",
-      "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==",
-      "dev": true
+      "version": "1.2.0",
+      "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
+      "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
+      "dev": true,
+      "requires": {
+        "queue-microtask": "^1.2.2"
+      }
     },
     "safe-buffer": {
-      "version": "5.2.1",
-      "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
-      "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==",
+      "version": "5.1.2",
+      "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
+      "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==",
       "dev": true
     },
     "safer-buffer": {
       }
     },
     "source-map": {
-      "version": "0.6.1",
-      "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
-      "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+      "version": "0.5.7",
+      "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz",
+      "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=",
       "dev": true
     },
     "source-map-support": {
       "requires": {
         "buffer-from": "^1.0.0",
         "source-map": "^0.6.0"
+      },
+      "dependencies": {
+        "source-map": {
+          "version": "0.6.1",
+          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+          "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+          "dev": true
+        }
       }
     },
     "sourcemap-codec": {
       },
       "dependencies": {
         "ajv": {
-          "version": "7.0.4",
-          "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.0.4.tgz",
-          "integrity": "sha512-xzzzaqgEQfmuhbhAoqjJ8T/1okb6gAzXn/eQRNpAN1AEUoHJTNF9xCDRTtf/s3SKldtZfa+RJeTs+BQq+eZ/sw==",
+          "version": "7.1.0",
+          "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.1.0.tgz",
+          "integrity": "sha512-svS9uILze/cXbH0z2myCK2Brqprx/+JJYK5pHicT/GQiBfzzhUVAIT6MwqJg8y4xV/zoGsUeuPuwtoiKSGE15g==",
           "dev": true,
           "requires": {
             "fast-deep-equal": "^3.1.1",
         "json5": "^1.0.1",
         "minimist": "^1.2.0",
         "strip-bom": "^3.0.0"
+      },
+      "dependencies": {
+        "json5": {
+          "version": "1.0.1",
+          "resolved": "https://registry.npmjs.org/json5/-/json5-1.0.1.tgz",
+          "integrity": "sha512-aKS4WQjPenRxiQsC93MNfjx+nbF4PAdYzmd/1JIj8HYzqfbu86beTuNgXDzPknWk0n0uARlyewZo4s++ES36Ow==",
+          "dev": true,
+          "requires": {
+            "minimist": "^1.2.0"
+          }
+        }
       }
     },
     "tslib": {
index 95ef8255d659c39e545044e05a749db6f201dad4..7242dd5e1fbe4c36bbbbd00f17bc3af7cc2c756d 100644 (file)
@@ -1,26 +1,18 @@
-import { DynamicClusterPool } from './pools/cluster/dynamic'
-import { FixedClusterPool } from './pools/cluster/fixed'
-import { DynamicThreadPool } from './pools/thread/dynamic'
-import { FixedThreadPool } from './pools/thread/fixed'
-import { ClusterWorker } from './worker/cluster-worker'
-import { ThreadWorker } from './worker/thread-worker'
-
-export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic'
 export type {
-  FixedClusterPoolOptions,
-  WorkerWithMessageChannel as ClusterWorkerWithMessageChannel
-} from './pools/cluster/fixed'
-export type { DynamicThreadPoolOptions } from './pools/thread/dynamic'
-export type {
-  FixedThreadPoolOptions,
-  WorkerWithMessageChannel as ThreadWorkerWithMessageChannel
-} from './pools/thread/fixed'
+  ErrorHandler,
+  ExitHandler,
+  IWorker,
+  OnlineHandler,
+  PoolOptions
+} from './pools/abstract-pool'
+export { DynamicClusterPool } from './pools/cluster/dynamic'
+export { FixedClusterPool } from './pools/cluster/fixed'
+export type { ClusterPoolOptions } from './pools/cluster/fixed'
+export type { IPool } from './pools/pool'
+export { DynamicThreadPool } from './pools/thread/dynamic'
+export { FixedThreadPool } from './pools/thread/fixed'
+export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
+export { AbstractWorker } from './worker/abstract-worker'
+export { ClusterWorker } from './worker/cluster-worker'
+export { ThreadWorker } from './worker/thread-worker'
 export type { WorkerOptions } from './worker/worker-options'
-export {
-  FixedThreadPool,
-  FixedClusterPool,
-  DynamicClusterPool,
-  DynamicThreadPool,
-  ThreadWorker,
-  ClusterWorker
-}
diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts
new file mode 100644 (file)
index 0000000..d169afa
--- /dev/null
@@ -0,0 +1,173 @@
+import EventEmitter from 'events'
+import type { MessageValue } from '../utility-types'
+import type { IPool } from './pool'
+
+export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
+export type OnlineHandler<Worker> = (this: Worker) => void
+export type ExitHandler<Worker> = (this: Worker, code: number) => void
+
+export interface IWorker {
+  on(event: 'error', handler: ErrorHandler<this>): void
+  on(event: 'online', handler: OnlineHandler<this>): void
+  on(event: 'exit', handler: ExitHandler<this>): void
+}
+
+export interface PoolOptions<Worker> {
+  /**
+   * A function that will listen for error event on each worker.
+   */
+  errorHandler?: ErrorHandler<Worker>
+  /**
+   * A function that will listen for online event on each worker.
+   */
+  onlineHandler?: OnlineHandler<Worker>
+  /**
+   * A function that will listen for exit event on each worker.
+   */
+  exitHandler?: ExitHandler<Worker>
+  /**
+   * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
+   *
+   * @default 1000
+   */
+  maxTasks?: number
+}
+
+class PoolEmitter extends EventEmitter {}
+
+export abstract class AbstractPool<
+  Worker extends IWorker,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Data = any,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Response = any
+> implements IPool<Data, Response> {
+  public readonly workers: Worker[] = []
+  public nextWorker: number = 0
+
+  /**
+   * `workerId` as key and an integer value
+   */
+  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
+
+  public readonly emitter: PoolEmitter
+
+  protected id: number = 0
+
+  public constructor (
+    public readonly numWorkers: number,
+    public readonly filePath: string,
+    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
+  ) {
+    if (!this.isMain()) {
+      throw new Error('Cannot start a pool from a worker!')
+    }
+    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
+    if (!this.filePath) {
+      throw new Error('Please specify a file with a worker implementation')
+    }
+
+    this.setupHook()
+
+    for (let i = 1; i <= this.numWorkers; i++) {
+      this.internalNewWorker()
+    }
+
+    this.emitter = new PoolEmitter()
+  }
+
+  protected setupHook (): void {
+    // Can be overridden
+  }
+
+  protected abstract isMain (): boolean
+
+  public async destroy (): Promise<void> {
+    for (const worker of this.workers) {
+      await this.destroyWorker(worker)
+    }
+  }
+
+  protected abstract destroyWorker (worker: Worker): void | Promise<void>
+
+  protected abstract sendToWorker (
+    worker: Worker,
+    message: MessageValue<Data>
+  ): void
+
+  protected addWorker (worker: Worker): void {
+    const previousWorkerIndex = this.tasks.get(worker)
+    if (previousWorkerIndex !== undefined) {
+      this.tasks.set(worker, previousWorkerIndex + 1)
+    } else {
+      throw Error('Worker could not be found in tasks map')
+    }
+  }
+
+  /**
+   * Execute the task specified into the constructor with the data parameter.
+   *
+   * @param data The input for the task specified.
+   * @returns Promise that is resolved when the task is done.
+   */
+  public execute (data: Data): Promise<Response> {
+    // configure worker to handle message with the specified task
+    const worker = this.chooseWorker()
+    this.addWorker(worker)
+    const id = ++this.id
+    const res = this.internalExecute(worker, id)
+    this.sendToWorker(worker, { data: data || ({} as Data), id: id })
+    return res
+  }
+
+  protected abstract registerWorkerMessageListener (
+    port: Worker,
+    listener: (message: MessageValue<Response>) => void
+  ): void
+
+  protected abstract unregisterWorkerMessageListener (
+    port: Worker,
+    listener: (message: MessageValue<Response>) => void
+  ): void
+
+  protected internalExecute (worker: Worker, id: number): Promise<Response> {
+    return new Promise((resolve, reject) => {
+      const listener: (message: MessageValue<Response>) => void = message => {
+        if (message.id === id) {
+          this.unregisterWorkerMessageListener(worker, listener)
+          this.addWorker(worker)
+          if (message.error) reject(message.error)
+          else resolve(message.data as Response)
+        }
+      }
+      this.registerWorkerMessageListener(worker, listener)
+    })
+  }
+
+  protected chooseWorker (): Worker {
+    if (this.workers.length - 1 === this.nextWorker) {
+      this.nextWorker = 0
+      return this.workers[this.nextWorker]
+    } else {
+      this.nextWorker++
+      return this.workers[this.nextWorker]
+    }
+  }
+
+  protected abstract newWorker (): Worker
+
+  protected abstract afterNewWorkerPushed (worker: Worker): void
+
+  protected internalNewWorker (): Worker {
+    const worker: Worker = this.newWorker()
+    worker.on('error', this.opts.errorHandler ?? (() => {}))
+    worker.on('online', this.opts.onlineHandler ?? (() => {}))
+    // TODO handle properly when a worker exit
+    worker.on('exit', this.opts.exitHandler ?? (() => {}))
+    this.workers.push(worker)
+    this.afterNewWorkerPushed(worker)
+    // init tasks map
+    this.tasks.set(worker, 0)
+    return worker
+  }
+}
index d4bf30f39b3a58f7572064ca9627f434a6a77308..f2375a02d6e1babf875d27b40f01782849828af0 100644 (file)
@@ -1,11 +1,8 @@
-import { EventEmitter } from 'events'
-import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed'
+import type { Worker } from 'cluster'
+import type { MessageValue } from '../../utility-types'
+import type { ClusterPoolOptions } from './fixed'
 import { FixedClusterPool } from './fixed'
 
-class MyEmitter extends EventEmitter {}
-
-export type DynamicClusterPoolOptions = FixedClusterPoolOptions
-
 /**
  * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
  *
@@ -21,8 +18,6 @@ export class DynamicClusterPool<
   // eslint-disable-next-line @typescript-eslint/no-explicit-any
   Response = any
 > extends FixedClusterPool<Data, Response> {
-  public readonly emitter: MyEmitter
-
   /**
    * @param min Min number of workers that will be always active
    * @param max Max number of workers that will be active
@@ -30,18 +25,16 @@ export class DynamicClusterPool<
    * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
    */
   public constructor (
-    public readonly min: number,
+    min: number,
     public readonly max: number,
-    public readonly filename: string,
-    public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 }
+    filename: string,
+    opts: ClusterPoolOptions = { maxTasks: 1000 }
   ) {
     super(min, filename, opts)
-
-    this.emitter = new MyEmitter()
   }
 
-  protected chooseWorker (): WorkerWithMessageChannel {
-    let worker: WorkerWithMessageChannel | undefined
+  protected chooseWorker (): Worker {
+    let worker: Worker | undefined
     for (const entry of this.tasks) {
       if (entry[1] === 0) {
         worker = entry[0]
@@ -58,11 +51,11 @@ export class DynamicClusterPool<
         return super.chooseWorker()
       }
       // all workers are busy create a new worker
-      const worker = this.newWorker()
-      worker.on('message', (message: { kill?: number }) => {
+      const worker = this.internalNewWorker()
+      worker.on('message', (message: MessageValue<Data>) => {
         if (message.kill) {
-          worker.send({ kill: 1 })
-          worker.kill()
+          this.sendToWorker(worker, { kill: 1 })
+          void this.destroyWorker(worker)
           // clean workers from data structures
           const workerIndex = this.workers.indexOf(worker)
           this.workers.splice(workerIndex, 1)
index 332daa4d2b4cfa33225f3ff10a557d5f1a2fbde7..00205449a2f3fc0bee0feb54b7d41bea56b8ad62 100644 (file)
@@ -1,28 +1,9 @@
-import type { SendHandle } from 'child_process'
 import { fork, isMaster, setupMaster, Worker } from 'cluster'
 import type { MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import { AbstractPool } from '../abstract-pool'
 
-export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
-
-export interface FixedClusterPoolOptions {
-  /**
-   * A function that will listen for error event on each worker.
-   */
-  errorHandler?: (this: Worker, e: Error) => void
-  /**
-   * A function that will listen for online event on each worker.
-   */
-  onlineHandler?: (this: Worker) => void
-  /**
-   * A function that will listen for exit event on each worker.
-   */
-  exitHandler?: (this: Worker, code: number) => void
-  /**
-   * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
-   *
-   * @default 1000
-   */
-  maxTasks?: number
+export interface ClusterPoolOptions extends PoolOptions<Worker> {
   /**
    * Key/value pairs to add to worker process environment.
    *
@@ -41,122 +22,63 @@ export interface FixedClusterPoolOptions {
  * @since 2.0.0
  */
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class FixedClusterPool<Data = any, Response = any> {
-  public readonly workers: WorkerWithMessageChannel[] = []
-  public nextWorker: number = 0
-
-  // workerId as key and an integer value
-  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
-    WorkerWithMessageChannel,
-    number
-  >()
-
-  protected id: number = 0
-
+export class FixedClusterPool<Data = any, Response = any> extends AbstractPool<
+  Worker,
+  Data,
+  Response
+> {
   /**
    * @param numWorkers Number of workers for this pool.
    * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
    * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
    */
   public constructor (
-    public readonly numWorkers: number,
-    public readonly filePath: string,
-    public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
+    numWorkers: number,
+    filePath: string,
+    public readonly opts: ClusterPoolOptions = { maxTasks: 1000 }
   ) {
-    if (!isMaster) {
-      throw new Error('Cannot start a cluster pool from a worker!')
-    }
-    // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
-    if (!this.filePath) {
-      throw new Error('Please specify a file with a worker implementation')
-    }
+    super(numWorkers, filePath, opts)
+  }
 
+  protected setupHook (): void {
     setupMaster({
       exec: this.filePath
     })
+  }
 
-    for (let i = 1; i <= this.numWorkers; i++) {
-      this.newWorker()
-    }
+  protected isMain (): boolean {
+    return isMaster
   }
 
-  public destroy (): void {
-    for (const worker of this.workers) {
-      worker.kill()
-    }
+  protected destroyWorker (worker: Worker): void {
+    worker.kill()
   }
 
-  /**
-   * Execute the task specified into the constructor with the data parameter.
-   *
-   * @param data The input for the task specified.
-   * @returns Promise that is resolved when the task is done.
-   */
-  public execute (data: Data): Promise<Response> {
-    // configure worker to handle message with the specified task
-    const worker: WorkerWithMessageChannel = this.chooseWorker()
-    // console.log('FixedClusterPool#execute choosen worker:', worker)
-    const previousWorkerIndex = this.tasks.get(worker)
-    if (previousWorkerIndex !== undefined) {
-      this.tasks.set(worker, previousWorkerIndex + 1)
-    } else {
-      throw Error('Worker could not be found in tasks map')
-    }
-    const id: number = ++this.id
-    const res: Promise<Response> = this.internalExecute(worker, id)
-    // console.log('FixedClusterPool#execute send data to worker:', worker)
-    worker.send({ data: data || {}, id: id })
-    return res
+  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
+    worker.send(message)
   }
 
-  protected internalExecute (
-    worker: WorkerWithMessageChannel,
-    id: number
-  ): Promise<Response> {
-    return new Promise((resolve, reject) => {
-      const listener: (
-        message: MessageValue<Response>,
-        handle: SendHandle
-      ) => void = message => {
-        // console.log('FixedClusterPool#internalExecute listener:', message)
-        if (message.id === id) {
-          worker.removeListener('message', listener)
-          const previousWorkerIndex = this.tasks.get(worker)
-          if (previousWorkerIndex !== undefined) {
-            this.tasks.set(worker, previousWorkerIndex + 1)
-          } else {
-            throw Error('Worker could not be found in tasks map')
-          }
-          if (message.error) reject(message.error)
-          else resolve(message.data as Response)
-        }
-      }
-      worker.on('message', listener)
-    })
+  protected registerWorkerMessageListener (
+    port: Worker,
+    listener: (message: MessageValue<Response>) => void
+  ): void {
+    port.on('message', listener)
+  }
+
+  protected unregisterWorkerMessageListener (
+    port: Worker,
+    listener: (message: MessageValue<Response>) => void
+  ): void {
+    port.removeListener('message', listener)
   }
 
-  protected chooseWorker (): WorkerWithMessageChannel {
-    if (this.workers.length - 1 === this.nextWorker) {
-      this.nextWorker = 0
-      return this.workers[this.nextWorker]
-    } else {
-      this.nextWorker++
-      return this.workers[this.nextWorker]
-    }
+  protected newWorker (): Worker {
+    return fork(this.opts.env)
   }
 
-  protected newWorker (): WorkerWithMessageChannel {
-    const worker: WorkerWithMessageChannel = fork(this.opts.env)
-    worker.on('error', this.opts.errorHandler ?? (() => {}))
-    worker.on('online', this.opts.onlineHandler ?? (() => {}))
-    // TODO handle properly when a worker exit
-    worker.on('exit', this.opts.exitHandler ?? (() => {}))
-    this.workers.push(worker)
+  protected afterNewWorkerPushed (worker: Worker): void {
     // we will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
     worker.setMaxListeners(this.opts.maxTasks ?? 1000)
-    // init tasks map
-    this.tasks.set(worker, 0)
-    return worker
   }
 }
diff --git a/src/pools/pool.ts b/src/pools/pool.ts
new file mode 100644 (file)
index 0000000..0c9b232
--- /dev/null
@@ -0,0 +1,9 @@
+export interface IPool<
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Data = any,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Response = any
+> {
+  destroy(): Promise<void>
+  execute(data: Data): Promise<Response>
+}
index e80276d976640c25a9fe194847289e1ba0d7db45..d6a6e1ba870f9d1938e42f0dc0f70bf844243448 100644 (file)
@@ -1,11 +1,8 @@
-import { EventEmitter } from 'events'
-import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed'
+import type { MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import type { ThreadWorkerWithMessageChannel } from './fixed'
 import { FixedThreadPool } from './fixed'
 
-class MyEmitter extends EventEmitter {}
-
-export type DynamicThreadPoolOptions = FixedThreadPoolOptions
-
 /**
  * A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer.
  *
@@ -21,8 +18,6 @@ export class DynamicThreadPool<
   // eslint-disable-next-line @typescript-eslint/no-explicit-any
   Response = any
 > extends FixedThreadPool<Data, Response> {
-  public readonly emitter: MyEmitter
-
   /**
    * @param min Min number of threads that will be always active
    * @param max Max number of threads that will be active
@@ -30,18 +25,16 @@ export class DynamicThreadPool<
    * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
    */
   public constructor (
-    public readonly min: number,
+    min: number,
     public readonly max: number,
-    public readonly filename: string,
-    public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
+    filename: string,
+    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
   ) {
     super(min, filename, opts)
-
-    this.emitter = new MyEmitter()
   }
 
-  protected chooseWorker (): WorkerWithMessageChannel {
-    let worker: WorkerWithMessageChannel | undefined
+  protected chooseWorker (): ThreadWorkerWithMessageChannel {
+    let worker: ThreadWorkerWithMessageChannel | undefined
     for (const entry of this.tasks) {
       if (entry[1] === 0) {
         worker = entry[0]
@@ -58,11 +51,11 @@ export class DynamicThreadPool<
         return super.chooseWorker()
       }
       // all workers are busy create a new worker
-      const worker = this.newWorker()
-      worker.port2?.on('message', (message: { kill?: number }) => {
+      const worker = this.internalNewWorker()
+      worker.port2?.on('message', (message: MessageValue<Data>) => {
         if (message.kill) {
-          worker.postMessage({ kill: 1 })
-          void worker.terminate()
+          this.sendToWorker(worker, { kill: 1 })
+          void this.destroyWorker(worker)
           // clean workers from data structures
           const workerIndex = this.workers.indexOf(worker)
           this.workers.splice(workerIndex, 1)
index 36eddace5ec149c41b8daafbd4ada0b91babb32d..bed8f3a7dba8ae87ed6085df040723a93eb0e739 100644 (file)
@@ -1,28 +1,9 @@
 import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
 import type { Draft, MessageValue } from '../../utility-types'
+import type { PoolOptions } from '../abstract-pool'
+import { AbstractPool } from '../abstract-pool'
 
-export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
-
-export interface FixedThreadPoolOptions {
-  /**
-   * A function that will listen for error event on each worker thread.
-   */
-  errorHandler?: (this: Worker, e: Error) => void
-  /**
-   * A function that will listen for online event on each worker thread.
-   */
-  onlineHandler?: (this: Worker) => void
-  /**
-   * A function that will listen for exit event on each worker thread.
-   */
-  exitHandler?: (this: Worker, code: number) => void
-  /**
-   * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
-   *
-   * @default 1000
-   */
-  maxTasks?: number
-}
+export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
 
 /**
  * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
@@ -33,109 +14,64 @@ export interface FixedThreadPoolOptions {
  * @since 0.0.1
  */
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class FixedThreadPool<Data = any, Response = any> {
-  public readonly workers: WorkerWithMessageChannel[] = []
-  public nextWorker: number = 0
-
-  // threadId as key and an integer value
-  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
-    WorkerWithMessageChannel,
-    number
-  >()
-
-  protected id: number = 0
-
+export class FixedThreadPool<Data = any, Response = any> extends AbstractPool<
+  ThreadWorkerWithMessageChannel,
+  Data,
+  Response
+> {
   /**
    * @param numThreads Num of threads for this worker pool.
    * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
    * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
    */
   public constructor (
-    public readonly numThreads: number,
-    public readonly filePath: string,
-    public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
+    numThreads: number,
+    filePath: string,
+    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
   ) {
-    if (!isMainThread) {
-      throw new Error('Cannot start a thread pool from a worker thread !!!')
-    }
-    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
-    if (!this.filePath) {
-      throw new Error('Please specify a file with a worker implementation')
-    }
+    super(numThreads, filePath, opts)
+  }
 
-    for (let i = 1; i <= this.numThreads; i++) {
-      this.newWorker()
-    }
+  protected isMain (): boolean {
+    return isMainThread
   }
 
-  public async destroy (): Promise<void> {
-    for (const worker of this.workers) {
-      await worker.terminate()
-    }
+  protected async destroyWorker (
+    worker: ThreadWorkerWithMessageChannel
+  ): Promise<void> {
+    await worker.terminate()
   }
 
-  /**
-   * Execute the task specified into the constructor with the data parameter.
-   *
-   * @param data The input for the task specified.
-   * @returns Promise that is resolved when the task is done.
-   */
-  public execute (data: Data): Promise<Response> {
-    // configure worker to handle message with the specified task
-    const worker = this.chooseWorker()
-    const previousWorkerIndex = this.tasks.get(worker)
-    if (previousWorkerIndex !== undefined) {
-      this.tasks.set(worker, previousWorkerIndex + 1)
-    } else {
-      throw Error('Worker could not be found in tasks map')
-    }
-    const id = ++this.id
-    const res = this.internalExecute(worker, id)
-    worker.postMessage({ data: data || {}, id: id })
-    return res
+  protected sendToWorker (
+    worker: ThreadWorkerWithMessageChannel,
+    message: MessageValue<Data>
+  ): void {
+    worker.postMessage(message)
   }
 
-  protected internalExecute (
-    worker: WorkerWithMessageChannel,
-    id: number
-  ): Promise<Response> {
-    return new Promise((resolve, reject) => {
-      const listener: (message: MessageValue<Response>) => void = message => {
-        if (message.id === id) {
-          worker.port2?.removeListener('message', listener)
-          const previousWorkerIndex = this.tasks.get(worker)
-          if (previousWorkerIndex !== undefined) {
-            this.tasks.set(worker, previousWorkerIndex + 1)
-          } else {
-            throw Error('Worker could not be found in tasks map')
-          }
-          if (message.error) reject(message.error)
-          else resolve(message.data as Response)
-        }
-      }
-      worker.port2?.on('message', listener)
-    })
+  protected registerWorkerMessageListener (
+    port: ThreadWorkerWithMessageChannel,
+    listener: (message: MessageValue<Response>) => void
+  ): void {
+    port.port2?.on('message', listener)
   }
 
-  protected chooseWorker (): WorkerWithMessageChannel {
-    if (this.workers.length - 1 === this.nextWorker) {
-      this.nextWorker = 0
-      return this.workers[this.nextWorker]
-    } else {
-      this.nextWorker++
-      return this.workers[this.nextWorker]
-    }
+  protected unregisterWorkerMessageListener (
+    port: ThreadWorkerWithMessageChannel,
+    listener: (message: MessageValue<Response>) => void
+  ): void {
+    port.port2?.removeListener('message', listener)
   }
 
-  protected newWorker (): WorkerWithMessageChannel {
-    const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
+  protected newWorker (): ThreadWorkerWithMessageChannel {
+    return new Worker(this.filePath, {
       env: SHARE_ENV
     })
-    worker.on('error', this.opts.errorHandler ?? (() => {}))
-    worker.on('online', this.opts.onlineHandler ?? (() => {}))
-    // TODO handle properly when a thread exit
-    worker.on('exit', this.opts.exitHandler ?? (() => {}))
-    this.workers.push(worker)
+  }
+
+  protected afterNewWorkerPushed (
+    worker: ThreadWorkerWithMessageChannel
+  ): void {
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])
     worker.port1 = port1
@@ -143,8 +79,5 @@ export class FixedThreadPool<Data = any, Response = any> {
     // we will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
     worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
-    // init tasks map
-    this.tasks.set(worker, 0)
-    return worker
   }
 }
diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts
new file mode 100644 (file)
index 0000000..5e22362
--- /dev/null
@@ -0,0 +1,91 @@
+import { AsyncResource } from 'async_hooks'
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
+
+export abstract class AbstractWorker<
+  MainWorker,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Data = any,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Response = any
+> extends AsyncResource {
+  protected readonly maxInactiveTime: number
+  protected readonly async: boolean
+  protected lastTask: number
+  protected readonly interval?: NodeJS.Timeout
+
+  /**
+   *
+   * @param type The type of async event.
+   * @param isMain
+   * @param fn
+   * @param opts
+   */
+  public constructor (
+    type: string,
+    isMain: boolean,
+    fn: (data: Data) => Response,
+    public readonly opts: WorkerOptions = {}
+  ) {
+    super(type)
+
+    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+    this.async = !!this.opts.async
+    this.lastTask = Date.now()
+    if (!fn) throw new Error('Fn parameter is mandatory')
+    // keep the worker active
+    if (!isMain) {
+      this.interval = setInterval(
+        this.checkAlive.bind(this),
+        this.maxInactiveTime / 2
+      )
+      this.checkAlive.bind(this)()
+    }
+  }
+
+  protected abstract getMainWorker (): MainWorker
+
+  protected abstract sendToMainWorker (message: MessageValue<Response>): void
+
+  protected checkAlive (): void {
+    if (Date.now() - this.lastTask > this.maxInactiveTime) {
+      this.sendToMainWorker({ kill: 1 })
+    }
+  }
+
+  protected handleError (e: Error | string): string {
+    return (e as unknown) as string
+  }
+
+  protected run (
+    fn: (data?: Data) => Response,
+    value: MessageValue<Data>
+  ): void {
+    try {
+      const res = fn(value.data)
+      this.sendToMainWorker({ data: res, id: value.id })
+      this.lastTask = Date.now()
+    } catch (e) {
+      const err = this.handleError(e)
+      this.sendToMainWorker({ error: err, id: value.id })
+      this.lastTask = Date.now()
+    }
+  }
+
+  protected runAsync (
+    fn: (data?: Data) => Promise<Response>,
+    value: MessageValue<Data>
+  ): void {
+    fn(value.data)
+      .then(res => {
+        this.sendToMainWorker({ data: res, id: value.id })
+        this.lastTask = Date.now()
+        return null
+      })
+      .catch(e => {
+        const err = this.handleError(e)
+        this.sendToMainWorker({ error: err, id: value.id })
+        this.lastTask = Date.now()
+      })
+  }
+}
index e750126b929ec93cb7d8e38bcfd9d754eefe6059..092dd51129089f761e06da0d8db39672396c119a 100644 (file)
@@ -1,6 +1,7 @@
-import { AsyncResource } from 'async_hooks'
+import type { Worker } from 'cluster'
 import { isMaster, worker } from 'cluster'
 import type { MessageValue } from '../utility-types'
+import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
 /**
@@ -13,36 +14,18 @@ import type { WorkerOptions } from './worker-options'
  * @since 2.0.0
  */
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
-  protected readonly maxInactiveTime: number
-  protected readonly async: boolean
-  protected lastTask: number
-  protected readonly interval?: NodeJS.Timeout
+export class ClusterWorker<Data = any, Response = any> extends AbstractWorker<
+  Worker,
+  Data,
+  Response
+> {
+  public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+    super('worker-cluster-pool:pioardi', isMaster, fn, opts)
 
-  public constructor (
-    fn: (data: Data) => Response,
-    public readonly opts: WorkerOptions = {}
-  ) {
-    super('worker-cluster-pool:pioardi')
-
-    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
-    this.async = !!this.opts.async
-    this.lastTask = Date.now()
-    if (!fn) throw new Error('Fn parameter is mandatory')
-    // keep the worker active
-    if (!isMaster) {
-      // console.log('ClusterWorker#constructor', 'is not master')
-      this.interval = setInterval(
-        this.checkAlive.bind(this),
-        this.maxInactiveTime / 2
-      )
-      this.checkAlive.bind(this)()
-    }
     worker.on('message', (value: MessageValue<Data>) => {
-      // console.log("cluster.on('message', value)", value)
       if (value?.data && value.id) {
         // here you will receive messages
-        // console.log('This is the main worker ' + isMaster)
+        // console.log('This is the main worker ' + isMain)
         if (this.async) {
           this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
         } else {
@@ -56,41 +39,15 @@ export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
     })
   }
 
-  protected checkAlive (): void {
-    if (Date.now() - this.lastTask > this.maxInactiveTime) {
-      worker.send({ kill: 1 })
-    }
+  protected getMainWorker (): Worker {
+    return worker
   }
 
-  protected run (
-    fn: (data?: Data) => Response,
-    value: MessageValue<Data>
-  ): void {
-    try {
-      const res = fn(value.data as Data)
-      worker.send({ data: res, id: value.id })
-      this.lastTask = Date.now()
-    } catch (e) {
-      const err = e instanceof Error ? e.message : e
-      worker.send({ error: err, id: value.id })
-      this.lastTask = Date.now()
-    }
+  protected sendToMainWorker (message: MessageValue<Response>): void {
+    this.getMainWorker().send(message)
   }
 
-  protected runAsync (
-    fn: (data?: Data) => Promise<Response>,
-    value: MessageValue<Data>
-  ): void {
-    fn(value.data)
-      .then(res => {
-        worker.send({ data: res, id: value.id })
-        this.lastTask = Date.now()
-        return null
-      })
-      .catch(e => {
-        const err = e instanceof Error ? e.message : e
-        worker.send({ error: err, id: value.id })
-        this.lastTask = Date.now()
-      })
+  protected handleError (e: Error | string): string {
+    return e instanceof Error ? e.message : e
   }
 }
index d0af904bfb16d9fd2ba396684a12fd2220d3c89e..5a16026c497679a73332cc82a0fee704b5a5e72c 100644 (file)
@@ -1,6 +1,6 @@
-import { AsyncResource } from 'async_hooks'
 import { isMainThread, parentPort } from 'worker_threads'
 import type { MessageValue } from '../utility-types'
+import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
 /**
@@ -13,35 +13,20 @@ import type { WorkerOptions } from './worker-options'
  * @since 0.0.1
  */
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
-export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
-  protected readonly maxInactiveTime: number
-  protected readonly async: boolean
-  protected lastTask: number
-  protected readonly interval?: NodeJS.Timeout
+export class ThreadWorker<Data = any, Response = any> extends AbstractWorker<
+  MessagePort,
+  Data,
+  Response
+> {
   protected parent?: MessagePort
 
-  public constructor (
-    fn: (data: Data) => Response,
-    public readonly opts: WorkerOptions = {}
-  ) {
-    super('worker-thread-pool:pioardi')
+  public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+    super('worker-thread-pool:pioardi', isMainThread, fn, opts)
 
-    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
-    this.async = !!this.opts.async
-    this.lastTask = Date.now()
-    if (!fn) throw new Error('Fn parameter is mandatory')
-    // keep the worker active
-    if (!isMainThread) {
-      this.interval = setInterval(
-        this.checkAlive.bind(this),
-        this.maxInactiveTime / 2
-      )
-      this.checkAlive.bind(this)()
-    }
     parentPort?.on('message', (value: MessageValue<Data>) => {
       if (value?.data && value.id) {
         // here you will receive messages
-        // console.log('This is the main thread ' + isMainThread)
+        // console.log('This is the main worker ' + isMain)
         if (this.async) {
           this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
         } else {
@@ -52,46 +37,21 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
         // this will be received once
         this.parent = value.parent
       } else if (value.kill) {
-        // here is time to kill this thread, just clearing the interval
+        // here is time to kill this worker, just clearing the interval
         if (this.interval) clearInterval(this.interval)
         this.emitDestroy()
       }
     })
   }
 
-  protected checkAlive (): void {
-    if (Date.now() - this.lastTask > this.maxInactiveTime) {
-      this.parent?.postMessage({ kill: 1 })
-    }
-  }
-
-  protected run (
-    fn: (data?: Data) => Response,
-    value: MessageValue<Data>
-  ): void {
-    try {
-      const res = fn(value.data)
-      this.parent?.postMessage({ data: res, id: value.id })
-      this.lastTask = Date.now()
-    } catch (e) {
-      this.parent?.postMessage({ error: e, id: value.id })
-      this.lastTask = Date.now()
+  protected getMainWorker (): MessagePort {
+    if (!this.parent) {
+      throw new Error('Parent was not set')
     }
+    return this.parent
   }
 
-  protected runAsync (
-    fn: (data?: Data) => Promise<Response>,
-    value: MessageValue<Data>
-  ): void {
-    fn(value.data)
-      .then(res => {
-        this.parent?.postMessage({ data: res, id: value.id })
-        this.lastTask = Date.now()
-        return null
-      })
-      .catch(e => {
-        this.parent?.postMessage({ error: e, id: value.id })
-        this.lastTask = Date.now()
-      })
+  protected sendToMainWorker (message: MessageValue<Response>): void {
+    this.getMainWorker().postMessage(message)
   }
 }