### Added
-- Allow to disable tasks timeout check in worker.
+- Add pool option `restartWorkerOnError` to restart worker on uncaught error. Default to `true`.
## [2.5.0] - 2023-05-31
Default: `{ medRunTime: false }`
+- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
+ Default: true
- `enableEvents` (optional) - Events emission enablement in this pool.
Default: true
- `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool.
The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.
If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
- 0: no tasks timeout check.
Default: 60000
- `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategyOptions
)
+ this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+ worker.on('error', error => {
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.error, error)
+ }
+ })
+ if (this.opts.restartWorkerOnError === true) {
+ worker.on('error', () => {
+ this.createAndSetupWorker()
+ })
+ }
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
}
private checkAndEmitEvents (): void {
- if (this.opts.enableEvents === true) {
+ if (this.emitter != null) {
if (this.busy) {
this.emitter?.emit(PoolEvents.busy)
}
*/
private removeWorkerNode (worker: Worker): void {
const workerNodeKey = this.getWorkerNodeKey(worker)
- this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ if (workerNodeKey !== -1) {
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
+ }
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
/** @inheritDoc */
protected get full (): boolean {
- return this.workerNodes.length === this.max
+ return this.workerNodes.length >= this.max
}
/** @inheritDoc */
/** @inheritDoc */
protected get full (): boolean {
- return this.workerNodes.length === this.numberOfWorkers
+ return this.workerNodes.length >= this.numberOfWorkers
}
/** @inheritDoc */
*/
export const PoolEvents = Object.freeze({
full: 'full',
- busy: 'busy'
+ busy: 'busy',
+ error: 'error'
} as const)
/**
* The worker choice strategy options.
*/
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
+ /**
+ * Restart worker on error.
+ */
+ restartWorkerOnError?: boolean
/**
* Pool events emission.
*
*
* - `'full'`: Emitted when the pool is dynamic and full.
* - `'busy'`: Emitted when the pool is busy.
+ * - `'error'`: Emitted when an error occurs.
*/
readonly emitter?: PoolEmitter
/**
/** @inheritDoc */
protected get full (): boolean {
- return this.workerNodes.length === this.max
+ return this.workerNodes.length >= this.max
}
/** @inheritDoc */
/** @inheritDoc */
protected get full (): boolean {
- return this.workerNodes.length === this.numberOfWorkers
+ return this.workerNodes.length >= this.numberOfWorkers
}
/** @inheritDoc */
super(type)
this.checkWorkerOptions(this.opts)
this.checkTaskFunctions(taskFunctions)
- if (
- !this.isMain &&
- (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) > 0
- ) {
+ if (!this.isMain) {
this.lastTaskTimestamp = performance.now()
this.aliveInterval = setInterval(
this.checkAlive.bind(this),
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
)
- expect(pool.opts.enableEvents).toBe(true)
expect(pool.emitter).toBeDefined()
+ expect(pool.opts.enableEvents).toBe(true)
+ expect(pool.opts.restartWorkerOnError).toBe(true)
expect(pool.opts.enableTasksQueue).toBe(false)
expect(pool.opts.tasksQueueOptions).toBeUndefined()
expect(pool.opts.workerChoiceStrategy).toBe(
weights: { 0: 300, 1: 200 }
},
enableEvents: false,
+ restartWorkerOnError: false,
enableTasksQueue: true,
tasksQueueOptions: { concurrency: 2 },
messageHandler: testHandler,
exitHandler: testHandler
}
)
- expect(pool.opts.enableEvents).toBe(false)
expect(pool.emitter).toBeUndefined()
+ expect(pool.opts.enableEvents).toBe(false)
+ expect(pool.opts.restartWorkerOnError).toBe(false)
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
expect(pool.opts.workerChoiceStrategy).toBe(
const { ClusterWorker, KillBehaviors, ThreadWorker } = require('../../lib')
describe('Abstract worker test suite', () => {
- class StubPoolWithIsMainWorker extends ThreadWorker {
+ class StubWorkerWithMainWorker extends ThreadWorker {
constructor (fn, opts) {
super(fn, opts)
this.mainWorker = undefined
it('Verify that getMainWorker() throw error if main worker is not set', () => {
expect(() =>
- new StubPoolWithIsMainWorker(() => {}).getMainWorker()
+ new StubWorkerWithMainWorker(() => {}).getMainWorker()
).toThrowError('Main worker was not set')
})
})