X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=da0eb335b26f41092fb57560d255b5b0b624ad6e;hb=0743823772420af0e3ada63d8a547c7c4c22b836;hp=eb92eb548903a719169f15e658ccbfe86cefe4be;hpb=e1c2dba7ec9c30556abfc492327abdc8957a664b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb92eb54..da0eb335 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -594,11 +595,13 @@ export abstract class AbstractPool< this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -931,7 +934,13 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -983,12 +992,13 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_, workerNodeKey) => { + this.workerNodes.map(async (_workerNode, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1395,7 +1405,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1442,6 +1452,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1535,6 +1548,9 @@ export abstract class AbstractPool< eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( @@ -1626,6 +1642,9 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { @@ -1664,7 +1683,9 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) const { workerId, ready, taskId, taskFunctionNames } = message if (ready != null && taskFunctionNames != null) { @@ -1701,13 +1722,22 @@ export abstract class AbstractPool< const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string)