From 1f68cedebde2cf95b6345300ac52cf7ca3ecbdff Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 1 Jun 2023 19:20:00 +0200 Subject: [PATCH] feat: restart worker in case of uncaught error MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 2 +- README.md | 3 ++- src/pools/abstract-pool.ts | 19 ++++++++++++++++--- src/pools/cluster/dynamic.ts | 2 +- src/pools/cluster/fixed.ts | 2 +- src/pools/pool.ts | 8 +++++++- src/pools/thread/dynamic.ts | 2 +- src/pools/thread/fixed.ts | 2 +- src/worker/abstract-worker.ts | 5 +---- tests/pools/abstract/abstract-pool.test.js | 7 +++++-- tests/worker/abstract-worker.test.js | 4 ++-- 11 files changed, 38 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94ddcdf9..23210596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Allow to disable tasks timeout check in worker. +- Add pool option `restartWorkerOnError` to restart worker on uncaught error. Default to `true`. ## [2.5.0] - 2023-05-31 diff --git a/README.md b/README.md index 2f5fff41..1a311e4d 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,8 @@ Node versions >= 16.14.x are supported. Default: `{ medRunTime: false }` +- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool. + Default: true - `enableEvents` (optional) - Events emission enablement in this pool. Default: true - `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool. @@ -217,7 +219,6 @@ This method will call the terminate method on each worker. The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task. If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. - 0: no tasks timeout check. Default: 60000 - `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f32aab0d..a558ef50 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -149,6 +149,7 @@ export abstract class AbstractPool< this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) + this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { @@ -562,6 +563,16 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) + worker.on('error', error => { + if (this.emitter != null) { + this.emitter.emit(PoolEvents.error, error) + } + }) + if (this.opts.restartWorkerOnError === true) { + worker.on('error', () => { + this.createAndSetupWorker() + }) + } worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { @@ -609,7 +620,7 @@ export abstract class AbstractPool< } private checkAndEmitEvents (): void { - if (this.opts.enableEvents === true) { + if (this.emitter != null) { if (this.busy) { this.emitter?.emit(PoolEvents.busy) } @@ -686,8 +697,10 @@ export abstract class AbstractPool< */ private removeWorkerNode (worker: Worker): void { const workerNodeKey = this.getWorkerNodeKey(worker) - this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + if (workerNodeKey !== -1) { + this.workerNodes.splice(workerNodeKey, 1) + this.workerChoiceStrategyContext.remove(workerNodeKey) + } } private executeTask (workerNodeKey: number, task: Task): void { diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index eb2b9965..67020577 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -46,7 +46,7 @@ export class DynamicClusterPool< /** @inheritDoc */ protected get full (): boolean { - return this.workerNodes.length === this.max + return this.workerNodes.length >= this.max } /** @inheritDoc */ diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 10ee92c5..89cab8ef 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -107,7 +107,7 @@ export class FixedClusterPool< /** @inheritDoc */ protected get full (): boolean { - return this.workerNodes.length === this.numberOfWorkers + return this.workerNodes.length >= this.numberOfWorkers } /** @inheritDoc */ diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 14b30812..89559d2c 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -39,7 +39,8 @@ export class PoolEmitter extends EventEmitterAsyncResource {} */ export const PoolEvents = Object.freeze({ full: 'full', - busy: 'busy' + busy: 'busy', + error: 'error' } as const) /** @@ -91,6 +92,10 @@ export interface PoolOptions { * The worker choice strategy options. */ workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions + /** + * Restart worker on error. + */ + restartWorkerOnError?: boolean /** * Pool events emission. * @@ -142,6 +147,7 @@ export interface IPool< * * - `'full'`: Emitted when the pool is dynamic and full. * - `'busy'`: Emitted when the pool is busy. + * - `'error'`: Emitted when an error occurs. */ readonly emitter?: PoolEmitter /** diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 56b923c4..0519873e 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -42,7 +42,7 @@ export class DynamicThreadPool< /** @inheritDoc */ protected get full (): boolean { - return this.workerNodes.length === this.max + return this.workerNodes.length >= this.max } /** @inheritDoc */ diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 41fcee2b..816fa61a 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -103,7 +103,7 @@ export class FixedThreadPool< /** @inheritDoc */ protected get full (): boolean { - return this.workerNodes.length === this.numberOfWorkers + return this.workerNodes.length >= this.numberOfWorkers } /** @inheritDoc */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 2afd6f34..8a38b4e5 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -71,10 +71,7 @@ export abstract class AbstractWorker< super(type) this.checkWorkerOptions(this.opts) this.checkTaskFunctions(taskFunctions) - if ( - !this.isMain && - (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) > 0 - ) { + if (!this.isMain) { this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( this.checkAlive.bind(this), diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 05abdc69..91c942f6 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -82,8 +82,9 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js' ) - expect(pool.opts.enableEvents).toBe(true) expect(pool.emitter).toBeDefined() + expect(pool.opts.enableEvents).toBe(true) + expect(pool.opts.restartWorkerOnError).toBe(true) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() expect(pool.opts.workerChoiceStrategy).toBe( @@ -109,6 +110,7 @@ describe('Abstract pool test suite', () => { weights: { 0: 300, 1: 200 } }, enableEvents: false, + restartWorkerOnError: false, enableTasksQueue: true, tasksQueueOptions: { concurrency: 2 }, messageHandler: testHandler, @@ -117,8 +119,9 @@ describe('Abstract pool test suite', () => { exitHandler: testHandler } ) - expect(pool.opts.enableEvents).toBe(false) expect(pool.emitter).toBeUndefined() + expect(pool.opts.enableEvents).toBe(false) + expect(pool.opts.restartWorkerOnError).toBe(false) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) expect(pool.opts.workerChoiceStrategy).toBe( diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 02042620..458cec80 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -2,7 +2,7 @@ const { expect } = require('expect') const { ClusterWorker, KillBehaviors, ThreadWorker } = require('../../lib') describe('Abstract worker test suite', () => { - class StubPoolWithIsMainWorker extends ThreadWorker { + class StubWorkerWithMainWorker extends ThreadWorker { constructor (fn, opts) { super(fn, opts) this.mainWorker = undefined @@ -113,7 +113,7 @@ describe('Abstract worker test suite', () => { it('Verify that getMainWorker() throw error if main worker is not set', () => { expect(() => - new StubPoolWithIsMainWorker(() => {}).getMainWorker() + new StubWorkerWithMainWorker(() => {}).getMainWorker() ).toThrowError('Main worker was not set') }) }) -- 2.34.1