fix: fix task execution tracking with pool async resource
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 11 Dec 2023 17:36:35 +0000 (18:36 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 11 Dec 2023 17:36:35 +0000 (18:36 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/utility-types.ts
tests/pools/abstract-pool.test.mjs

index e0f4d00eff14db593d39c04e8f985e3e7a6c2c48..7a4b2b1dc525750124fe7ee3bcd0780b4aeace8c 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure pool asynchronous resource properly track tasks execution
+
 ## [3.0.10] - 2023-12-08
 
 ### Changed
index 7ea9379665050a982259a4181c934a7eecc81adb..da0eb335b26f41092fb57560d255b5b0b624ad6e 100644 (file)
@@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import type { TransferListItem } from 'node:worker_threads'
 import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -933,7 +934,13 @@ export abstract class AbstractPool<
       this.promiseResponseMap.set(task.taskId as string, {
         resolve,
         reject,
-        workerNodeKey
+        workerNodeKey,
+        ...(this.emitter != null && {
+          asyncResource: new AsyncResource('poolifier:task', {
+            triggerAsyncId: this.emitter.asyncId,
+            requireManualDestroy: true
+          })
+        })
       })
       if (
         this.opts.enableTasksQueue === false ||
@@ -1715,13 +1722,22 @@ export abstract class AbstractPool<
     const { workerId, taskId, workerError, data } = message
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      const { resolve, reject, workerNodeKey } = promiseResponse
+      const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
-        reject(workerError.message)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(
+            reject,
+            this.emitter,
+            workerError.message
+          )
+          : reject(workerError.message)
       } else {
-        resolve(data as Response)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+          : resolve(data as Response)
       }
+      asyncResource?.emitDestroy()
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
index 60ec2f546fe50f2ab14ea96b49b60d839dd12ea0..5d2376cc585d710a62150b8cb93b1a7a9cfa63da 100644 (file)
@@ -1,5 +1,6 @@
 import type { EventLoopUtilization } from 'node:perf_hooks'
 import type { MessagePort, TransferListItem } from 'node:worker_threads'
+import type { AsyncResource } from 'node:async_hooks'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
@@ -176,6 +177,10 @@ export interface PromiseResponseWrapper<Response = unknown> {
    * The worker node key executing the task.
    */
   readonly workerNodeKey: number
+  /**
+   * The asynchronous resource used to track the task execution.
+   */
+  readonly asyncResource?: AsyncResource
 }
 
 export type Writable<T> = { -readonly [P in keyof T]: T[P] }
index 9c1e49a41e0bddb2298f1bcd9a5d038c8d46645c..ec4ffabcdbbe77a694ec652ca59b4e00ec298670 100644 (file)
@@ -2,6 +2,7 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { dirname, join } from 'node:path'
 import { readFileSync } from 'node:fs'
 import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
 import { expect } from 'expect'
 import { restore, stub } from 'sinon'
 import {
@@ -1267,6 +1268,42 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that pool asynchronous resource track tasks execution', async () => {
+    let taskAsyncId
+    let initCalls = 0
+    let beforeCalls = 0
+    let afterCalls = 0
+    let resolveCalls = 0
+    const hook = createHook({
+      init (asyncId, type) {
+        if (type === 'poolifier:task') {
+          initCalls++
+          taskAsyncId = asyncId
+        }
+      },
+      before (asyncId) {
+        if (asyncId === taskAsyncId) beforeCalls++
+      },
+      after (asyncId) {
+        if (asyncId === taskAsyncId) afterCalls++
+      },
+      promiseResolve () {
+        if (executionAsyncId() === taskAsyncId) resolveCalls++
+      }
+    })
+    hook.enable()
+    const pool = new FixedThreadPool(
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.mjs'
+    )
+    await pool.execute()
+    hook.disable()
+    expect(initCalls).toBe(1)
+    expect(beforeCalls).toBe(1)
+    expect(afterCalls).toBe(1)
+    expect(resolveCalls).toBe(1)
+  })
+
   it('Verify that hasTaskFunction() is working', async () => {
     const dynamicThreadPool = new DynamicThreadPool(
       Math.floor(numberOfWorkers / 2),