From 91ee39ed9280daa760f898e84b4e7dc68debe5ba Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 2 Jun 2023 09:08:24 +0200 Subject: [PATCH] feat: improve events emission MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 6 ++++++ src/pools/abstract-pool.ts | 16 ++++++++++++++-- src/pools/pool.ts | 6 ++++-- src/utility-types.ts | 6 +++++- src/worker/abstract-worker.ts | 20 ++++++++++++++------ 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 088d734a..a2d09e6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `taskError` pool event for task execution error. +- Emit pool information on `busy` and `full` pool events. + ## [2.5.1] - 2023-06-01 ### Added - Add pool option `restartWorkerOnError` to restart worker on uncaught error. Default to `true`. +- Add `error` pool event for uncaught worker error. ## [2.5.0] - 2023-05-31 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a558ef50..3127557e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -599,6 +599,12 @@ export abstract class AbstractPool< if (promiseResponse != null) { if (message.error != null) { promiseResponse.reject(message.error) + if (this.emitter != null) { + this.emitter.emit(PoolEvents.taskError, { + error: message.error, + errorData: message.errorData + }) + } } else { promiseResponse.resolve(message.data as Response) } @@ -621,11 +627,17 @@ export abstract class AbstractPool< private checkAndEmitEvents (): void { if (this.emitter != null) { + const poolInfo = { + size: this.size, + workerNodes: this.workerNodes.length, + runningTasks: this.numberOfRunningTasks, + queuedTasks: this.numberOfQueuedTasks + } if (this.busy) { - this.emitter?.emit(PoolEvents.busy) + this.emitter?.emit(PoolEvents.busy, poolInfo) } if (this.type === PoolType.DYNAMIC && this.full) { - this.emitter?.emit(PoolEvents.full) + this.emitter?.emit(PoolEvents.full, poolInfo) } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 89559d2c..32adfc66 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -40,7 +40,8 @@ export class PoolEmitter extends EventEmitterAsyncResource {} export const PoolEvents = Object.freeze({ full: 'full', busy: 'busy', - error: 'error' + error: 'error', + taskError: 'taskError' } as const) /** @@ -147,7 +148,8 @@ export interface IPool< * * - `'full'`: Emitted when the pool is dynamic and full. * - `'busy'`: Emitted when the pool is busy. - * - `'error'`: Emitted when an error occurs. + * - `'error'`: Emitted when an uncaught error occurs. + * - `'taskError'`: Emitted when an error occurs while executing a task. */ readonly emitter?: PoolEmitter /** diff --git a/src/utility-types.ts b/src/utility-types.ts index cc04f2cb..21a44253 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -26,9 +26,13 @@ export interface MessageValue< */ readonly kill?: KillBehavior | 1 /** - * Error. + * Task error. */ readonly error?: string + /** + * Task data triggering task error. + */ + readonly errorData?: unknown /** * Runtime. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8a38b4e5..8e55b301 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -212,13 +212,17 @@ export abstract class AbstractWorker< const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, - id: message.id, runTime, - waitTime + waitTime, + id: message.id }) } catch (e) { const err = this.handleError(e as Error) - this.sendToMainWorker({ error: err, id: message.id }) + this.sendToMainWorker({ + error: err, + errorData: message.data, + id: message.id + }) } finally { !this.isMain && (this.lastTaskTimestamp = performance.now()) } @@ -241,15 +245,19 @@ export abstract class AbstractWorker< const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, - id: message.id, runTime, - waitTime + waitTime, + id: message.id }) return null }) .catch(e => { const err = this.handleError(e as Error) - this.sendToMainWorker({ error: err, id: message.id }) + this.sendToMainWorker({ + error: err, + errorData: message.data, + id: message.id + }) }) .finally(() => { !this.isMain && (this.lastTaskTimestamp = performance.now()) -- 2.34.1