feat: improve events emission
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 2 Jun 2023 07:08:24 +0000 (09:08 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 2 Jun 2023 07:08:24 +0000 (09:08 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/utility-types.ts
src/worker/abstract-worker.ts

index 088d734a111947281a93fc231af596493473d926..a2d09e6e43f0b262d90d8728f612b2662c01169a 100644 (file)
@@ -7,11 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add `taskError` pool event for task execution error.
+- Emit pool information on `busy` and `full` pool events.
+
 ## [2.5.1] - 2023-06-01
 
 ### Added
 
 - Add pool option `restartWorkerOnError` to restart worker on uncaught error. Default to `true`.
+- Add `error` pool event for uncaught worker error.
 
 ## [2.5.0] - 2023-05-31
 
index a558ef504568ad95abcf91d8f413968a2f07bf8f..3127557e20d38fe21ac5d30757450aff623fcef6 100644 (file)
@@ -599,6 +599,12 @@ export abstract class AbstractPool<
         if (promiseResponse != null) {
           if (message.error != null) {
             promiseResponse.reject(message.error)
+            if (this.emitter != null) {
+              this.emitter.emit(PoolEvents.taskError, {
+                error: message.error,
+                errorData: message.errorData
+              })
+            }
           } else {
             promiseResponse.resolve(message.data as Response)
           }
@@ -621,11 +627,17 @@ export abstract class AbstractPool<
 
   private checkAndEmitEvents (): void {
     if (this.emitter != null) {
+      const poolInfo = {
+        size: this.size,
+        workerNodes: this.workerNodes.length,
+        runningTasks: this.numberOfRunningTasks,
+        queuedTasks: this.numberOfQueuedTasks
+      }
       if (this.busy) {
-        this.emitter?.emit(PoolEvents.busy)
+        this.emitter?.emit(PoolEvents.busy, poolInfo)
       }
       if (this.type === PoolType.DYNAMIC && this.full) {
-        this.emitter?.emit(PoolEvents.full)
+        this.emitter?.emit(PoolEvents.full, poolInfo)
       }
     }
   }
index 89559d2c9a1bf0e6694709d3908a0263af0c448e..32adfc66ab7a7b7cd4e6dc95e7e8d9346592773c 100644 (file)
@@ -40,7 +40,8 @@ export class PoolEmitter extends EventEmitterAsyncResource {}
 export const PoolEvents = Object.freeze({
   full: 'full',
   busy: 'busy',
-  error: 'error'
+  error: 'error',
+  taskError: 'taskError'
 } as const)
 
 /**
@@ -147,7 +148,8 @@ export interface IPool<
    *
    * - `'full'`: Emitted when the pool is dynamic and full.
    * - `'busy'`: Emitted when the pool is busy.
-   * - `'error'`: Emitted when an error occurs.
+   * - `'error'`: Emitted when an uncaught error occurs.
+   * - `'taskError'`: Emitted when an error occurs while executing a task.
    */
   readonly emitter?: PoolEmitter
   /**
index cc04f2cb99113b9cb66305c102ae5c8fe88563ad..21a442538d4988eba011f7904b2467cc895c3593 100644 (file)
@@ -26,9 +26,13 @@ export interface MessageValue<
    */
   readonly kill?: KillBehavior | 1
   /**
-   * Error.
+   * Task error.
    */
   readonly error?: string
+  /**
+   * Task data triggering task error.
+   */
+  readonly errorData?: unknown
   /**
    * Runtime.
    */
index 8a38b4e576464ecb88078540cc8604d7b3ad323f..8e55b30152fc60fb47a403ad31c44e7fc32457dd 100644 (file)
@@ -212,13 +212,17 @@ export abstract class AbstractWorker<
       const runTime = performance.now() - startTimestamp
       this.sendToMainWorker({
         data: res,
-        id: message.id,
         runTime,
-        waitTime
+        waitTime,
+        id: message.id
       })
     } catch (e) {
       const err = this.handleError(e as Error)
-      this.sendToMainWorker({ error: err, id: message.id })
+      this.sendToMainWorker({
+        error: err,
+        errorData: message.data,
+        id: message.id
+      })
     } finally {
       !this.isMain && (this.lastTaskTimestamp = performance.now())
     }
@@ -241,15 +245,19 @@ export abstract class AbstractWorker<
         const runTime = performance.now() - startTimestamp
         this.sendToMainWorker({
           data: res,
-          id: message.id,
           runTime,
-          waitTime
+          waitTime,
+          id: message.id
         })
         return null
       })
       .catch(e => {
         const err = this.handleError(e as Error)
-        this.sendToMainWorker({ error: err, id: message.id })
+        this.sendToMainWorker({
+          error: err,
+          errorData: message.data,
+          id: message.id
+        })
       })
       .finally(() => {
         !this.isMain && (this.lastTaskTimestamp = performance.now())