test: improve task error handling
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 6 Jun 2023 22:07:53 +0000 (00:07 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 6 Jun 2023 22:11:05 +0000 (00:11 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js
tests/test-utils.js

index c737c1d90ebc08743d5ae2a0af99c8957433376c..291629f59b93f195eaf2d9b4a57f5e2649786af0 100644 (file)
@@ -37,7 +37,7 @@ describe('Dynamic cluster pool test suite', () => {
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
-    const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
+    const numberOfExitEvents = await TestUtils.waitWorkerExits(pool, max - min)
     expect(numberOfExitEvents).toBe(max - min)
   })
 
@@ -47,18 +47,18 @@ describe('Dynamic cluster pool test suite', () => {
       pool.execute()
     }
     expect(pool.workerNodes.length).toBeGreaterThan(min)
-    await TestUtils.waitExits(pool, max - min)
+    await TestUtils.waitWorkerExits(pool, max - min)
     expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
     expect(pool.workerNodes.length).toBeGreaterThan(min)
-    await TestUtils.waitExits(pool, max - min)
+    await TestUtils.waitWorkerExits(pool, max - min)
     expect(pool.workerNodes.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
-    const exitPromise = TestUtils.waitExits(pool, min)
+    const exitPromise = TestUtils.waitWorkerExits(pool, min)
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(numberOfExitEvents).toBe(min)
@@ -98,7 +98,7 @@ describe('Dynamic cluster pool test suite', () => {
       longRunningPool.execute()
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
-    await TestUtils.waitExits(longRunningPool, max - min)
+    await TestUtils.waitWorkerExits(longRunningPool, max - min)
     expect(longRunningPool.workerNodes.length).toBe(min)
     expect(
       longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
index 2cba2d8b7c1d09435538d547f9e6674e9427c36e..c6cdce54617c36ffc05d76cfcfc5ff190cfda194 100644 (file)
@@ -157,10 +157,10 @@ describe('Fixed cluster pool test suite', () => {
 
   it('Verify that error handling is working properly:async', async () => {
     const data = { f: 10 }
-    // let taskError
-    // errorPool.emitter.on(PoolEvents.taskError, e => {
-    //   taskError = e
-    // })
+    let taskError
+    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
+      taskError = e
+    })
     let inError
     try {
       await asyncErrorPool.execute(data)
@@ -170,10 +170,10 @@ describe('Fixed cluster pool test suite', () => {
     expect(inError).toBeDefined()
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker:async')
-    // expect(taskError).toStrictEqual({
-    //   error: 'Error Message from ClusterWorker:async',
-    //   errorData: data
-    // })
+    expect(taskError).toStrictEqual({
+      error: 'Error Message from ClusterWorker:async',
+      errorData: data
+    })
     expect(
       asyncErrorPool.workerNodes.some(
         workerNode => workerNode.tasksUsage.error === 1
@@ -191,7 +191,7 @@ describe('Fixed cluster pool test suite', () => {
   })
 
   it('Shutdown test', async () => {
-    const exitPromise = TestUtils.waitExits(pool, numberOfWorkers)
+    const exitPromise = TestUtils.waitWorkerExits(pool, numberOfWorkers)
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(numberOfExitEvents).toBe(numberOfWorkers)
index 1f2a646355b04ba8b10908edd1bda43c817c43f9..404f2113d95c3cbefc27ddd9efa8da08b1eeb1cd 100644 (file)
@@ -37,7 +37,7 @@ describe('Dynamic thread pool test suite', () => {
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
-    const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
+    const numberOfExitEvents = await TestUtils.waitWorkerExits(pool, max - min)
     expect(numberOfExitEvents).toBe(max - min)
   })
 
@@ -47,18 +47,18 @@ describe('Dynamic thread pool test suite', () => {
       pool.execute()
     }
     expect(pool.workerNodes.length).toBe(max)
-    await TestUtils.waitExits(pool, max - min)
+    await TestUtils.waitWorkerExits(pool, max - min)
     expect(pool.workerNodes.length).toBe(min)
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
     expect(pool.workerNodes.length).toBe(max)
-    await TestUtils.waitExits(pool, max - min)
+    await TestUtils.waitWorkerExits(pool, max - min)
     expect(pool.workerNodes.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
-    const exitPromise = TestUtils.waitExits(pool, min)
+    const exitPromise = TestUtils.waitWorkerExits(pool, min)
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(numberOfExitEvents).toBe(min)
@@ -98,7 +98,7 @@ describe('Dynamic thread pool test suite', () => {
       longRunningPool.execute()
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
-    await TestUtils.waitExits(longRunningPool, max - min)
+    await TestUtils.waitWorkerExits(longRunningPool, max - min)
     expect(longRunningPool.workerNodes.length).toBe(min)
     expect(
       longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
index 48dd10b6800d788de30f3040ec9646e88db15019..5dd97665f6177767e1502da8825b4200567e570e 100644 (file)
@@ -159,10 +159,10 @@ describe('Fixed thread pool test suite', () => {
 
   it('Verify that error handling is working properly:async', async () => {
     const data = { f: 10 }
-    // let taskError
-    // errorPool.emitter.on(PoolEvents.taskError, e => {
-    //   taskError = e
-    // })
+    let taskError
+    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
+      taskError = e
+    })
     let inError
     try {
       await asyncErrorPool.execute(data)
@@ -174,10 +174,10 @@ describe('Fixed thread pool test suite', () => {
     expect(inError.message).toBeDefined()
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker:async')
-    // expect(taskError).toStrictEqual({
-    //   error: new Error('Error Message from ThreadWorker:async'),
-    //   errorData: data
-    // })
+    expect(taskError).toStrictEqual({
+      error: new Error('Error Message from ThreadWorker:async'),
+      errorData: data
+    })
     expect(
       asyncErrorPool.workerNodes.some(
         workerNode => workerNode.tasksUsage.error === 1
@@ -195,7 +195,7 @@ describe('Fixed thread pool test suite', () => {
   })
 
   it('Shutdown test', async () => {
-    const exitPromise = TestUtils.waitExits(pool, numberOfThreads)
+    const exitPromise = TestUtils.waitWorkerExits(pool, numberOfThreads)
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(numberOfExitEvents).toBe(numberOfThreads)
index 0e71c718a8eee83c85011e10f417725811ae34f8..cba320a75896311e11d7b0b69ff27cf721dd0807 100644 (file)
@@ -1,7 +1,7 @@
 const { WorkerFunctions } = require('./test-types')
 
 class TestUtils {
-  static async waitExits (pool, numberOfExitEventsToWait) {
+  static async waitWorkerExits (pool, numberOfExitEventsToWait) {
     return new Promise(resolve => {
       let exitEvents = 0
       if (numberOfExitEventsToWait === 0) {
@@ -18,6 +18,21 @@ class TestUtils {
     })
   }
 
+  static async waitPoolEvents (pool, poolEvent, numberOfEventsToWait) {
+    return new Promise(resolve => {
+      let events = 0
+      if (numberOfEventsToWait === 0) {
+        resolve(events)
+      }
+      pool.emitter.on(poolEvent, () => {
+        ++events
+        if (events === numberOfEventsToWait) {
+          resolve(events)
+        }
+      })
+    })
+  }
+
   static async sleep (ms) {
     return new Promise(resolve => setTimeout(resolve, ms))
   }