De-duplicate code for workers (#154)
authorShinigami <chrissi92@hotmail.de>
Sun, 14 Feb 2021 17:18:34 +0000 (18:18 +0100)
committerGitHub <noreply@github.com>
Sun, 14 Feb 2021 17:18:34 +0000 (18:18 +0100)
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts

index aa4d058cce142ffa5214c8d2ce2965e428309d14..eb3f9727369f837d9cd36d5bd6b7dd69c9d5f0c3 100644 (file)
@@ -1,3 +1,6 @@
+import type { Worker } from 'cluster'
+import type { MessagePort } from 'worker_threads'
+
 /**
  * Make all properties in T non-readonly
  */
@@ -24,7 +27,10 @@ export type JSONArray = Array<JSONValue>
 /**
  * Message object that is passed between worker and main worker.
  */
-export interface MessageValue<Data = unknown> {
+export interface MessageValue<
+  Data = unknown,
+  MainWorker extends Worker | MessagePort | unknown = unknown
+> {
   /**
    * Input data that will be passed to the worker.
    */
@@ -46,5 +52,5 @@ export interface MessageValue<Data = unknown> {
    *
    * _Only for internal use_
    */
-  readonly parent?: MessagePort
+  readonly parent?: MainWorker
 }
index 779088c56429780631385f78842dd6b8f01d336b..34ab964c68d17c873b13149d2b228f570a9a6595 100644 (file)
@@ -1,4 +1,6 @@
 import { AsyncResource } from 'async_hooks'
+import type { Worker } from 'cluster'
+import type { MessagePort } from 'worker_threads'
 import type { MessageValue } from '../utility-types'
 import type { WorkerOptions } from './worker-options'
 
@@ -10,7 +12,7 @@ import type { WorkerOptions } from './worker-options'
  * @template Response Type of response the worker sends back to the main worker.
  */
 export abstract class AbstractWorker<
-  MainWorker,
+  MainWorker extends Worker | MessagePort,
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
@@ -37,12 +39,14 @@ export abstract class AbstractWorker<
    * @param type The type of async event.
    * @param isMain Whether this is the main worker or not.
    * @param fn Function processed by the worker when the pool's `execution` function is invoked.
+   * @param mainWorker Reference to main worker.
    * @param opts Options for the worker.
    */
   public constructor (
     type: string,
     isMain: boolean,
     fn: (data: Data) => Response,
+    protected mainWorker?: MainWorker | null,
     public readonly opts: WorkerOptions = {}
   ) {
     super(type)
@@ -51,7 +55,7 @@ export abstract class AbstractWorker<
     this.async = !!this.opts.async
     this.lastTask = Date.now()
     if (!fn) throw new Error('fn parameter is mandatory')
-    // keep the worker active
+    // Keep the worker active
     if (!isMain) {
       this.interval = setInterval(
         this.checkAlive.bind(this),
@@ -59,12 +63,38 @@ export abstract class AbstractWorker<
       )
       this.checkAlive.bind(this)()
     }
+
+    this.mainWorker?.on('message', (value: MessageValue<Data, MainWorker>) => {
+      if (value?.data && value.id) {
+        // Here you will receive messages
+        if (this.async) {
+          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+        } else {
+          this.runInAsyncScope(this.run.bind(this), this, fn, value)
+        }
+      } else if (value.parent) {
+        // Save a reference of the main worker to communicate with it
+        // This will be received once
+        this.mainWorker = value.parent
+      } else if (value.kill) {
+        // Here is time to kill this worker, just clearing the interval
+        if (this.interval) clearInterval(this.interval)
+        this.emitDestroy()
+      }
+    })
   }
 
   /**
    * Returns the main worker.
+   *
+   * @returns Reference to the main worker.
    */
-  protected abstract getMainWorker (): MainWorker
+  protected getMainWorker (): MainWorker {
+    if (!this.mainWorker) {
+      throw new Error('Main worker was not set')
+    }
+    return this.mainWorker
+  }
 
   /**
    * Send a message to the main worker.
index 9402ec1044e51ce9a42c8717e3c6c506dda1ac10..4b1efaca384a92869b67460236aefea72953df9d 100644 (file)
@@ -30,27 +30,7 @@ export class ClusterWorker<
    * @param opts Options for the worker.
    */
   public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
-    super('worker-cluster-pool:pioardi', isMaster, fn, opts)
-
-    worker.on('message', (value: MessageValue<Data>) => {
-      if (value?.data && value.id) {
-        // here you will receive messages
-        // console.log('This is the main worker ' + isMaster)
-        if (this.async) {
-          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
-        } else {
-          this.runInAsyncScope(this.run.bind(this), this, fn, value)
-        }
-      } else if (value.kill) {
-        // here is time to kill this worker, just clearing the interval
-        if (this.interval) clearInterval(this.interval)
-        this.emitDestroy()
-      }
-    })
-  }
-
-  protected getMainWorker (): Worker {
-    return worker
+    super('worker-cluster-pool:pioardi', isMaster, fn, worker, opts)
   }
 
   protected sendToMainWorker (message: MessageValue<Response>): void {
index b6fbdae257399ac7fa9f8d9fca9e37f6cdfc08d7..615e1d8b7f0b1dc5f92cc2d6417813ef60d83240 100644 (file)
@@ -1,3 +1,4 @@
+import type { MessagePort } from 'worker_threads'
 import { isMainThread, parentPort } from 'worker_threads'
 import type { JSONValue, MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
@@ -22,11 +23,6 @@ export class ThreadWorker<
   Data extends JSONValue = JSONValue,
   Response extends JSONValue = JSONValue
 > extends AbstractWorker<MessagePort, Data, Response> {
-  /**
-   * Reference to main thread.
-   */
-  protected parent?: MessagePort
-
   /**
    * Constructs a new poolifier thread worker.
    *
@@ -34,34 +30,7 @@ export class ThreadWorker<
    * @param opts Options for the worker.
    */
   public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
-    super('worker-thread-pool:pioardi', isMainThread, fn, opts)
-
-    parentPort?.on('message', (value: MessageValue<Data>) => {
-      if (value?.data && value.id) {
-        // here you will receive messages
-        // console.log('This is the main worker ' + isMainThread)
-        if (this.async) {
-          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
-        } else {
-          this.runInAsyncScope(this.run.bind(this), this, fn, value)
-        }
-      } else if (value.parent) {
-        // save the port to communicate with the main thread
-        // this will be received once
-        this.parent = value.parent
-      } else if (value.kill) {
-        // here is time to kill this worker, just clearing the interval
-        if (this.interval) clearInterval(this.interval)
-        this.emitDestroy()
-      }
-    })
-  }
-
-  protected getMainWorker (): MessagePort {
-    if (!this.parent) {
-      throw new Error('Parent was not set')
-    }
-    return this.parent
+    super('worker-thread-pool:pioardi', isMainThread, fn, parentPort, opts)
   }
 
   protected sendToMainWorker (message: MessageValue<Response>): void {