feat: add worker kill handler success or failure reporting
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 14 Aug 2023 19:02:13 +0000 (21:02 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 14 Aug 2023 19:02:13 +0000 (21:02 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/utility-types.ts
src/worker/abstract-worker.ts
tests/worker/abstract-worker.test.js

index eb303232a208f3fcfc582614a7625831e1ab94c0..1d27428e05f3328427977bbda2947fcda9c873a4 100644 (file)
@@ -694,6 +694,23 @@ export abstract class AbstractPool<
     )
   }
 
+  protected async sendKillMessageToWorker (
+    workerNodeKey: number,
+    workerId: number
+  ): Promise<void> {
+    const waitForKillResponse = new Promise<void>((resolve, reject) => {
+      this.registerWorkerMessageListener(workerNodeKey, (message) => {
+        if (message.kill === 'success') {
+          resolve()
+        } else if (message.kill === 'failure') {
+          reject(new Error('Worker kill message handling failed'))
+        }
+      })
+    })
+    this.sendToWorker(workerNodeKey, { kill: true, workerId })
+    await waitForKillResponse
+  }
+
   /**
    * Terminates the worker node given its worker node key.
    *
@@ -934,7 +951,7 @@ export abstract class AbstractPool<
       // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
-        (message.kill != null &&
+        (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
index b5b32527cf595b8b4ec8f52f3290a10e4647cbd4..9022c7025961e9fc206358a48843c9fc5070b0be 100644 (file)
@@ -72,7 +72,7 @@ export class FixedClusterPool<
     worker.on('disconnect', () => {
       worker.kill()
     })
-    this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id })
+    await this.sendKillMessageToWorker(workerNodeKey, worker.id)
     worker.disconnect()
     await waitWorkerExit
   }
index a193091f9fc7e1c4ac16c92a359eac470c0f6e78..13e4a11a50028364c055b6b60aca5cb03b87fcfd 100644 (file)
@@ -67,7 +67,7 @@ export class FixedThreadPool<
         resolve()
       })
     })
-    this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.threadId })
+    await this.sendKillMessageToWorker(workerNodeKey, worker.threadId)
     workerNode.closeChannel()
     await worker.terminate()
     await waitWorkerExit
index d6869557d5af04ec06444e320d1fe6cac41cadc0..1783b4e6f36adfa9f778cbd74b3ce46975bb40a2 100644 (file)
@@ -101,7 +101,7 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
   /**
    * Kill code.
    */
-  readonly kill?: KillBehavior | true
+  readonly kill?: KillBehavior | true | 'success' | 'failure'
   /**
    * Task error.
    */
index 493ca0b293fc145b2eae812f0e0dd858b9b4e566..56a7696f0638fabaaccd7eb4100b7d19e358f0cd 100644 (file)
@@ -328,12 +328,27 @@ export abstract class AbstractWorker<
     this.stopCheckActive()
     if (isAsyncFunction(this.opts.killHandler)) {
       (this.opts.killHandler?.() as Promise<void>)
-        .then(() => this.emitDestroy())
+        .then(() => {
+          this.sendToMainWorker({ kill: 'success', workerId: this.id })
+          return null
+        })
+        .catch(() => {
+          this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+        })
+        .finally(() => {
+          this.emitDestroy()
+        })
         .catch(EMPTY_FUNCTION)
     } else {
-      // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
-      this.opts.killHandler?.() as void
-      this.emitDestroy()
+      try {
+        // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+        this.opts.killHandler?.() as void
+        this.sendToMainWorker({ kill: 'success', workerId: this.id })
+      } catch (error) {
+        this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+      } finally {
+        this.emitDestroy()
+      }
     }
   }
 
index 46370777f4e1fcb5e1df8cef70ecdff08472578f..9efacab5e3f611d7a9b3f7cb9a75430954c3b3f4 100644 (file)
@@ -132,7 +132,12 @@ describe('Abstract worker test suite', () => {
       killHandler: sinon.stub().returns()
     })
     worker.isMain = false
+    worker.getMainWorker = sinon.stub().returns({
+      id: 1,
+      send: sinon.stub().returns()
+    })
     worker.handleKillMessage()
+    expect(worker.getMainWorker().send.calledOnce).toBe(true)
     expect(worker.opts.killHandler.calledOnce).toBe(true)
   })