import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
import type {
MessageValue,
PromiseResponseWrapper,
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
- workerNodeKey
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
})
if (
this.opts.enableTasksQueue === false ||
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- const { resolve, reject, workerNodeKey } = promiseResponse
+ const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- reject(workerError.message)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
} else {
- resolve(data as Response)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
}
+ asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
import type { EventLoopUtilization } from 'node:perf_hooks'
import type { MessagePort, TransferListItem } from 'node:worker_threads'
+import type { AsyncResource } from 'node:async_hooks'
import type { KillBehavior } from './worker/worker-options'
/**
* The worker node key executing the task.
*/
readonly workerNodeKey: number
+ /**
+ * The asynchronous resource used to track the task execution.
+ */
+ readonly asyncResource?: AsyncResource
}
export type Writable<T> = { -readonly [P in keyof T]: T[P] }
import { dirname, join } from 'node:path'
import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
import { expect } from 'expect'
import { restore, stub } from 'sinon'
import {
await pool.destroy()
})
+ it('Verify that pool asynchronous resource track tasks execution', async () => {
+ let taskAsyncId
+ let initCalls = 0
+ let beforeCalls = 0
+ let afterCalls = 0
+ let resolveCalls = 0
+ const hook = createHook({
+ init (asyncId, type) {
+ if (type === 'poolifier:task') {
+ initCalls++
+ taskAsyncId = asyncId
+ }
+ },
+ before (asyncId) {
+ if (asyncId === taskAsyncId) beforeCalls++
+ },
+ after (asyncId) {
+ if (asyncId === taskAsyncId) afterCalls++
+ },
+ promiseResolve () {
+ if (executionAsyncId() === taskAsyncId) resolveCalls++
+ }
+ })
+ hook.enable()
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ await pool.execute()
+ hook.disable()
+ expect(initCalls).toBe(1)
+ expect(beforeCalls).toBe(1)
+ expect(afterCalls).toBe(1)
+ expect(resolveCalls).toBe(1)
+ })
+
it('Verify that hasTaskFunction() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),