Report some code cleanups from work in progress PR
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Oct 2022 19:16:17 +0000 (21:16 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 7 Oct 2022 19:16:17 +0000 (21:16 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
16 files changed:
CHANGELOG.md
benchmarks/internal/benchmark-utils.js
benchmarks/internal/cluster/worker.js
benchmarks/internal/thread/worker.js
src/pools/abstract-pool.ts
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js
tests/test-utils.js
tests/worker-files/cluster/asyncWorker.js
tests/worker-files/cluster/longRunningWorkerHardBehavior.js
tests/worker-files/cluster/longRunningWorkerSoftBehavior.js
tests/worker-files/cluster/testWorker.js
tests/worker-files/thread/asyncWorker.js
tests/worker-files/thread/longRunningWorkerHardBehavior.js

index ee4d6fd906d371239df3b6d00cf2982f148d21a5..cbaf1909658c1367adc6e232631020143943abde 100644 (file)
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.2.1] - 2022-05-01
+
+-
+
 ## [2.2.0] - 2022-05-01
 
 ### Breaking Changes
index a915a044211fa7c8c14f98c369e82c30739c1733..5f0c643b0c189a9d6d232deae5e8c0effe9cbae8 100644 (file)
@@ -16,6 +16,15 @@ async function runPoolifierTest (pool, { tasks, workerData }) {
   })
 }
 
+function jsonIntegerSerialization (n) {
+  for (let i = 0; i < n; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+}
+
 function generateRandomInteger (max, min = 0) {
   max = Math.floor(max)
   if (min) {
@@ -25,9 +34,26 @@ function generateRandomInteger (max, min = 0) {
   return Math.floor(Math.random() * (max + 1))
 }
 
+/**
+ * Intentionally inefficient implementation.
+ *
+ * @param {*} n
+ * @returns {number}
+ */
+function fibonacci (n) {
+  if (n <= 1) return 1
+  return fibonacci(n - 1) + fibonacci(n - 2)
+}
+
 const LIST_FORMATTER = new Intl.ListFormat('en-US', {
   style: 'long',
   type: 'conjunction'
 })
 
-module.exports = { generateRandomInteger, LIST_FORMATTER, runPoolifierTest }
+module.exports = {
+  runPoolifierTest,
+  jsonIntegerSerialization,
+  generateRandomInteger,
+  fibonacci,
+  LIST_FORMATTER
+}
index b4d54fca9976e65bbbb76da3ff5d54a4673815ff..9817322c2870864d3b18537eae6952e50afe43e1 100644 (file)
@@ -1,13 +1,9 @@
 'use strict'
 const { ClusterWorker } = require('../../../lib/index')
+const { jsonIntegerSerialization } = require('../benchmark-utils')
 
 function yourFunction (data) {
-  for (let i = 0; i < 1000; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
+  jsonIntegerSerialization(1000)
   // console.log('This is the main thread ' + isMaster)
   return { ok: 1 }
 }
index f5f36ed88b82cbfef176e0cbdb1d9969d9a7af1c..6437c0afe6e1f6a06a9383b9855e6b3cb0fb2a8d 100644 (file)
@@ -1,13 +1,9 @@
 'use strict'
 const { ThreadWorker } = require('../../../lib/index')
+const { jsonIntegerSerialization } = require('../benchmark-utils')
 
 function yourFunction (data) {
-  for (let i = 0; i < 1000; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
+  jsonIntegerSerialization(1000)
   // console.log('This is the main thread ' + isMainThread)
   return { ok: 1 }
 }
index 5efecb58209d3b07e3f1d6cc25cfec0e71fc3281..484b691645e9b48b0c031e6740cbb27364b524a7 100644 (file)
@@ -190,10 +190,9 @@ export abstract class AbstractPool<
       () => {
         const workerCreated = this.createAndSetupWorker()
         this.registerWorkerMessageListener(workerCreated, message => {
-          const tasksInProgress = this.tasks.get(workerCreated)
           if (
             isKillBehavior(KillBehaviors.HARD, message.kill) ||
-            tasksInProgress === 0
+            this.tasks.get(workerCreated) === 0
           ) {
             // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
             this.destroyWorker(workerCreated) as void
@@ -281,7 +280,7 @@ export abstract class AbstractPool<
     const messageId = ++this.nextMessageId
     const res = this.internalExecute(worker, messageId)
     this.checkAndEmitBusy()
-    this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
+    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
     return res
   }
 
@@ -417,7 +416,7 @@ export abstract class AbstractPool<
    * @returns New, completely set up worker.
    */
   protected createAndSetupWorker (): Worker {
-    const worker: Worker = this.createWorker()
+    const worker = this.createWorker()
 
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
@@ -438,16 +437,16 @@ export abstract class AbstractPool<
   /**
    * This function is the listener registered for each worker.
    *
-   * @returns The listener function to execute when a message is sent from a worker.
+   * @returns The listener function to execute when a message is received from a worker.
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
-      if (message.id) {
-        const value = this.promiseMap.get(message.id)
-        if (value) {
-          this.decreaseWorkersTasks(value.worker)
-          if (message.error) value.reject(message.error)
-          else value.resolve(message.data as Response)
+      if (message.id !== undefined) {
+        const promise = this.promiseMap.get(message.id)
+        if (promise !== undefined) {
+          this.decreaseWorkersTasks(promise.worker)
+          if (message.error) promise.reject(message.error)
+          else promise.resolve(message.data as Response)
           this.promiseMap.delete(message.id)
         }
       }
index b1201ea587c9803dafd33e7e974b800c3d218afc..61e1d6e2cd131bc8dabf95265854ec244bfdaa94 100644 (file)
@@ -54,8 +54,8 @@ describe('Dynamic cluster pool test suite', () => {
   it('Shutdown test', async () => {
     const exitPromise = TestUtils.waitExits(pool, min)
     await pool.destroy()
-    const res = await exitPromise
-    expect(res).toBe(min)
+    const numberOfExitEvents = await exitPromise
+    expect(numberOfExitEvents).toBe(min)
   })
 
   it('Validation of inputs test', () => {
index 2a0a8e2e70f02e15839d89d972d433254ab744e6..2ef709c1d0fc71fa3d6da9af508d8afa3859d5cf 100644 (file)
@@ -130,8 +130,8 @@ describe('Fixed cluster pool test suite', () => {
   it('Shutdown test', async () => {
     const exitPromise = TestUtils.waitExits(pool, numberOfWorkers)
     await pool.destroy()
-    const res = await exitPromise
-    expect(res).toBe(numberOfWorkers)
+    const numberOfExitEvents = await exitPromise
+    expect(numberOfExitEvents).toBe(numberOfWorkers)
   })
 
   it('Should work even without opts in input', async () => {
index ae78649f91d2851d91163d2ebf6f65e2c084c468..3ae35ac937f2adcdf6ff2f1f4baf83f70dd35548 100644 (file)
@@ -30,8 +30,8 @@ describe('Dynamic thread pool test suite', () => {
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
-    const res = await TestUtils.waitExits(pool, max - min)
-    expect(res).toBe(max - min)
+    const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
+    expect(numberOfExitEvents).toBe(max - min)
   })
 
   it('Verify scale thread up and down is working', async () => {
index d0a0e36717b4793581045b7d93fcc1df231ab30e..763e3f881dbc066d5b868e981608d1730a3668e7 100644 (file)
@@ -132,8 +132,8 @@ describe('Fixed thread pool test suite', () => {
   it('Shutdown test', async () => {
     const exitPromise = TestUtils.waitExits(pool, numberOfThreads)
     await pool.destroy()
-    const res = await exitPromise
-    expect(res).toBe(numberOfThreads)
+    const numberOfExitEvents = await exitPromise
+    expect(numberOfExitEvents).toBe(numberOfThreads)
   })
 
   it('Should work even without opts in input', async () => {
index 690f4b37487cf10d369b5cc1589727b358a5f0f7..4668e0720c7ea1c2eb9cca649f17bb7ade4eb826 100644 (file)
@@ -16,6 +16,46 @@ class TestUtils {
   static async sleep (ms) {
     return new Promise(resolve => setTimeout(resolve, ms))
   }
+
+  static async workerSleepFunction (data, ms) {
+    return new Promise((resolve, reject) => {
+      setTimeout(() => resolve(data), ms)
+    })
+  }
+
+  static jsonIntegerSerialization (n) {
+    for (let i = 0; i < n; i++) {
+      const o = {
+        a: i
+      }
+      JSON.stringify(o)
+    }
+  }
+
+  /**
+   * Intentionally inefficient implementation.
+   *
+   * @param {*} n
+   * @returns {number}
+   */
+  static fibonacci (n) {
+    if (n <= 1) return 1
+    return TestUtils.fibonacci(n - 1) + TestUtils.fibonacci(n - 2)
+  }
+
+  /**
+   * Intentionally inefficient implementation.
+   *
+   * @param {*} n
+   * @returns {number}
+   */
+  static factorial (n) {
+    if (n === 0) {
+      return 1
+    } else {
+      return TestUtils.factorial(n - 1) * n
+    }
+  }
 }
 
 module.exports = TestUtils
index b5c784dfaca6b7491b25914ceefa81a94042c44f..bceaffaee96e0e32f7f22c151930acfb0c57b3d8 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 2000)
-  })
+  return TestUtils.workerSleepFunction(data, 2000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index 04c78f4659738fa053ac67888663833d0a2eb2a1..73fdad01706c896c943b0291cb6da16bd8282ad5 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 50000)
-  })
+  return TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index c4c00f6a17f41303be6636f59ebf2cb114adb153..5498752fad17ca839415de018309205bab74102e 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ClusterWorker } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 50000)
-  })
+  return TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ClusterWorker(sleep, {
index 549386073f3413ac26b44d43dc50fb88ff43a43e..f6115cdeca471a8895175860df716a40e1731c36 100644 (file)
@@ -1,14 +1,10 @@
 'use strict'
 const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
 const { isMaster } = require('cluster')
+const TestUtils = require('../../test-utils')
 
 function test (data) {
-  for (let i = 0; i < 50; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
+  TestUtils.jsonIntegerSerialization(50)
   return isMaster
 }
 
index 0bf5d244758ce100683ea9c23faf114bd03428c8..6508d6dac5d313bcd41b458643fa1a7efd3d1124 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 2000)
-  })
+  return TestUtils.workerSleepFunction(data, 2000)
 }
 
 module.exports = new ThreadWorker(sleep, {
index 7d9714a8140aed85399c5b53aff47a6ec5b2a002..3c707eb55548adeb2ac587381161bb41d8adb084 100644 (file)
@@ -1,10 +1,9 @@
 'use strict'
 const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
 
 async function sleep (data) {
-  return new Promise((resolve, reject) => {
-    setTimeout(() => resolve(data), 50000)
-  })
+  return TestUtils.workerSleepFunction(data, 50000)
 }
 
 module.exports = new ThreadWorker(sleep, {