Implementation for killBehavior based on the last feedbacks
authoraardizio <alessandroardizio94@gmail.com>
Tue, 16 Feb 2021 10:07:12 +0000 (11:07 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Tue, 16 Feb 2021 10:07:12 +0000 (11:07 +0100)
23 files changed:
src/index.ts
src/pools/cluster/dynamic.ts
src/pools/thread/dynamic.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/worker-options.ts
tests/pools/cluster/dynamic.test.js
tests/pools/thread/dynamic.test.js
tests/worker/cluster/asyncErrorWorker.js
tests/worker/cluster/asyncWorker.js
tests/worker/cluster/echoWorker.js
tests/worker/cluster/emptyWorker.js
tests/worker/cluster/errorWorker.js
tests/worker/cluster/longRunningWorkerHardBehavior.js [new file with mode: 0644]
tests/worker/cluster/longRunningWorkerSoftBehavior.js [moved from tests/worker/cluster/longRunningWorker.js with 69% similarity]
tests/worker/cluster/testWorker.js
tests/worker/thread/asyncWorker.js
tests/worker/thread/echoWorker.js
tests/worker/thread/emptyWorker.js
tests/worker/thread/errorWorker.js
tests/worker/thread/longRunningWorkerHardBehavior.js [new file with mode: 0644]
tests/worker/thread/longRunningWorkerSoftBehavior.js [moved from tests/worker/thread/longRunningWorker.js with 69% similarity]
tests/worker/thread/testWorker.js

index 7242dd5e1fbe4c36bbbbd00f17bc3af7cc2c756d..60536f99603b114c94df518f121bf30c1395f005 100644 (file)
@@ -16,3 +16,4 @@ export { AbstractWorker } from './worker/abstract-worker'
 export { ClusterWorker } from './worker/cluster-worker'
 export { ThreadWorker } from './worker/thread-worker'
 export type { WorkerOptions } from './worker/worker-options'
+export { killBehaviorEnumeration } from './worker/worker-options'
index 5eba843a3ae0726a3102d3068deced8679058844..58e2b7124d749717ff90df9ad14a387f988a8c53 100644 (file)
@@ -2,6 +2,7 @@ import type { Worker } from 'cluster'
 import type { JSONValue } from '../../utility-types'
 import type { ClusterPoolOptions } from './fixed'
 import { FixedClusterPool } from './fixed'
+import { killBehaviorEnumeration } from '../../worker/worker-options'
 
 /**
  * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
@@ -62,9 +63,10 @@ export class DynamicClusterPool<
     const worker = this.createAndSetupWorker()
     this.registerWorkerMessageListener<Data>(worker, message => {
       const tasksInProgress = this.tasks.get(worker)
-      if (message.kill && tasksInProgress === 0) {
+      const isKillBehaviorOptionHard =
+        message.kill === killBehaviorEnumeration.HARD
+      if (isKillBehaviorOptionHard || tasksInProgress === 0) {
         // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        // To handle the case of a long-running task we will check if there is any active task
         this.sendToWorker(worker, { kill: 1 })
         void this.destroyWorker(worker)
       }
index 26d81d2fc15c99aa33a12088e225ab0889f534f4..7a0d4ffd8c0513bac3a4ffaf983182f109d23b3d 100644 (file)
@@ -2,6 +2,7 @@ import type { JSONValue } from '../../utility-types'
 import type { PoolOptions } from '../abstract-pool'
 import type { ThreadWorkerWithMessageChannel } from './fixed'
 import { FixedThreadPool } from './fixed'
+import { killBehaviorEnumeration } from '../../worker/worker-options'
 
 /**
  * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
@@ -62,9 +63,10 @@ export class DynamicThreadPool<
     const worker = this.createAndSetupWorker()
     this.registerWorkerMessageListener<Data>(worker, message => {
       const tasksInProgress = this.tasks.get(worker)
-      if (message.kill && tasksInProgress === 0) {
+      const isKillBehaviorOptionHard =
+        message.kill === killBehaviorEnumeration.HARD
+      if (isKillBehaviorOptionHard || tasksInProgress === 0) {
         // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        // To handle the case of a long-running task we will check if there is any active task
         this.sendToWorker(worker, { kill: 1 })
         void this.destroyWorker(worker)
       }
index eb3f9727369f837d9cd36d5bd6b7dd69c9d5f0c3..d7f9d6a20e68d4c3da8481727714d51a04a4aef0 100644 (file)
@@ -42,7 +42,7 @@ export interface MessageValue<
   /**
    * Kill code.
    */
-  readonly kill?: number
+  readonly kill?: string | number
   /**
    * Error.
    */
index 34ab964c68d17c873b13149d2b228f570a9a6595..c50cfbb081599ed955561edea56a634c93d885ef 100644 (file)
@@ -3,6 +3,10 @@ import type { Worker } from 'cluster'
 import type { MessagePort } from 'worker_threads'
 import type { MessageValue } from '../utility-types'
 import type { WorkerOptions } from './worker-options'
+import { killBehaviorEnumeration } from './worker-options'
+
+const defaultMaxInactiveTime = 1000 * 60
+const defaultKillBehavior = killBehaviorEnumeration.SOFT
 
 /**
  * Base class containing some shared logic for all poolifier workers.
@@ -20,6 +24,10 @@ export abstract class AbstractWorker<
    * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires.
    */
   protected readonly maxInactiveTime: number
+  /**
+   * The kill behavior set as option on the Worker constructor or a default value.
+   */
+  protected readonly killBehavior: string
   /**
    * Whether the worker is working asynchronously or not.
    */
@@ -47,11 +55,14 @@ export abstract class AbstractWorker<
     isMain: boolean,
     fn: (data: Data) => Response,
     protected mainWorker?: MainWorker | null,
-    public readonly opts: WorkerOptions = {}
+    public readonly opts: WorkerOptions = {
+      killBehavior: defaultKillBehavior,
+      maxInactiveTime: defaultMaxInactiveTime
+    }
   ) {
     super(type)
-
-    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+    this.killBehavior = this.opts.killBehavior ?? defaultKillBehavior
+    this.maxInactiveTime = this.opts.maxInactiveTime ?? defaultMaxInactiveTime
     this.async = !!this.opts.async
     this.lastTask = Date.now()
     if (!fn) throw new Error('fn parameter is mandatory')
@@ -108,7 +119,7 @@ export abstract class AbstractWorker<
    */
   protected checkAlive (): void {
     if (Date.now() - this.lastTask > this.maxInactiveTime) {
-      this.sendToMainWorker({ kill: 1 })
+      this.sendToMainWorker({ kill: this.killBehavior })
     }
   }
 
index 0236164b382ed66972789c8ce3209e73e6b28564..8bb9cc50879e8c04e7bf6d2f8e56a856c36ab2fc 100644 (file)
@@ -1,11 +1,22 @@
+/**
+ * Kill behavior enumeration
+ */
+export const killBehaviorEnumeration = Object.freeze({
+  SOFT: 'SOFT',
+  HARD: 'HARD'
+})
+
 /**
  * Options for workers.
  */
 export interface WorkerOptions {
   /**
    * Maximum waiting time in milliseconds for tasks.
-   *
    * After this time, newly created workers will be terminated.
+   * The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.
+   * If killBehavior is set to HARD this value represents also the timeout for the tasks that you submit to the pool,
+   * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
+   * If killBehavior is set to SOFT your tasks have no timeout and your workers will not be terminated until your task is finished.
    *
    * @default 60.000 ms
    */
@@ -16,4 +27,13 @@ export interface WorkerOptions {
    * @default false
    */
   async?: boolean
+  /**
+   * killBehavior dictates if your async unit ( worker/process ) will be deleted in case that a task is active on it.
+   * SOFT: If current time - last active time is greater than maxInactiveTime option, but a task is still running then the worker will be not deleted.
+   * HARD: If last active time is greater than maxInactiveTime option, but a task is still running then the worker will be deleted.
+   * This option only apply to the newly created workers.
+   *
+   * @default SOFT
+   */
+  killBehavior?: string
 }
index cd946cdab294877980f8322cea21aca5461541ae..b45b7ccac91308fcb1fe5193f0ffa3ea8d882b60 100644 (file)
@@ -87,11 +87,27 @@ describe('Dynamic cluster pool test suite ', () => {
     expect(res).toBeFalsy()
   })
 
-  it('Verify scale processes up and down is working when long running task is used', async () => {
+  it('Verify scale processes up and down is working when long running task is used:hard', async () => {
     const longRunningPool = new DynamicClusterPool(
       min,
       max,
-      './tests/worker/cluster/longRunningWorker.js'
+      './tests/worker/cluster/longRunningWorkerHardBehavior.js'
+    )
+    expect(longRunningPool.workers.length).toBe(min)
+    for (let i = 0; i < max * 10; i++) {
+      longRunningPool.execute({ test: 'test' })
+    }
+    expect(longRunningPool.workers.length).toBe(max)
+    await new Promise(resolve => setTimeout(resolve, 3000))
+    // Here we expect the workers to be at the max size since that the task is still running
+    expect(longRunningPool.workers.length).toBe(min)
+  })
+
+  it('Verify scale processes up and down is working when long running task is used:soft', async () => {
+    const longRunningPool = new DynamicClusterPool(
+      min,
+      max,
+      './tests/worker/cluster/longRunningWorkerSoftBehavior.js'
     )
     expect(longRunningPool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
index 2f89b2ffea69b045b576db32287a5acc8bfd2a7b..836e6591481ef1cfa567e0d1abcc15a2830e94eb 100644 (file)
@@ -86,11 +86,30 @@ describe('Dynamic thread pool test suite ', () => {
     expect(res).toBeFalsy()
   })
 
-  it('Verify scale thread up and down is working when long running task is used', async () => {
+  it('Verify scale thread up and down is working when long running task is used:hard', async () => {
     const longRunningPool = new DynamicThreadPool(
       min,
       max,
-      './tests/worker/thread/longRunningWorker.js',
+      './tests/worker/thread/longRunningWorkerHardBehavior.js',
+      {
+        errorHandler: e => console.error(e),
+        onlineHandler: () => console.log('worker is online')
+      }
+    )
+    expect(longRunningPool.workers.length).toBe(min)
+    for (let i = 0; i < max * 10; i++) {
+      longRunningPool.execute({ test: 'test' })
+    }
+    expect(longRunningPool.workers.length).toBe(max)
+    await new Promise(resolve => setTimeout(resolve, 1000))
+    expect(longRunningPool.workers.length).toBe(min)
+  })
+
+  it('Verify scale thread up and down is working when long running task is used:soft', async () => {
+    const longRunningPool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker/thread/longRunningWorkerSoftBehavior.js',
       {
         errorHandler: e => console.error(e),
         onlineHandler: () => console.log('worker is online')
index 2476853b55520ab5fea4c917fb70d0f05c66dfa0..be675e110e96bb8d6bc6ae5c232b3f8a3e6de132 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 async function error (data) {
   return new Promise((resolve, reject) => {
@@ -12,5 +12,6 @@ async function error (data) {
 
 module.exports = new ClusterWorker(error, {
   maxInactiveTime: 500,
-  async: true
+  async: true,
+  killBehavior: killBehaviorEnumeration
 })
index 975b0b365efb92dad4d24f44a20434f1267a4cd4..b47adc916affe53ad7e12a7fe447aac1c7f85a4e 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 async function sleep (data) {
   return new Promise((resolve, reject) => {
@@ -7,4 +7,8 @@ async function sleep (data) {
   })
 }
 
-module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ClusterWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 6c77bcce888b77b20f4f6045d4bc3f4fe00905a3..8d4477ee386983aa6680d4b73a4c619b5fdeb9bc 100644 (file)
@@ -1,8 +1,11 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function echo (data) {
   return data
 }
 
-module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(echo, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 62c8e2bb0438e426dd3838101002b53bba20b692..4e5eeca7a7a33f8a67b90447de69a05d574dda83 100644 (file)
@@ -1,6 +1,9 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function test (data) {}
 
-module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(test, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 87df92543664b6e3c5395a52e8c822d65ea0c8b6..bbcb78be74f5cf5fad08a097545303017013e737 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function error (data) {
   throw new Error('Error Message from ClusterWorker')
@@ -7,5 +7,6 @@ function error (data) {
 
 module.exports = new ClusterWorker(error, {
   maxInactiveTime: 500,
-  async: false
+  async: false,
+  killBehavior: killBehaviorEnumeration
 })
diff --git a/tests/worker/cluster/longRunningWorkerHardBehavior.js b/tests/worker/cluster/longRunningWorkerHardBehavior.js
new file mode 100644 (file)
index 0000000..9fe9d3f
--- /dev/null
@@ -0,0 +1,14 @@
+'use strict'
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
+
+async function sleep (data) {
+  return new Promise((resolve, reject) => {
+    setTimeout(() => resolve(data), 50000)
+  })
+}
+
+module.exports = new ClusterWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true,
+  killBehavior: killBehaviorEnumeration.HARD
+})
similarity index 69%
rename from tests/worker/cluster/longRunningWorker.js
rename to tests/worker/cluster/longRunningWorkerSoftBehavior.js
index d751d3514fbd947559ca240ce0fb70822a2722de..c4c00f6a17f41303be6636f59ebf2cb114adb153 100644 (file)
@@ -7,4 +7,7 @@ async function sleep (data) {
   })
 }
 
-module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ClusterWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true
+})
index a8a6bb9ee14a0e1e670bb12348dfc47a1590b1b3..3e1ed0d193f65ff029f1c2c954825993ba0330be 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
 const { isMaster } = require('cluster')
 
 function test (data) {
@@ -12,4 +12,7 @@ function test (data) {
   return isMaster
 }
 
-module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(test, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 25401cfb14dea2ff9fed837f138f349218ff7b34..59a900e914404f89b1cedd28ed75330283e2d723 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 async function sleep (data) {
   return new Promise((resolve, reject) => {
@@ -7,4 +7,8 @@ async function sleep (data) {
   })
 }
 
-module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ThreadWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 006bf97cd390f59f2c5bd37c28a2bfbb307a889b..3b554ccc4588dc0b2675149bfb46a706a5222767 100644 (file)
@@ -1,8 +1,11 @@
 'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function echo (data) {
   return data
 }
 
-module.exports = new ThreadWorker(echo, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(echo, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 69a83a7710fed25eeae541c9f1b83b737d135664..c7034caaa3b66f901f9ee26f7b9fd664b4f957c0 100644 (file)
@@ -1,6 +1,9 @@
 'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function test (data) {}
 
-module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(test, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
index 63a27513a74638aafbfabc08f04c6dec01f46ea9..6b7ee3c9e6dccdfcbcf063097e005ce8c531907e 100644 (file)
@@ -1,8 +1,11 @@
 'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
 
 function error (data) {
   throw new Error(data)
 }
 
-module.exports = new ThreadWorker(error, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(error, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})
diff --git a/tests/worker/thread/longRunningWorkerHardBehavior.js b/tests/worker/thread/longRunningWorkerHardBehavior.js
new file mode 100644 (file)
index 0000000..ec74579
--- /dev/null
@@ -0,0 +1,14 @@
+'use strict'
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
+
+async function sleep (data) {
+  return new Promise((resolve, reject) => {
+    setTimeout(() => resolve(data), 50000)
+  })
+}
+
+module.exports = new ThreadWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true,
+  killBehavior: killBehaviorEnumeration.HARD
+})
similarity index 69%
rename from tests/worker/thread/longRunningWorker.js
rename to tests/worker/thread/longRunningWorkerSoftBehavior.js
index 86891279a5ebeee7987395ed079ffe9944c4e7b1..eed0586fd2e7d9254e81b1f6e51ad76b8914be19 100644 (file)
@@ -7,4 +7,7 @@ async function sleep (data) {
   })
 }
 
-module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ThreadWorker(sleep, {
+  maxInactiveTime: 500,
+  async: true
+})
index 3556da01f8051960125d833ec4b38e7196f6ad94..77dfdc8e050ccfca7a9a67f874b602e088c5234d 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
 const { isMainThread } = require('worker_threads')
 
 function test (data) {
@@ -12,4 +12,7 @@ function test (data) {
   return isMainThread
 }
 
-module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(test, {
+  maxInactiveTime: 500,
+  killBehavior: killBehaviorEnumeration.HARD
+})