fix: fix back pressure detection
[poolifier.git] / tests / pools / thread / fixed.test.js
index 849590a69f1bad0cbe8c903593002654b4dea70d..aeebc4a7a623a1151f37f41aaf7f56ddcfcadbc1 100644 (file)
@@ -10,7 +10,7 @@ describe('Fixed thread pool test suite', () => {
     numberOfThreads,
     './tests/worker-files/thread/testWorker.js',
     {
-      errorHandler: e => console.error(e)
+      errorHandler: (e) => console.error(e)
     }
   )
   const queuePool = new FixedThreadPool(
@@ -21,7 +21,7 @@ describe('Fixed thread pool test suite', () => {
       tasksQueueOptions: {
         concurrency: tasksConcurrency
       },
-      errorHandler: e => console.error(e)
+      errorHandler: (e) => console.error(e)
     }
   )
   const emptyPool = new FixedThreadPool(
@@ -37,14 +37,14 @@ describe('Fixed thread pool test suite', () => {
     numberOfThreads,
     './tests/worker-files/thread/errorWorker.js',
     {
-      errorHandler: e => console.error(e)
+      errorHandler: (e) => console.error(e)
     }
   )
   const asyncErrorPool = new FixedThreadPool(
     numberOfThreads,
     './tests/worker-files/thread/asyncErrorWorker.js',
     {
-      errorHandler: e => console.error(e)
+      errorHandler: (e) => console.error(e)
     }
   )
   const asyncPool = new FixedThreadPool(
@@ -83,7 +83,7 @@ describe('Fixed thread pool test suite', () => {
       numberOfThreads,
       './tests/worker-files/thread/testWorker.js',
       {
-        errorHandler: e => console.error(e)
+        errorHandler: (e) => console.error(e)
       }
     )
     let poolReady = 0
@@ -111,6 +111,7 @@ describe('Fixed thread pool test suite', () => {
     }
     expect(promises.size).toBe(numberOfThreads * maxMultiplier)
     for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
         queuePool.opts.tasksQueueOptions.concurrency
       )
@@ -133,6 +134,7 @@ describe('Fixed thread pool test suite', () => {
       numberOfThreads *
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
+    expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBe(0)
@@ -156,22 +158,35 @@ describe('Fixed thread pool test suite', () => {
   })
 
   it('Verify that transferable objects are sent to the worker correctly', async () => {
-    const transferList = [new ArrayBuffer(16), new MessageChannel().port1]
     let error
     let result
     try {
-      result = await pool.execute(undefined, undefined, transferList)
+      result = await pool.execute(undefined, undefined, [
+        new ArrayBuffer(16),
+        new MessageChannel().port1
+      ])
     } catch (e) {
       error = e
     }
     expect(result).toStrictEqual({ ok: 1 })
     expect(error).toBeUndefined()
+    try {
+      result = await pool.execute(undefined, undefined, [
+        new SharedArrayBuffer(16)
+      ])
+    } catch (e) {
+      error = e
+    }
+    expect(result).toStrictEqual({ ok: 1 })
+    expect(error).toStrictEqual(
+      new TypeError('Found invalid object in transferList')
+    )
   })
 
   it('Verify that error handling is working properly:sync', async () => {
     const data = { f: 10 }
     let taskError
-    errorPool.emitter.on(PoolEvents.taskError, e => {
+    errorPool.emitter.on(PoolEvents.taskError, (e) => {
       taskError = e
     })
     let inError
@@ -192,7 +207,7 @@ describe('Fixed thread pool test suite', () => {
     })
     expect(
       errorPool.workerNodes.some(
-        workerNode => workerNode.usage.tasks.failed === 1
+        (workerNode) => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
@@ -200,7 +215,7 @@ describe('Fixed thread pool test suite', () => {
   it('Verify that error handling is working properly:async', async () => {
     const data = { f: 10 }
     let taskError
-    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
+    asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
       taskError = e
     })
     let inError
@@ -221,7 +236,7 @@ describe('Fixed thread pool test suite', () => {
     })
     expect(
       asyncErrorPool.workerNodes.some(
-        workerNode => workerNode.usage.tasks.failed === 1
+        (workerNode) => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
@@ -237,9 +252,12 @@ describe('Fixed thread pool test suite', () => {
 
   it('Shutdown test', async () => {
     const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
+    let poolDestroy = 0
+    pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(numberOfExitEvents).toBe(numberOfThreads)
+    expect(poolDestroy).toBe(1)
   })
 
   it('Verify that thread pool options are checked', async () => {