Provide a cluster worker pool (#92)
authorShinigami <chrissi92@hotmail.de>
Thu, 11 Feb 2021 13:57:37 +0000 (14:57 +0100)
committerGitHub <noreply@github.com>
Thu, 11 Feb 2021 13:57:37 +0000 (14:57 +0100)
Co-authored-by: aardizio <alessandroardizio94@gmail.com>
Co-authored-by: Jérôme Benoit <jerome.benoit@sap.com>
35 files changed:
benchmarks/bench.js
benchmarks/cluster/dynamic.js [new file with mode: 0644]
benchmarks/cluster/fixed.js [new file with mode: 0644]
benchmarks/cluster/worker.js [new file with mode: 0644]
benchmarks/external/workerThreadsWorker.js [moved from benchmarks/workerThreadsWorker.js with 100% similarity]
benchmarks/external/workerpoolWorker.js [moved from benchmarks/workerpoolWorker.js with 100% similarity]
benchmarks/myBench.js
benchmarks/thread/dynamic.js [new file with mode: 0644]
benchmarks/thread/fixed.js [new file with mode: 0644]
benchmarks/thread/worker.js [moved from benchmarks/threadWorker.js with 83% similarity]
package-lock.json
src/index.ts
src/pools/cluster/dynamic.ts [new file with mode: 0644]
src/pools/cluster/fixed.ts [new file with mode: 0644]
src/pools/thread/dynamic.ts [moved from src/dynamic.ts with 90% similarity]
src/pools/thread/fixed.ts [moved from src/fixed.ts with 95% similarity]
src/utility-types.ts [new file with mode: 0644]
src/worker/cluster-worker.ts [new file with mode: 0644]
src/worker/thread-worker.ts [moved from src/workers.ts with 59% similarity]
src/worker/worker-options.ts [new file with mode: 0644]
tests/pools/cluster/dynamic.test.js [new file with mode: 0644]
tests/pools/cluster/fixed.test.js [new file with mode: 0644]
tests/pools/thread/dynamic.test.js [moved from tests/dynamic.test.js with 85% similarity]
tests/pools/thread/fixed.test.js [moved from tests/fixed.test.js with 76% similarity]
tests/worker/cluster/asyncErrorWorker.js [new file with mode: 0644]
tests/worker/cluster/asyncWorker.js [new file with mode: 0644]
tests/worker/cluster/echoWorker.js [new file with mode: 0644]
tests/worker/cluster/emptyWorker.js [new file with mode: 0644]
tests/worker/cluster/errorWorker.js [new file with mode: 0644]
tests/worker/cluster/testWorker.js [new file with mode: 0644]
tests/worker/thread/asyncWorker.js [moved from tests/workers/asyncWorker.js with 79% similarity]
tests/worker/thread/echoWorker.js [moved from tests/workers/echoWorker.js with 68% similarity]
tests/worker/thread/emptyWorker.js [moved from tests/workers/emptyWorker.js with 65% similarity]
tests/worker/thread/errorWorker.js [moved from tests/workers/errorWorker.js with 70% similarity]
tests/worker/thread/testWorker.js [moved from tests/workers/testWorker.js with 83% similarity]

index 017ac20be223e0fd26faba804e54c2329dcf931c..b2091613e7909d4f920043032413014923e3e740 100644 (file)
@@ -1,79 +1,35 @@
 const Benchmark = require('benchmark')
+const { dynamicClusterTest } = require('./cluster/dynamic')
+const { fixedClusterTest } = require('./cluster/fixed')
+const { dynamicThreadTest } = require('./thread/dynamic')
+const { fixedThreadTest } = require('./thread/fixed')
+
 const suite = new Benchmark.Suite()
-const { FixedThreadPool } = require('../lib/index')
-const { DynamicThreadPool } = require('../lib/index')
-const size = 30
-const tasks = 1
 
 const LIST_FORMATTER = new Intl.ListFormat('en-US', {
   style: 'long',
   type: 'conjunction'
 })
 
-// pools
-const fixedPool = new FixedThreadPool(size, './threadWorker.js', {
-  maxTasks: 10000
-})
-const dynamicPool = new DynamicThreadPool(
-  size / 2,
-  size * 3,
-  './threadWorker.js',
-  { maxTasks: 10000 }
-)
-const workerData = { proof: 'ok' }
-
 // wait some seconds before start, my pools need to load threads !!!
 setTimeout(async () => {
   test()
 }, 3000)
 
-// fixed pool proof
-async function fixedTest () {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      fixedPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => {
-          console.error(err)
-        })
-    }
-  })
-}
-
-async function dynamicTest () {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      dynamicPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => console.error(err))
-    }
-  })
-}
-
 async function test () {
   // add tests
   suite
-    .add('PioardiStaticPool', async function () {
-      await fixedTest()
+    .add('Pioardi:Static:ThreadPool', async function () {
+      await fixedThreadTest()
+    })
+    .add('Pioardi:Dynamic:ThreadPool', async function () {
+      await dynamicThreadTest()
+    })
+    .add('Pioardi:Static:ClusterPool', async function () {
+      await fixedClusterTest()
     })
-    .add('PioardiDynamicPool', async function () {
-      await dynamicTest()
+    .add('Pioardi:Dynamic:ClusterPool', async function () {
+      await dynamicClusterTest()
     })
     // add listeners
     .on('cycle', function (event) {
diff --git a/benchmarks/cluster/dynamic.js b/benchmarks/cluster/dynamic.js
new file mode 100644 (file)
index 0000000..9321054
--- /dev/null
@@ -0,0 +1,34 @@
+const { DynamicClusterPool } = require('../../lib/index')
+
+const size = 30
+
+const dynamicPool = new DynamicClusterPool(
+  size / 2,
+  size * 3,
+  './benchmarks/cluster/worker.js',
+  {
+    maxTasks: 10000
+  }
+)
+
+async function dynamicClusterTest (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      dynamicPool
+        .execute(workerData)
+        .then(res => {
+          executions++
+          if (executions === tasks) {
+            return resolve('FINISH')
+          }
+          return null
+        })
+        .catch(err => console.error(err))
+    }
+  })
+}
+
+module.exports = { dynamicClusterTest }
diff --git a/benchmarks/cluster/fixed.js b/benchmarks/cluster/fixed.js
new file mode 100644 (file)
index 0000000..c8fe713
--- /dev/null
@@ -0,0 +1,31 @@
+const { FixedClusterPool } = require('../../lib/index')
+
+const size = 30
+
+const fixedPool = new FixedClusterPool(size, './benchmarks/cluster/worker.js', {
+  maxTasks: 10000
+})
+
+async function fixedClusterTest (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      fixedPool
+        .execute(workerData)
+        .then(res => {
+          executions++
+          if (executions === tasks) {
+            return resolve('FINISH')
+          }
+          return null
+        })
+        .catch(err => {
+          console.error(err)
+        })
+    }
+  })
+}
+
+module.exports = { fixedClusterTest }
diff --git a/benchmarks/cluster/worker.js b/benchmarks/cluster/worker.js
new file mode 100644 (file)
index 0000000..82408e6
--- /dev/null
@@ -0,0 +1,15 @@
+'use strict'
+const { ClusterWorker } = require('../../lib/index')
+
+function yourFunction (data) {
+  for (let i = 0; i <= 1000; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  // console.log('This is the main thread ' + isMainThread)
+  return { ok: 1 }
+}
+
+module.exports = new ClusterWorker(yourFunction)
index 216b338e1777f9a74a9a825c4f53dbbd5be849db..24ccba53aee46614ed6128efe74b8637f9e010af 100644 (file)
@@ -7,18 +7,18 @@ const size = 16
 
 // pools
 const workerThreadsPool = new WorkerThreadsPool({ max: size })
-const workerPool = workerpool.pool('./workerpoolWorker.js', {
+const workerPool = workerpool.pool('./external/workerpoolWorker.js', {
   minWorkers: size / 2,
   maxWorkers: size * 3,
   workerType: 'thread'
 })
-const fixedPool = new FixedThreadPool(size, './threadWorker.js', {
+const fixedPool = new FixedThreadPool(size, './thread/worker.js', {
   maxTasks: 10000
 })
 const dynamicPool = new DynamicThreadPool(
   size / 2,
   size * 3,
-  './threadWorker.js',
+  './thread/worker.js',
   { maxTasks: 10000 }
 )
 
@@ -74,7 +74,7 @@ async function workerThreadsPoolTest () {
   for (let i = 0; i <= tasks; i++) {
     new Promise((resolve, reject) => {
       workerThreadsPool.acquire(
-        './workerThreadsWorker.js',
+        './external/workerThreadsWorker.js',
         { workerData: workerData },
         (err, worker) => {
           if (err) {
diff --git a/benchmarks/thread/dynamic.js b/benchmarks/thread/dynamic.js
new file mode 100644 (file)
index 0000000..ad98000
--- /dev/null
@@ -0,0 +1,29 @@
+const { DynamicThreadPool } = require('../../lib/index')
+
+const size = 30
+
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js', {
+  maxTasks: 10000
+})
+
+async function dynamicThreadTest (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      dynamicPool
+        .execute(workerData)
+        .then(res => {
+          executions++
+          if (executions === tasks) {
+            return resolve('FINISH')
+          }
+          return null
+        })
+        .catch(err => console.error(err))
+    }
+  })
+}
+
+module.exports = { dynamicThreadTest }
diff --git a/benchmarks/thread/fixed.js b/benchmarks/thread/fixed.js
new file mode 100644 (file)
index 0000000..b596682
--- /dev/null
@@ -0,0 +1,31 @@
+const { FixedThreadPool } = require('../../lib/index')
+
+const size = 30
+
+const fixedPool = new FixedThreadPool(size, './worker.js', {
+  maxTasks: 10000
+})
+
+async function fixedThreadTest (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      fixedPool
+        .execute(workerData)
+        .then(res => {
+          executions++
+          if (executions === tasks) {
+            return resolve('FINISH')
+          }
+          return null
+        })
+        .catch(err => {
+          console.error(err)
+        })
+    }
+  })
+}
+
+module.exports = { fixedThreadTest }
similarity index 83%
rename from benchmarks/threadWorker.js
rename to benchmarks/thread/worker.js
index b8f974d885218f5ff9b60a593111534a0f5df640..7272a485897bee3699936c13e2e23950c470f53c 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../lib/index')
+const { ThreadWorker } = require('../../lib/index')
 
 function yourFunction (data) {
   for (let i = 0; i <= 1000; i++) {
index 29383f867552e5971b676575b072e8e157b6b376..e18dc7230296f9bd567ad5a4f7a9ba3433bd18d3 100644 (file)
       }
     },
     "graceful-fs": {
-      "version": "4.2.6",
-      "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
-      "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==",
+      "version": "4.2.5",
+      "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz",
+      "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==",
       "dev": true
     },
     "graphql": {
       "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==",
       "dev": true
     },
-    "queue-microtask": {
-      "version": "1.2.2",
-      "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz",
-      "integrity": "sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==",
-      "dev": true
-    },
     "randombytes": {
       "version": "2.1.0",
       "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz",
       }
     },
     "run-parallel": {
-      "version": "1.2.0",
-      "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
-      "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
-      "dev": true,
-      "requires": {
-        "queue-microtask": "^1.2.2"
-      }
+      "version": "1.1.10",
+      "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz",
+      "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==",
+      "dev": true
     },
     "safe-buffer": {
       "version": "5.2.1",
index 4bd61868fe4b2365632b2e2a312be274f60752aa..95ef8255d659c39e545044e05a749db6f201dad4 100644 (file)
@@ -1,12 +1,26 @@
-import { DynamicThreadPool } from './dynamic'
-import { FixedThreadPool } from './fixed'
-import { ThreadWorker } from './workers'
+import { DynamicClusterPool } from './pools/cluster/dynamic'
+import { FixedClusterPool } from './pools/cluster/fixed'
+import { DynamicThreadPool } from './pools/thread/dynamic'
+import { FixedThreadPool } from './pools/thread/fixed'
+import { ClusterWorker } from './worker/cluster-worker'
+import { ThreadWorker } from './worker/thread-worker'
 
-export { DynamicThreadPoolOptions } from './dynamic'
-export {
-  Draft,
+export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic'
+export type {
+  FixedClusterPoolOptions,
+  WorkerWithMessageChannel as ClusterWorkerWithMessageChannel
+} from './pools/cluster/fixed'
+export type { DynamicThreadPoolOptions } from './pools/thread/dynamic'
+export type {
   FixedThreadPoolOptions,
-  WorkerWithMessageChannel
-} from './fixed'
-export { ThreadWorkerOptions } from './workers'
-export { FixedThreadPool, DynamicThreadPool, ThreadWorker }
+  WorkerWithMessageChannel as ThreadWorkerWithMessageChannel
+} from './pools/thread/fixed'
+export type { WorkerOptions } from './worker/worker-options'
+export {
+  FixedThreadPool,
+  FixedClusterPool,
+  DynamicClusterPool,
+  DynamicThreadPool,
+  ThreadWorker,
+  ClusterWorker
+}
diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts
new file mode 100644 (file)
index 0000000..d4bf30f
--- /dev/null
@@ -0,0 +1,75 @@
+import { EventEmitter } from 'events'
+import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed'
+import { FixedClusterPool } from './fixed'
+
+class MyEmitter extends EventEmitter {}
+
+export type DynamicClusterPoolOptions = FixedClusterPoolOptions
+
+/**
+ * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
+ *
+ * This cluster pool will create new workers when the other ones are busy, until the max number of workers,
+ * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
+ */
+export class DynamicClusterPool<
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Data = any,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  Response = any
+> extends FixedClusterPool<Data, Response> {
+  public readonly emitter: MyEmitter
+
+  /**
+   * @param min Min number of workers that will be always active
+   * @param max Max number of workers that will be active
+   * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine.
+   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+   */
+  public constructor (
+    public readonly min: number,
+    public readonly max: number,
+    public readonly filename: string,
+    public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 }
+  ) {
+    super(min, filename, opts)
+
+    this.emitter = new MyEmitter()
+  }
+
+  protected chooseWorker (): WorkerWithMessageChannel {
+    let worker: WorkerWithMessageChannel | undefined
+    for (const entry of this.tasks) {
+      if (entry[1] === 0) {
+        worker = entry[0]
+        break
+      }
+    }
+
+    if (worker) {
+      // a worker is free, use it
+      return worker
+    } else {
+      if (this.workers.length === this.max) {
+        this.emitter.emit('FullPool')
+        return super.chooseWorker()
+      }
+      // all workers are busy create a new worker
+      const worker = this.newWorker()
+      worker.on('message', (message: { kill?: number }) => {
+        if (message.kill) {
+          worker.send({ kill: 1 })
+          worker.kill()
+          // clean workers from data structures
+          const workerIndex = this.workers.indexOf(worker)
+          this.workers.splice(workerIndex, 1)
+          this.tasks.delete(worker)
+        }
+      })
+      return worker
+    }
+  }
+}
diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts
new file mode 100644 (file)
index 0000000..332daa4
--- /dev/null
@@ -0,0 +1,162 @@
+import type { SendHandle } from 'child_process'
+import { fork, isMaster, setupMaster, Worker } from 'cluster'
+import type { MessageValue } from '../../utility-types'
+
+export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
+
+export interface FixedClusterPoolOptions {
+  /**
+   * A function that will listen for error event on each worker.
+   */
+  errorHandler?: (this: Worker, e: Error) => void
+  /**
+   * A function that will listen for online event on each worker.
+   */
+  onlineHandler?: (this: Worker) => void
+  /**
+   * A function that will listen for exit event on each worker.
+   */
+  exitHandler?: (this: Worker, code: number) => void
+  /**
+   * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
+   *
+   * @default 1000
+   */
+  maxTasks?: number
+  /**
+   * Key/value pairs to add to worker process environment.
+   *
+   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
+   */
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  env?: any
+}
+
+/**
+ * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer.
+ *
+ * This pool will select the worker in a round robin fashion.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class FixedClusterPool<Data = any, Response = any> {
+  public readonly workers: WorkerWithMessageChannel[] = []
+  public nextWorker: number = 0
+
+  // workerId as key and an integer value
+  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
+    WorkerWithMessageChannel,
+    number
+  >()
+
+  protected id: number = 0
+
+  /**
+   * @param numWorkers Number of workers for this pool.
+   * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
+   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+   */
+  public constructor (
+    public readonly numWorkers: number,
+    public readonly filePath: string,
+    public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
+  ) {
+    if (!isMaster) {
+      throw new Error('Cannot start a cluster pool from a worker!')
+    }
+    // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
+    if (!this.filePath) {
+      throw new Error('Please specify a file with a worker implementation')
+    }
+
+    setupMaster({
+      exec: this.filePath
+    })
+
+    for (let i = 1; i <= this.numWorkers; i++) {
+      this.newWorker()
+    }
+  }
+
+  public destroy (): void {
+    for (const worker of this.workers) {
+      worker.kill()
+    }
+  }
+
+  /**
+   * Execute the task specified into the constructor with the data parameter.
+   *
+   * @param data The input for the task specified.
+   * @returns Promise that is resolved when the task is done.
+   */
+  public execute (data: Data): Promise<Response> {
+    // configure worker to handle message with the specified task
+    const worker: WorkerWithMessageChannel = this.chooseWorker()
+    // console.log('FixedClusterPool#execute choosen worker:', worker)
+    const previousWorkerIndex = this.tasks.get(worker)
+    if (previousWorkerIndex !== undefined) {
+      this.tasks.set(worker, previousWorkerIndex + 1)
+    } else {
+      throw Error('Worker could not be found in tasks map')
+    }
+    const id: number = ++this.id
+    const res: Promise<Response> = this.internalExecute(worker, id)
+    // console.log('FixedClusterPool#execute send data to worker:', worker)
+    worker.send({ data: data || {}, id: id })
+    return res
+  }
+
+  protected internalExecute (
+    worker: WorkerWithMessageChannel,
+    id: number
+  ): Promise<Response> {
+    return new Promise((resolve, reject) => {
+      const listener: (
+        message: MessageValue<Response>,
+        handle: SendHandle
+      ) => void = message => {
+        // console.log('FixedClusterPool#internalExecute listener:', message)
+        if (message.id === id) {
+          worker.removeListener('message', listener)
+          const previousWorkerIndex = this.tasks.get(worker)
+          if (previousWorkerIndex !== undefined) {
+            this.tasks.set(worker, previousWorkerIndex + 1)
+          } else {
+            throw Error('Worker could not be found in tasks map')
+          }
+          if (message.error) reject(message.error)
+          else resolve(message.data as Response)
+        }
+      }
+      worker.on('message', listener)
+    })
+  }
+
+  protected chooseWorker (): WorkerWithMessageChannel {
+    if (this.workers.length - 1 === this.nextWorker) {
+      this.nextWorker = 0
+      return this.workers[this.nextWorker]
+    } else {
+      this.nextWorker++
+      return this.workers[this.nextWorker]
+    }
+  }
+
+  protected newWorker (): WorkerWithMessageChannel {
+    const worker: WorkerWithMessageChannel = fork(this.opts.env)
+    worker.on('error', this.opts.errorHandler ?? (() => {}))
+    worker.on('online', this.opts.onlineHandler ?? (() => {}))
+    // TODO handle properly when a worker exit
+    worker.on('exit', this.opts.exitHandler ?? (() => {}))
+    this.workers.push(worker)
+    // we will attach a listener for every task,
+    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
+    worker.setMaxListeners(this.opts.maxTasks ?? 1000)
+    // init tasks map
+    this.tasks.set(worker, 0)
+    return worker
+  }
+}
similarity index 90%
rename from src/dynamic.ts
rename to src/pools/thread/dynamic.ts
index b4c6bddf30e8e9c964cb1038a05a7788d6bce589..e80276d976640c25a9fe194847289e1ba0d7db45 100644 (file)
@@ -1,9 +1,6 @@
 import { EventEmitter } from 'events'
-import {
-  FixedThreadPool,
-  FixedThreadPoolOptions,
-  WorkerWithMessageChannel
-} from './fixed'
+import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed'
+import { FixedThreadPool } from './fixed'
 
 class MyEmitter extends EventEmitter {}
 
@@ -18,12 +15,12 @@ export type DynamicThreadPoolOptions = FixedThreadPoolOptions
  * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
  * @since 0.0.1
  */
-/* eslint-disable @typescript-eslint/no-explicit-any */
 export class DynamicThreadPool<
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
   Data = any,
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
   Response = any
 > extends FixedThreadPool<Data, Response> {
-  /* eslint-enable @typescript-eslint/no-explicit-any */
   public readonly emitter: MyEmitter
 
   /**
similarity index 95%
rename from src/fixed.ts
rename to src/pools/thread/fixed.ts
index d2c6ba0c2fd542c8dd928d08ad6f54902224734b..36eddace5ec149c41b8daafbd4ada0b91babb32d 100644 (file)
@@ -1,6 +1,5 @@
 import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
-
-export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
+import type { Draft, MessageValue } from '../../utility-types'
 
 export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
 
@@ -101,11 +100,7 @@ export class FixedThreadPool<Data = any, Response = any> {
     id: number
   ): Promise<Response> {
     return new Promise((resolve, reject) => {
-      const listener = (message: {
-        id: number
-        error?: string
-        data: Response
-      }): void => {
+      const listener: (message: MessageValue<Response>) => void = message => {
         if (message.id === id) {
           worker.port2?.removeListener('message', listener)
           const previousWorkerIndex = this.tasks.get(worker)
@@ -115,7 +110,7 @@ export class FixedThreadPool<Data = any, Response = any> {
             throw Error('Worker could not be found in tasks map')
           }
           if (message.error) reject(message.error)
-          else resolve(message.data)
+          else resolve(message.data as Response)
         }
       }
       worker.port2?.on('message', listener)
diff --git a/src/utility-types.ts b/src/utility-types.ts
new file mode 100644 (file)
index 0000000..5a3ef4c
--- /dev/null
@@ -0,0 +1,9 @@
+export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
+
+export interface MessageValue<Data> {
+  readonly data?: Data
+  readonly id?: number
+  readonly kill?: number
+  readonly error?: string
+  readonly parent?: MessagePort
+}
diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts
new file mode 100644 (file)
index 0000000..e750126
--- /dev/null
@@ -0,0 +1,96 @@
+import { AsyncResource } from 'async_hooks'
+import { isMaster, worker } from 'cluster'
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
+
+/**
+ * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
+ *
+ * When this worker is inactive for more than 1 minute, it will send this info to the main worker,
+ * if you are using DynamicClusterPool, the workers created after will be killed, the min num of worker will be guaranteed.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
+  protected readonly maxInactiveTime: number
+  protected readonly async: boolean
+  protected lastTask: number
+  protected readonly interval?: NodeJS.Timeout
+
+  public constructor (
+    fn: (data: Data) => Response,
+    public readonly opts: WorkerOptions = {}
+  ) {
+    super('worker-cluster-pool:pioardi')
+
+    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+    this.async = !!this.opts.async
+    this.lastTask = Date.now()
+    if (!fn) throw new Error('Fn parameter is mandatory')
+    // keep the worker active
+    if (!isMaster) {
+      // console.log('ClusterWorker#constructor', 'is not master')
+      this.interval = setInterval(
+        this.checkAlive.bind(this),
+        this.maxInactiveTime / 2
+      )
+      this.checkAlive.bind(this)()
+    }
+    worker.on('message', (value: MessageValue<Data>) => {
+      // console.log("cluster.on('message', value)", value)
+      if (value?.data && value.id) {
+        // here you will receive messages
+        // console.log('This is the main worker ' + isMaster)
+        if (this.async) {
+          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+        } else {
+          this.runInAsyncScope(this.run.bind(this), this, fn, value)
+        }
+      } else if (value.kill) {
+        // here is time to kill this worker, just clearing the interval
+        if (this.interval) clearInterval(this.interval)
+        this.emitDestroy()
+      }
+    })
+  }
+
+  protected checkAlive (): void {
+    if (Date.now() - this.lastTask > this.maxInactiveTime) {
+      worker.send({ kill: 1 })
+    }
+  }
+
+  protected run (
+    fn: (data?: Data) => Response,
+    value: MessageValue<Data>
+  ): void {
+    try {
+      const res = fn(value.data as Data)
+      worker.send({ data: res, id: value.id })
+      this.lastTask = Date.now()
+    } catch (e) {
+      const err = e instanceof Error ? e.message : e
+      worker.send({ error: err, id: value.id })
+      this.lastTask = Date.now()
+    }
+  }
+
+  protected runAsync (
+    fn: (data?: Data) => Promise<Response>,
+    value: MessageValue<Data>
+  ): void {
+    fn(value.data)
+      .then(res => {
+        worker.send({ data: res, id: value.id })
+        this.lastTask = Date.now()
+        return null
+      })
+      .catch(e => {
+        const err = e instanceof Error ? e.message : e
+        worker.send({ error: err, id: value.id })
+        this.lastTask = Date.now()
+      })
+  }
+}
similarity index 59%
rename from src/workers.ts
rename to src/worker/thread-worker.ts
index dc4f319419a7d2b31b9402e6c8e5ad59590355ca..d0af904bfb16d9fd2ba396684a12fd2220d3c89e 100644 (file)
@@ -1,20 +1,7 @@
 import { AsyncResource } from 'async_hooks'
 import { isMainThread, parentPort } from 'worker_threads'
-
-export interface ThreadWorkerOptions {
-  /**
-   * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
-   *
-   * @default 60.000 ms
-   */
-  maxInactiveTime?: number
-  /**
-   * `true` if your function contains async pieces, else `false`.
-   *
-   * @default false
-   */
-  async?: boolean
-}
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
 
 /**
  * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
@@ -35,7 +22,7 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
 
   public constructor (
     fn: (data: Data) => Response,
-    public readonly opts: ThreadWorkerOptions = {}
+    public readonly opts: WorkerOptions = {}
   ) {
     super('worker-thread-pool:pioardi')
 
@@ -51,33 +38,25 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
       )
       this.checkAlive.bind(this)()
     }
-    parentPort?.on(
-      'message',
-      (value: {
-        data?: Response
-        id?: number
-        parent?: MessagePort
-        kill?: number
-      }) => {
-        if (value?.data && value.id) {
-          // here you will receive messages
-          // console.log('This is the main thread ' + isMainThread)
-          if (this.async) {
-            this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
-          } else {
-            this.runInAsyncScope(this.run.bind(this), this, fn, value)
-          }
-        } else if (value.parent) {
-          // save the port to communicate with the main thread
-          // this will be received once
-          this.parent = value.parent
-        } else if (value.kill) {
-          // here is time to kill this thread, just clearing the interval
-          if (this.interval) clearInterval(this.interval)
-          this.emitDestroy()
+    parentPort?.on('message', (value: MessageValue<Data>) => {
+      if (value?.data && value.id) {
+        // here you will receive messages
+        // console.log('This is the main thread ' + isMainThread)
+        if (this.async) {
+          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+        } else {
+          this.runInAsyncScope(this.run.bind(this), this, fn, value)
         }
+      } else if (value.parent) {
+        // save the port to communicate with the main thread
+        // this will be received once
+        this.parent = value.parent
+      } else if (value.kill) {
+        // here is time to kill this thread, just clearing the interval
+        if (this.interval) clearInterval(this.interval)
+        this.emitDestroy()
       }
-    )
+    })
   }
 
   protected checkAlive (): void {
@@ -87,8 +66,8 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
   }
 
   protected run (
-    fn: (data: Data) => Response,
-    value: { readonly data: Data; readonly id: number }
+    fn: (data?: Data) => Response,
+    value: MessageValue<Data>
   ): void {
     try {
       const res = fn(value.data)
@@ -101,8 +80,8 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
   }
 
   protected runAsync (
-    fn: (data: Data) => Promise<Response>,
-    value: { readonly data: Data; readonly id: number }
+    fn: (data?: Data) => Promise<Response>,
+    value: MessageValue<Data>
   ): void {
     fn(value.data)
       .then(res => {
diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts
new file mode 100644 (file)
index 0000000..8612990
--- /dev/null
@@ -0,0 +1,14 @@
+export interface WorkerOptions {
+  /**
+   * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
+   *
+   * @default 60.000 ms
+   */
+  maxInactiveTime?: number
+  /**
+   * `true` if your function contains async pieces, else `false`.
+   *
+   * @default false
+   */
+  async?: boolean
+}
diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js
new file mode 100644 (file)
index 0000000..6a82b05
--- /dev/null
@@ -0,0 +1,90 @@
+const expect = require('expect')
+const { DynamicClusterPool } = require('../../../lib/index')
+const min = 1
+const max = 3
+const pool = new DynamicClusterPool(
+  min,
+  max,
+  './tests/worker/cluster/testWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+
+describe('Dynamic cluster pool test suite ', () => {
+  it('Verify that the function is executed in a worker cluster', async () => {
+    const result = await pool.execute({ test: 'test' })
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
+    const promises = []
+    let closedWorkers = 0
+    let fullPool = 0
+    pool.emitter.on('FullPool', () => fullPool++)
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    expect(pool.workers.length).toBeLessThanOrEqual(max)
+    expect(pool.workers.length).toBeGreaterThan(min)
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedWorkers++
+      })
+    })
+    expect(fullPool > 1).toBeTruthy()
+    await new Promise(resolve => setTimeout(resolve, 5000))
+    expect(closedWorkers).toBe(max - min)
+  })
+
+  it('Verify scale worker up and down is working', async () => {
+    expect(pool.workers.length).toBe(min)
+    for (let i = 0; i < max * 10; i++) {
+      pool.execute({ test: 'test' })
+    }
+    expect(pool.workers.length).toBeGreaterThan(min)
+    await new Promise(resolve => setTimeout(resolve, 3000))
+    expect(pool.workers.length).toBe(min)
+    for (let i = 0; i < max * 10; i++) {
+      pool.execute({ test: 'test' })
+    }
+    expect(pool.workers.length).toBeGreaterThan(min)
+    await new Promise(resolve => setTimeout(resolve, 2000))
+    expect(pool.workers.length).toBe(min)
+  })
+  it('Shutdown test', async () => {
+    let closedWorkers = 0
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedWorkers++
+      })
+    })
+    pool.destroy()
+    await new Promise(resolve => setTimeout(resolve, 1000))
+    expect(closedWorkers).toBe(min)
+  })
+
+  it('Validations test', () => {
+    let error
+    try {
+      const pool1 = new DynamicClusterPool()
+      console.log(pool1)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeTruthy()
+    expect(error.message).toBeTruthy()
+  })
+
+  it('Should work even without opts in input', async () => {
+    const pool1 = new DynamicClusterPool(
+      1,
+      1,
+      './tests/worker/cluster/testWorker.js'
+    )
+    const res = await pool1.execute({ test: 'test' })
+    expect(res).toBeFalsy()
+  })
+})
diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js
new file mode 100644 (file)
index 0000000..7e319ac
--- /dev/null
@@ -0,0 +1,140 @@
+const expect = require('expect')
+const { FixedClusterPool } = require('../../../lib/index')
+const numWorkers = 10
+const pool = new FixedClusterPool(
+  numWorkers,
+  './tests/worker/cluster/testWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+const emptyPool = new FixedClusterPool(
+  1,
+  './tests/worker/cluster/emptyWorker.js'
+)
+const echoPool = new FixedClusterPool(1, './tests/worker/cluster/echoWorker.js')
+const errorPool = new FixedClusterPool(
+  1,
+  './tests/worker/cluster/errorWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+
+const asyncErrorPool = new FixedClusterPool(
+  1,
+  './tests/worker/cluster/asyncErrorWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+const asyncPool = new FixedClusterPool(
+  1,
+  './tests/worker/cluster/asyncWorker.js'
+)
+
+describe('Fixed cluster pool test suite ', () => {
+  it('Choose worker round robin test', async () => {
+    const results = new Set()
+    for (let i = 0; i < numWorkers; i++) {
+      results.add(pool.chooseWorker().id)
+    }
+    expect(results.size).toBe(numWorkers)
+  })
+
+  it('Verify that the function is executed in a worker cluster', async () => {
+    const result = await pool.execute({ test: 'test' })
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that is possible to invoke the execute method without input', async () => {
+    const result = await pool.execute()
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that is possible to have a worker that return undefined', async () => {
+    const result = await emptyPool.execute()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that data are sent to the worker correctly', async () => {
+    const data = { f: 10 }
+    const result = await echoPool.execute(data)
+    expect(result).toBeTruthy()
+    expect(result.f).toBe(data.f)
+  })
+
+  it('Verify that error handling is working properly:sync', async () => {
+    const data = { f: 10 }
+    let inError
+    try {
+      await errorPool.execute(data)
+    } catch (e) {
+      inError = e
+    }
+    expect(inError).toBeDefined()
+    expect(typeof inError === 'string').toBeTruthy()
+    expect(inError).toBe('Error Message from ClusterWorker')
+  })
+
+  it('Verify that error handling is working properly:async', async () => {
+    const data = { f: 10 }
+    let inError
+    try {
+      await asyncErrorPool.execute(data)
+    } catch (e) {
+      inError = e
+    }
+    expect(inError).toBeDefined()
+    expect(typeof inError === 'string').toBeTruthy()
+    expect(inError).toBe('Error Message from ClusterWorker:async')
+  })
+
+  it('Verify that async function is working properly', async () => {
+    const data = { f: 10 }
+    const startTime = new Date().getTime()
+    const result = await asyncPool.execute(data)
+    const usedTime = new Date().getTime() - startTime
+    expect(result).toBeTruthy()
+    expect(result.f).toBe(data.f)
+    expect(usedTime).toBeGreaterThanOrEqual(2000)
+  })
+
+  it('Shutdown test', async () => {
+    let closedWorkers = 0
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedWorkers++
+      })
+    })
+    pool.destroy()
+    await new Promise(resolve => setTimeout(resolve, 200))
+    expect(closedWorkers).toBe(numWorkers)
+  })
+
+  it('Validations test', () => {
+    let error
+    try {
+      const pool1 = new FixedClusterPool()
+      console.log(pool1)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeTruthy()
+    expect(error.message).toBeTruthy()
+  })
+
+  it('Should work even without opts in input', async () => {
+    const pool1 = new FixedClusterPool(
+      1,
+      './tests/worker/cluster/testWorker.js'
+    )
+    const res = await pool1.execute({ test: 'test' })
+    expect(res).toBeFalsy()
+  })
+})
similarity index 85%
rename from tests/dynamic.test.js
rename to tests/pools/thread/dynamic.test.js
index 5b16a361c173a726c5479c4a2da3565882fd53cf..98a0dd0f0d180076f4a87a17f7e9bbe1ee050310 100644 (file)
@@ -1,11 +1,16 @@
 const expect = require('expect')
-const { DynamicThreadPool } = require('../lib/dynamic')
+const { DynamicThreadPool } = require('../../../lib/index')
 const min = 1
 const max = 3
-const pool = new DynamicThreadPool(min, max, './tests/workers/testWorker.js', {
-  errorHandler: e => console.error(e),
-  onlineHandler: () => console.log('worker is online')
-})
+const pool = new DynamicThreadPool(
+  min,
+  max,
+  './tests/worker/thread/testWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
 
 describe('Dynamic thread pool test suite ', () => {
   it('Verify that the function is executed in a worker thread', async () => {
@@ -72,7 +77,11 @@ describe('Dynamic thread pool test suite ', () => {
   })
 
   it('Should work even without opts in input', async () => {
-    const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js')
+    const pool1 = new DynamicThreadPool(
+      1,
+      1,
+      './tests/worker/thread/testWorker.js'
+    )
     const res = await pool1.execute({ test: 'test' })
     expect(res).toBeFalsy()
   })
similarity index 76%
rename from tests/fixed.test.js
rename to tests/pools/thread/fixed.test.js
index b8f8726450b9b730484b6a61dc2f535414e6f8de..b0cf9e576517dfe9179747ed734f38544ea2c107 100644 (file)
@@ -1,17 +1,25 @@
 const expect = require('expect')
-const { FixedThreadPool } = require('../lib/fixed')
+const { FixedThreadPool } = require('../../../lib/index')
 const numThreads = 10
-const pool = new FixedThreadPool(numThreads, './tests/workers/testWorker.js', {
-  errorHandler: e => console.error(e),
-  onlineHandler: () => console.log('worker is online')
-})
-const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js')
-const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js')
-const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', {
-  errorHandler: e => console.error(e),
-  onlineHandler: () => console.log('worker is online')
-})
-const asyncPool = new FixedThreadPool(1, './tests/workers/asyncWorker.js')
+const pool = new FixedThreadPool(
+  numThreads,
+  './tests/worker/thread/testWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+const emptyPool = new FixedThreadPool(1, './tests/worker/thread/emptyWorker.js')
+const echoPool = new FixedThreadPool(1, './tests/worker/thread/echoWorker.js')
+const errorPool = new FixedThreadPool(
+  1,
+  './tests/worker/thread/errorWorker.js',
+  {
+    errorHandler: e => console.error(e),
+    onlineHandler: () => console.log('worker is online')
+  }
+)
+const asyncPool = new FixedThreadPool(1, './tests/worker/thread/asyncWorker.js')
 
 describe('Fixed thread pool test suite ', () => {
   it('Choose worker round robin test', async () => {
@@ -93,7 +101,7 @@ describe('Fixed thread pool test suite ', () => {
   })
 
   it('Should work even without opts in input', async () => {
-    const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js')
+    const pool1 = new FixedThreadPool(1, './tests/worker/thread/testWorker.js')
     const res = await pool1.execute({ test: 'test' })
     expect(res).toBeFalsy()
   })
diff --git a/tests/worker/cluster/asyncErrorWorker.js b/tests/worker/cluster/asyncErrorWorker.js
new file mode 100644 (file)
index 0000000..2476853
--- /dev/null
@@ -0,0 +1,16 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+async function error (data) {
+  return new Promise((resolve, reject) => {
+    setTimeout(
+      () => reject(new Error('Error Message from ClusterWorker:async')),
+      2000
+    )
+  })
+}
+
+module.exports = new ClusterWorker(error, {
+  maxInactiveTime: 500,
+  async: true
+})
diff --git a/tests/worker/cluster/asyncWorker.js b/tests/worker/cluster/asyncWorker.js
new file mode 100644 (file)
index 0000000..975b0b3
--- /dev/null
@@ -0,0 +1,10 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+async function sleep (data) {
+  return new Promise((resolve, reject) => {
+    setTimeout(() => resolve(data), 2000)
+  })
+}
+
+module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
diff --git a/tests/worker/cluster/echoWorker.js b/tests/worker/cluster/echoWorker.js
new file mode 100644 (file)
index 0000000..6c77bcc
--- /dev/null
@@ -0,0 +1,8 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function echo (data) {
+  return data
+}
+
+module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 })
diff --git a/tests/worker/cluster/emptyWorker.js b/tests/worker/cluster/emptyWorker.js
new file mode 100644 (file)
index 0000000..62c8e2b
--- /dev/null
@@ -0,0 +1,6 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function test (data) {}
+
+module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
diff --git a/tests/worker/cluster/errorWorker.js b/tests/worker/cluster/errorWorker.js
new file mode 100644 (file)
index 0000000..87df925
--- /dev/null
@@ -0,0 +1,11 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function error (data) {
+  throw new Error('Error Message from ClusterWorker')
+}
+
+module.exports = new ClusterWorker(error, {
+  maxInactiveTime: 500,
+  async: false
+})
diff --git a/tests/worker/cluster/testWorker.js b/tests/worker/cluster/testWorker.js
new file mode 100644 (file)
index 0000000..fd8c560
--- /dev/null
@@ -0,0 +1,15 @@
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+const cluster = require('cluster')
+
+function test (data) {
+  for (let i = 0; i <= 50; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  return cluster.isMaster
+}
+
+module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
similarity index 79%
rename from tests/workers/asyncWorker.js
rename to tests/worker/thread/asyncWorker.js
index 098774e09936424a5946e20f2b98830ba78afd88..25401cfb14dea2ff9fed837f138f349218ff7b34 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
 
 async function sleep (data) {
   return new Promise((resolve, reject) => {
similarity index 68%
rename from tests/workers/echoWorker.js
rename to tests/worker/thread/echoWorker.js
index 1026a369199fabf767532e463d436794f502ef6d..006bf97cd390f59f2c5bd37c28a2bfbb307a889b 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
 
 function echo (data) {
   return data
similarity index 65%
rename from tests/workers/emptyWorker.js
rename to tests/worker/thread/emptyWorker.js
index 4c7dce51cec05f2b8bce0c25cf36dc732f598618..69a83a7710fed25eeae541c9f1b83b737d135664 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
 
 function test (data) {}
 
similarity index 70%
rename from tests/workers/errorWorker.js
rename to tests/worker/thread/errorWorker.js
index c9b4f9635e3465f5c3de964a659ded7dceb838bd..63a27513a74638aafbfabc08f04c6dec01f46ea9 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
 
 function error (data) {
   throw new Error(data)
similarity index 83%
rename from tests/workers/testWorker.js
rename to tests/worker/thread/testWorker.js
index 5c436a14ab1ae811006f83555b89146e33e03e16..3556da01f8051960125d833ec4b38e7196f6ad94 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
 const { isMainThread } = require('worker_threads')
 
 function test (data) {