Report more cleanups from work in progress PRs
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Oct 2022 20:29:20 +0000 (22:29 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Oct 2022 20:29:20 +0000 (22:29 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
src/pools/abstract-pool.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/thread/dynamic.test.js
tests/test-utils.js
tests/worker-files/cluster/asyncWorker.js
tests/worker-files/cluster/longRunningWorkerHardBehavior.js
tests/worker-files/cluster/longRunningWorkerSoftBehavior.js
tests/worker-files/thread/asyncWorker.js
tests/worker-files/thread/longRunningWorkerHardBehavior.js
tests/worker-files/thread/longRunningWorkerSoftBehavior.js
tests/worker-files/thread/testWorker.js

index 484b691645e9b48b0c031e6740cbb27364b524a7..71046d494db7770ade3339d46edcde5d27291efe 100644 (file)
@@ -280,7 +280,8 @@ export abstract class AbstractPool<
     const messageId = ++this.nextMessageId
     const res = this.internalExecute(worker, messageId)
     this.checkAndEmitBusy()
-    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
+    data = data ?? ({} as Data)
+    this.sendToWorker(worker, { data, id: messageId })
     return res
   }
 
index 8f86fb485cd26f30efcc51a1546eb1752579ba24..b49ede8b0802f1baf39230b087f9b6b99050e8e3 100644 (file)
@@ -33,7 +33,6 @@ export abstract class AbstractWorker<
    * Options for the worker.
    */
   public readonly opts: WorkerOptions
-
   /**
    * Constructs a new poolifier worker.
    *
@@ -75,25 +74,32 @@ export abstract class AbstractWorker<
     }
 
     this.mainWorker?.on('message', (value: MessageValue<Data, MainWorker>) => {
-      if (value?.data && value.id) {
-        // Here you will receive messages
-        if (this.opts.async) {
-          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
-        } else {
-          this.runInAsyncScope(this.run.bind(this), this, fn, value)
-        }
-      } else if (value.parent) {
-        // Save a reference of the main worker to communicate with it
-        // This will be received once
-        this.mainWorker = value.parent
-      } else if (value.kill) {
-        // Here is time to kill this worker, just clearing the interval
-        if (this.aliveInterval) clearInterval(this.aliveInterval)
-        this.emitDestroy()
-      }
+      this.messageListener(value, fn)
     })
   }
 
+  protected messageListener (
+    value: MessageValue<Data, MainWorker>,
+    fn: (data: Data) => Response
+  ): void {
+    if (value.data !== undefined && value.id !== undefined) {
+      // Here you will receive messages
+      if (this.opts.async) {
+        this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+      } else {
+        this.runInAsyncScope(this.run.bind(this), this, fn, value)
+      }
+    } else if (value.parent !== undefined) {
+      // Save a reference of the main worker to communicate with it
+      // This will be received once
+      this.mainWorker = value.parent
+    } else if (value.kill !== undefined) {
+      // Here is time to kill this worker, just clearing the interval
+      if (this.aliveInterval) clearInterval(this.aliveInterval)
+      this.emitDestroy()
+    }
+  }
+
   private checkWorkerOptions (opts: WorkerOptions) {
     this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
     this.opts.maxInactiveTime =
index d4690b4fac4dcc5e9233b1a773ba4adb178f8b4b..d9ab30fec9d0a9f11629fcf69ca4b0921469fdb9 100644 (file)
@@ -139,7 +139,7 @@ describe('Abstract pool test suite', () => {
     pool.destroy()
   })
 
-  it("Verify that pool event emitter 'busy' event can register a callback", () => {
+  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
@@ -150,6 +150,7 @@ describe('Abstract pool test suite', () => {
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute({ test: 'test' }))
     }
+    await Promise.all(promises)
     // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
     expect(poolBusy).toBe(numberOfWorkers + 1)
index 3ae35ac937f2adcdf6ff2f1f4baf83f70dd35548..92fbd9b8f9e128382c5781f45719db104bebb19f 100644 (file)
@@ -51,14 +51,10 @@ describe('Dynamic thread pool test suite', () => {
   })
 
   it('Shutdown test', async () => {
-    let closedThreads = 0
-    pool.workers.forEach(w => {
-      w.on('exit', () => {
-        closedThreads++
-      })
-    })
+    const exitPromise = TestUtils.waitExits(pool, min)
     await pool.destroy()
-    expect(closedThreads).toBe(min)
+    const numberOfExitEvents = await exitPromise
+    expect(numberOfExitEvents).toBe(min)
   })
 
   it('Validation of inputs test', () => {
index 4668e0720c7ea1c2eb9cca649f17bb7ade4eb826..8565c7504bfe0a642679eafcbb1d62cfc3dc892a 100644 (file)
@@ -18,7 +18,7 @@ class TestUtils {
   }
 
   static async workerSleepFunction (data, ms) {
-    return new Promise((resolve, reject) => {
+    return new Promise(resolve => {
       setTimeout(() => resolve(data), ms)
     })
   }
index bceaffaee96e0e32f7f22c151930acfb0c57b3d8..70db6a8777966c681e010803a7c6562cc36a5ea3 100644 (file)
@@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
 const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return TestUtils.workerSleepFunction(data, 2000)
+  return await TestUtils.workerSleepFunction(data, 2000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index 73fdad01706c896c943b0291cb6da16bd8282ad5..5c0b620239bdee156e8193cc43bc97d7bd70feed 100644 (file)
@@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
 const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return TestUtils.workerSleepFunction(data, 50000)
+  return await TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index 5498752fad17ca839415de018309205bab74102e..ea53bbea5b42cbc2b83474074430101d59cb33f6 100644 (file)
@@ -3,7 +3,7 @@ const { ClusterWorker } = require('../../../lib/index')
 const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return TestUtils.workerSleepFunction(data, 50000)
+  return await TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index 6508d6dac5d313bcd41b458643fa1a7efd3d1124..730d10ce26385e3ca1a88ab50dd59e4257d290eb 100644 (file)
@@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
 const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return TestUtils.workerSleepFunction(data, 2000)
+  return await TestUtils.workerSleepFunction(data, 2000)
 }
 
 module.exports = new ThreadWorker(sleep, {
index 3c707eb55548adeb2ac587381161bb41d8adb084..120e8e5ffa3e4ce457225d831a5c8556a28027cb 100644 (file)
@@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
 const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return TestUtils.workerSleepFunction(data, 50000)
+  return await TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ThreadWorker(sleep, {
index eed0586fd2e7d9254e81b1f6e51ad76b8914be19..1204809334544c44ca2eae19374bde4bc527b8af 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ThreadWorker } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 50000)
-  })
+  return await TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ThreadWorker(sleep, {
index 369dbdba477bc8f7f60c50976fd513e387f5ed5e..ccf55b56ba92d29e3848ecf0d19d37fe39f068e9 100644 (file)
@@ -1,14 +1,10 @@
 'use strict'
 const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
 const { isMainThread } = require('worker_threads')
+const TestUtils = require('../../test-utils')
 
 function test (data) {
-  for (let i = 0; i < 50; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
+  TestUtils.jsonIntegerSerialization(50)
   return isMainThread
 }