X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96;hb=42c677c1fd2cacd8bb79c8be0aa024ac93c7159a;hp=3da4c70fa41e0cf5168dfccbe517d2ae626f8542;hpb=273b76d87560482b4114668a705e42d54b3ce3b0;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3da4c70f..9b0ec92f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -305,8 +306,8 @@ export abstract class AbstractPool< 0 ), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _workerNode, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -706,6 +707,16 @@ export abstract class AbstractPool< ) } + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + } + private async sendTaskFunctionOperationToWorker ( workerNodeKey: number, message: MessageValue @@ -933,7 +944,13 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -1444,7 +1461,18 @@ export abstract class AbstractPool< }) } + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } + private redistributeQueuedTasks (workerNodeKey: number): void { + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1456,12 +1484,10 @@ export abstract class AbstractPool< }, 0 ) - const task = this.dequeueTask(workerNodeKey) as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + this.handleTask( + destinationWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } } @@ -1538,6 +1564,9 @@ export abstract class AbstractPool< eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( @@ -1612,11 +1641,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, @@ -1629,6 +1654,9 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { @@ -1651,11 +1679,7 @@ export abstract class AbstractPool< (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1706,13 +1730,22 @@ export abstract class AbstractPool< const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string)