Improve chooseWorker function in dynamic pools (#152)
authorShinigami <chrissi92@hotmail.de>
Sun, 14 Feb 2021 19:02:52 +0000 (20:02 +0100)
committerGitHub <noreply@github.com>
Sun, 14 Feb 2021 19:02:52 +0000 (20:02 +0100)
package-lock.json
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts

index efed1e62096d6efa27caa84ce398f8199a3c0c69..fe7d419b434cc8a7e0d2c7c6361ae3bb4fc4469f 100644 (file)
       "dev": true
     },
     "uglify-js": {
-      "version": "3.12.7",
-      "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.12.7.tgz",
-      "integrity": "sha512-SIZhkoh+U/wjW+BHGhVwE9nt8tWJspncloBcFapkpGRwNPqcH8pzX36BXe3TPBjzHWPMUZotpCigak/udWNr1Q==",
+      "version": "3.12.8",
+      "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.12.8.tgz",
+      "integrity": "sha512-fvBeuXOsvqjecUtF/l1dwsrrf5y2BCUk9AOJGzGcm6tE7vegku5u/YvqjyDaAGr422PLoLnrxg3EnRvTqsdC1w==",
       "dev": true,
       "optional": true
     },
index ac3bd6cc7b02f0f7e592de747a5668a25d845f09..2ffd292afab9a0b540826820935b4a0febd34057 100644 (file)
@@ -238,15 +238,13 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): void
 
-  protected abstract registerWorkerMessageListener (
-    port: Worker,
-    listener: (message: MessageValue<Response>) => void
-  ): void
+  protected abstract registerWorkerMessageListener<
+    Message extends Data | Response
+  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
-  protected abstract unregisterWorkerMessageListener (
-    port: Worker,
-    listener: (message: MessageValue<Response>) => void
-  ): void
+  protected abstract unregisterWorkerMessageListener<
+    Message extends Data | Response
+  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   protected internalExecute (
     worker: Worker,
index 432feb3abb6b91017540561efcaa4f48608869da..b0bb697372cdc3312d4cf78200ecc7ceca214cac 100644 (file)
@@ -1,5 +1,5 @@
 import type { Worker } from 'cluster'
-import type { JSONValue, MessageValue } from '../../utility-types'
+import type { JSONValue } from '../../utility-types'
 import type { ClusterPoolOptions } from './fixed'
 import { FixedClusterPool } from './fixed'
 
@@ -46,31 +46,26 @@ export class DynamicClusterPool<
    * @returns Cluster worker.
    */
   protected chooseWorker (): Worker {
-    let worker: Worker | undefined
-    for (const entry of this.tasks) {
-      if (entry[1] === 0) {
-        worker = entry[0]
-        break
+    for (const [worker, numberOfTasks] of this.tasks) {
+      if (numberOfTasks === 0) {
+        // A worker is free, use it
+        return worker
       }
     }
 
-    if (worker) {
-      // A worker is free, use it
-      return worker
-    } else {
-      if (this.workers.length === this.max) {
-        this.emitter.emit('FullPool')
-        return super.chooseWorker()
-      }
-      // All workers are busy, create a new worker
-      const worker = this.createAndSetupWorker()
-      worker.on('message', (message: MessageValue<Data>) => {
-        if (message.kill) {
-          this.sendToWorker(worker, { kill: 1 })
-          void this.destroyWorker(worker)
-        }
-      })
-      return worker
+    if (this.workers.length === this.max) {
+      this.emitter.emit('FullPool')
+      return super.chooseWorker()
     }
+
+    // All workers are busy, create a new worker
+    const worker = this.createAndSetupWorker()
+    this.registerWorkerMessageListener<Data>(worker, message => {
+      if (message.kill) {
+        this.sendToWorker(worker, { kill: 1 })
+        void this.destroyWorker(worker)
+      }
+    })
+    return worker
   }
 }
index 6620d5f3149fa42821dc00ac8721706bdc118950..b2724426a02111108b33debcaa3c116fe9e2b0c5 100644 (file)
@@ -66,18 +66,18 @@ export class FixedClusterPool<
     worker.send(message)
   }
 
-  protected registerWorkerMessageListener (
-    port: Worker,
-    listener: (message: MessageValue<Response>) => void
+  protected registerWorkerMessageListener<Message extends Data | Response> (
+    worker: Worker,
+    listener: (message: MessageValue<Message>) => void
   ): void {
-    port.on('message', listener)
+    worker.on('message', listener)
   }
 
-  protected unregisterWorkerMessageListener (
-    port: Worker,
-    listener: (message: MessageValue<Response>) => void
+  protected unregisterWorkerMessageListener<Message extends Data | Response> (
+    worker: Worker,
+    listener: (message: MessageValue<Message>) => void
   ): void {
-    port.removeListener('message', listener)
+    worker.removeListener('message', listener)
   }
 
   protected createWorker (): Worker {
index 9ab6bf30a7764ff3d18abd5ca99d1896cd055842..fddae4662e8b4a4a2b68baf043015fa498d78b4f 100644 (file)
@@ -1,4 +1,4 @@
-import type { JSONValue, MessageValue } from '../../utility-types'
+import type { JSONValue } from '../../utility-types'
 import type { PoolOptions } from '../abstract-pool'
 import type { ThreadWorkerWithMessageChannel } from './fixed'
 import { FixedThreadPool } from './fixed'
@@ -46,31 +46,26 @@ export class DynamicThreadPool<
    * @returns Thread worker.
    */
   protected chooseWorker (): ThreadWorkerWithMessageChannel {
-    let worker: ThreadWorkerWithMessageChannel | undefined
-    for (const entry of this.tasks) {
-      if (entry[1] === 0) {
-        worker = entry[0]
-        break
+    for (const [worker, numberOfTasks] of this.tasks) {
+      if (numberOfTasks === 0) {
+        // A worker is free, use it
+        return worker
       }
     }
 
-    if (worker) {
-      // A worker is free, use it
-      return worker
-    } else {
-      if (this.workers.length === this.max) {
-        this.emitter.emit('FullPool')
-        return super.chooseWorker()
-      }
-      // All workers are busy, create a new worker
-      const worker = this.createAndSetupWorker()
-      worker.port2?.on('message', (message: MessageValue<Data>) => {
-        if (message.kill) {
-          this.sendToWorker(worker, { kill: 1 })
-          void this.destroyWorker(worker)
-        }
-      })
-      return worker
+    if (this.workers.length === this.max) {
+      this.emitter.emit('FullPool')
+      return super.chooseWorker()
     }
+
+    // All workers are busy, create a new worker
+    const worker = this.createAndSetupWorker()
+    this.registerWorkerMessageListener<Data>(worker, message => {
+      if (message.kill) {
+        this.sendToWorker(worker, { kill: 1 })
+        void this.destroyWorker(worker)
+      }
+    })
+    return worker
   }
 }
index bfd6f016d0d4f08525d1c19435d4a138e2a404a0..705f124aea4dda5f35234dea3f8d2f1b59e827a6 100644 (file)
@@ -57,18 +57,18 @@ export class FixedThreadPool<
     worker.postMessage(message)
   }
 
-  protected registerWorkerMessageListener (
-    port: ThreadWorkerWithMessageChannel,
-    listener: (message: MessageValue<Response>) => void
+  protected registerWorkerMessageListener<Message extends Data | Response> (
+    messageChannel: ThreadWorkerWithMessageChannel,
+    listener: (message: MessageValue<Message>) => void
   ): void {
-    port.port2?.on('message', listener)
+    messageChannel.port2?.on('message', listener)
   }
 
-  protected unregisterWorkerMessageListener (
-    port: ThreadWorkerWithMessageChannel,
-    listener: (message: MessageValue<Response>) => void
+  protected unregisterWorkerMessageListener<Message extends Data | Response> (
+    messageChannel: ThreadWorkerWithMessageChannel,
+    listener: (message: MessageValue<Message>) => void
   ): void {
-    port.port2?.removeListener('message', listener)
+    messageChannel.port2?.removeListener('message', listener)
   }
 
   protected createWorker (): ThreadWorkerWithMessageChannel {