feat: restart worker in case of uncaught error
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 1 Jun 2023 17:20:00 +0000 (19:20 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 1 Jun 2023 17:20:00 +0000 (19:20 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
README.md
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/worker/abstract-worker.test.js

index 94ddcdf97388917855a63ed3cac2a60bf4fe99ab..2321059654a113312f655e769b76b7c6836b3ab2 100644 (file)
@@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
-- Allow to disable tasks timeout check in worker.
+- Add pool option `restartWorkerOnError` to restart worker on uncaught error. Default to `true`.
 
 ## [2.5.0] - 2023-05-31
 
index 2f5fff4102e607afc604c3d062198637f3071660..1a311e4d5fcec3b0a46125e11ffb42f7853b18d3 100644 (file)
--- a/README.md
+++ b/README.md
@@ -179,6 +179,8 @@ Node versions >= 16.14.x are supported.
 
   Default: `{ medRunTime: false }`
 
+- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.  
+  Default: true
 - `enableEvents` (optional) - Events emission enablement in this pool.  
   Default: true
 - `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool.  
@@ -217,7 +219,6 @@ This method will call the terminate method on each worker.
   The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.  
   If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.  
   If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.  
-  0: no tasks timeout check.  
   Default: 60000
 
 - `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.  
index f32aab0dc838409e2514bf0377c4ba9276180579..a558ef504568ad95abcf91d8f413968a2f07bf8f 100644 (file)
@@ -149,6 +149,7 @@ export abstract class AbstractPool<
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
+      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
       this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
       if (this.opts.enableTasksQueue) {
@@ -562,6 +563,16 @@ export abstract class AbstractPool<
 
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+    worker.on('error', error => {
+      if (this.emitter != null) {
+        this.emitter.emit(PoolEvents.error, error)
+      }
+    })
+    if (this.opts.restartWorkerOnError === true) {
+      worker.on('error', () => {
+        this.createAndSetupWorker()
+      })
+    }
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
@@ -609,7 +620,7 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitEvents (): void {
-    if (this.opts.enableEvents === true) {
+    if (this.emitter != null) {
       if (this.busy) {
         this.emitter?.emit(PoolEvents.busy)
       }
@@ -686,8 +697,10 @@ export abstract class AbstractPool<
    */
   private removeWorkerNode (worker: Worker): void {
     const workerNodeKey = this.getWorkerNodeKey(worker)
-    this.workerNodes.splice(workerNodeKey, 1)
-    this.workerChoiceStrategyContext.remove(workerNodeKey)
+    if (workerNodeKey !== -1) {
+      this.workerNodes.splice(workerNodeKey, 1)
+      this.workerChoiceStrategyContext.remove(workerNodeKey)
+    }
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
index eb2b996571f665ff14154cf338c387c5dfa28266..67020577d2be51aaedce5fb37015a0e3edddca91 100644 (file)
@@ -46,7 +46,7 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   protected get full (): boolean {
-    return this.workerNodes.length === this.max
+    return this.workerNodes.length >= this.max
   }
 
   /** @inheritDoc */
index 10ee92c57876663d07ae96ecdcde036013395126..89cab8ef0eba31aacbbde03fa4de24d68d9530d6 100644 (file)
@@ -107,7 +107,7 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   protected get full (): boolean {
-    return this.workerNodes.length === this.numberOfWorkers
+    return this.workerNodes.length >= this.numberOfWorkers
   }
 
   /** @inheritDoc */
index 14b30812d14bd21e55c0af4aee77db5daf8bc7e4..89559d2c9a1bf0e6694709d3908a0263af0c448e 100644 (file)
@@ -39,7 +39,8 @@ export class PoolEmitter extends EventEmitterAsyncResource {}
  */
 export const PoolEvents = Object.freeze({
   full: 'full',
-  busy: 'busy'
+  busy: 'busy',
+  error: 'error'
 } as const)
 
 /**
@@ -91,6 +92,10 @@ export interface PoolOptions<Worker extends IWorker> {
    * The worker choice strategy options.
    */
   workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
+  /**
+   * Restart worker on error.
+   */
+  restartWorkerOnError?: boolean
   /**
    * Pool events emission.
    *
@@ -142,6 +147,7 @@ export interface IPool<
    *
    * - `'full'`: Emitted when the pool is dynamic and full.
    * - `'busy'`: Emitted when the pool is busy.
+   * - `'error'`: Emitted when an error occurs.
    */
   readonly emitter?: PoolEmitter
   /**
index 56b923c4d2cd0f4c3800aba0b24abd41f83b4799..0519873ec6d870ef0c1d08e6cb872541cf5a0865 100644 (file)
@@ -42,7 +42,7 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   protected get full (): boolean {
-    return this.workerNodes.length === this.max
+    return this.workerNodes.length >= this.max
   }
 
   /** @inheritDoc */
index 41fcee2b60c2c5a36b5f24bc35d1cdce4a03c326..816fa61a432db55b0980092bb3bf7f45818c1364 100644 (file)
@@ -103,7 +103,7 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   protected get full (): boolean {
-    return this.workerNodes.length === this.numberOfWorkers
+    return this.workerNodes.length >= this.numberOfWorkers
   }
 
   /** @inheritDoc */
index 2afd6f3486a1103347a258c575cece7773e3c4d3..8a38b4e576464ecb88078540cc8604d7b3ad323f 100644 (file)
@@ -71,10 +71,7 @@ export abstract class AbstractWorker<
     super(type)
     this.checkWorkerOptions(this.opts)
     this.checkTaskFunctions(taskFunctions)
-    if (
-      !this.isMain &&
-      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) > 0
-    ) {
+    if (!this.isMain) {
       this.lastTaskTimestamp = performance.now()
       this.aliveInterval = setInterval(
         this.checkAlive.bind(this),
index 05abdc69d7c31369f42db85505531aa77022f9d3..91c942f646b65b9de10ec6f40dba2f70aa333730 100644 (file)
@@ -82,8 +82,9 @@ describe('Abstract pool test suite', () => {
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
     )
-    expect(pool.opts.enableEvents).toBe(true)
     expect(pool.emitter).toBeDefined()
+    expect(pool.opts.enableEvents).toBe(true)
+    expect(pool.opts.restartWorkerOnError).toBe(true)
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
     expect(pool.opts.workerChoiceStrategy).toBe(
@@ -109,6 +110,7 @@ describe('Abstract pool test suite', () => {
           weights: { 0: 300, 1: 200 }
         },
         enableEvents: false,
+        restartWorkerOnError: false,
         enableTasksQueue: true,
         tasksQueueOptions: { concurrency: 2 },
         messageHandler: testHandler,
@@ -117,8 +119,9 @@ describe('Abstract pool test suite', () => {
         exitHandler: testHandler
       }
     )
-    expect(pool.opts.enableEvents).toBe(false)
     expect(pool.emitter).toBeUndefined()
+    expect(pool.opts.enableEvents).toBe(false)
+    expect(pool.opts.restartWorkerOnError).toBe(false)
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
     expect(pool.opts.workerChoiceStrategy).toBe(
index 02042620ea0a0bd4f57d41912de49e79829c37bf..458cec807bbb7a22b4c4dce5023e0186821de0ee 100644 (file)
@@ -2,7 +2,7 @@ const { expect } = require('expect')
 const { ClusterWorker, KillBehaviors, ThreadWorker } = require('../../lib')
 
 describe('Abstract worker test suite', () => {
-  class StubPoolWithIsMainWorker extends ThreadWorker {
+  class StubWorkerWithMainWorker extends ThreadWorker {
     constructor (fn, opts) {
       super(fn, opts)
       this.mainWorker = undefined
@@ -113,7 +113,7 @@ describe('Abstract worker test suite', () => {
 
   it('Verify that getMainWorker() throw error if main worker is not set', () => {
     expect(() =>
-      new StubPoolWithIsMainWorker(() => {}).getMainWorker()
+      new StubWorkerWithMainWorker(() => {}).getMainWorker()
     ).toThrowError('Main worker was not set')
   })
 })