refactor: refine worker message handling error messsage
[poolifier.git] / src / worker / abstract-worker.ts
index ae7778c4c6ae067b4513987f19df0e692e0b6ebe..6bf84b875622ebaece3d8b4b8dae3c637593ed68 100644 (file)
@@ -84,7 +84,11 @@ export abstract class AbstractWorker<
        * The maximum time to keep this worker active while idle.
        * The pool automatically checks and terminates this worker when the time expires.
        */
-      maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME
+      maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
+      /**
+       * The function to call when the worker is killed.
+       */
+      killHandler: EMPTY_FUNCTION
     }
   ) {
     super(type)
@@ -100,6 +104,7 @@ export abstract class AbstractWorker<
     this.opts.maxInactiveTime =
       opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
     delete this.opts.async
+    this.opts.killHandler = opts.killHandler ?? EMPTY_FUNCTION
   }
 
   /**
@@ -293,17 +298,19 @@ export abstract class AbstractWorker<
    * @param message - The received message.
    */
   protected messageListener (message: MessageValue<Data>): void {
-    if (message.workerId != null && message.workerId !== this.id) {
-      throw new Error('Message worker id does not match worker id')
+    if (this.isMain) {
+      throw new Error('Cannot handle message to worker in main worker')
+    } else if (message.workerId != null && message.workerId !== this.id) {
+      throw new Error(
+        `Message worker id ${message.workerId} does not match the worker id ${this.id}`
+      )
     } else if (message.workerId === this.id) {
       if (message.statistics != null) {
         // Statistics message received
         this.statistics = message.statistics
       } else if (message.checkActive != null) {
         // Check active message received
-        !this.isMain && message.checkActive
-          ? this.startCheckActive()
-          : this.stopCheckActive()
+        message.checkActive ? this.startCheckActive() : this.stopCheckActive()
       } else if (message.taskId != null && message.data != null) {
         // Task message received
         this.run(message)
@@ -320,8 +327,31 @@ export abstract class AbstractWorker<
    * @param message - The kill message.
    */
   protected handleKillMessage (message: MessageValue<Data>): void {
-    !this.isMain && this.stopCheckActive()
-    this.emitDestroy()
+    this.stopCheckActive()
+    if (isAsyncFunction(this.opts.killHandler)) {
+      (this.opts.killHandler?.() as Promise<void>)
+        .then(() => {
+          this.sendToMainWorker({ kill: 'success', workerId: this.id })
+          return null
+        })
+        .catch(() => {
+          this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+        })
+        .finally(() => {
+          this.emitDestroy()
+        })
+        .catch(EMPTY_FUNCTION)
+    } else {
+      try {
+        // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+        this.opts.killHandler?.() as void
+        this.sendToMainWorker({ kill: 'success', workerId: this.id })
+      } catch {
+        this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+      } finally {
+        this.emitDestroy()
+      }
+    }
   }
 
   /**
@@ -395,9 +425,6 @@ export abstract class AbstractWorker<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
    */
   protected run (task: Task<Data>): void {
-    if (this.isMain) {
-      throw new Error('Cannot run a task in the main worker')
-    }
     const fn = this.getTaskFunction(task.name)
     if (isAsyncFunction(fn)) {
       this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
@@ -529,7 +556,7 @@ export abstract class AbstractWorker<
   }
 
   private updateLastTaskTimestamp (): void {
-    if (!this.isMain && this.activeInterval != null) {
+    if (this.activeInterval != null) {
       this.lastTaskTimestamp = performance.now()
     }
   }