From f18fd12bf1ec2ea3649374af210a65b9a9b5a54c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 11 Dec 2023 18:36:35 +0100 Subject: [PATCH] fix: fix task execution tracking with pool async resource MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ src/pools/abstract-pool.ts | 24 +++++++++++++++---- src/utility-types.ts | 5 ++++ tests/pools/abstract-pool.test.mjs | 37 ++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0f4d00e..7a4b2b1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure pool asynchronous resource properly track tasks execution + ## [3.0.10] - 2023-12-08 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7ea93796..da0eb335 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -933,7 +934,13 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -1715,13 +1722,22 @@ export abstract class AbstractPool< const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) diff --git a/src/utility-types.ts b/src/utility-types.ts index 60ec2f54..5d2376cc 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,5 +1,6 @@ import type { EventLoopUtilization } from 'node:perf_hooks' import type { MessagePort, TransferListItem } from 'node:worker_threads' +import type { AsyncResource } from 'node:async_hooks' import type { KillBehavior } from './worker/worker-options' /** @@ -176,6 +177,10 @@ export interface PromiseResponseWrapper { * The worker node key executing the task. */ readonly workerNodeKey: number + /** + * The asynchronous resource used to track the task execution. + */ + readonly asyncResource?: AsyncResource } export type Writable = { -readonly [P in keyof T]: T[P] } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 9c1e49a4..ec4ffabc 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -2,6 +2,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' import { fileURLToPath } from 'node:url' +import { createHook, executionAsyncId } from 'node:async_hooks' import { expect } from 'expect' import { restore, stub } from 'sinon' import { @@ -1267,6 +1268,42 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool asynchronous resource track tasks execution', async () => { + let taskAsyncId + let initCalls = 0 + let beforeCalls = 0 + let afterCalls = 0 + let resolveCalls = 0 + const hook = createHook({ + init (asyncId, type) { + if (type === 'poolifier:task') { + initCalls++ + taskAsyncId = asyncId + } + }, + before (asyncId) { + if (asyncId === taskAsyncId) beforeCalls++ + }, + after (asyncId) { + if (asyncId === taskAsyncId) afterCalls++ + }, + promiseResolve () { + if (executionAsyncId() === taskAsyncId) resolveCalls++ + } + }) + hook.enable() + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + await pool.execute() + hook.disable() + expect(initCalls).toBe(1) + expect(beforeCalls).toBe(1) + expect(afterCalls).toBe(1) + expect(resolveCalls).toBe(1) + }) + it('Verify that hasTaskFunction() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), -- 2.34.1