repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' into combined-prs-branch
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 7ea9379665050a982259a4181c934a7eecc81adb..9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-2,6
+2,7
@@
import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
import type {
MessageValue,
PromiseResponseWrapper,
import type {
MessageValue,
PromiseResponseWrapper,
@@
-305,8
+306,8
@@
export abstract class AbstractPool<
0
),
busyWorkerNodes: this.workerNodes.reduce(
0
),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator,
workerNode
) =>
-
workerNode.usage.tasks.executing > 0
? accumulator + 1 : accumulator,
+ (accumulator,
_workerNode, workerNodeKey
) =>
+
this.isWorkerNodeBusy(workerNodeKey)
? accumulator + 1 : accumulator,
0
),
executedTasks: this.workerNodes.reduce(
0
),
executedTasks: this.workerNodes.reduce(
@@
-706,6
+707,16
@@
export abstract class AbstractPool<
)
}
)
}
+ private isWorkerNodeBusy (workerNodeKey: number): boolean {
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ )
+ }
+ return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ }
+
private async sendTaskFunctionOperationToWorker (
workerNodeKey: number,
message: MessageValue<Data>
private async sendTaskFunctionOperationToWorker (
workerNodeKey: number,
message: MessageValue<Data>
@@
-933,7
+944,13
@@
export abstract class AbstractPool<
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
- workerNodeKey
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
})
if (
this.opts.enableTasksQueue === false ||
})
if (
this.opts.enableTasksQueue === false ||
@@
-1444,6
+1461,14
@@
export abstract class AbstractPool<
})
}
})
}
+ private handleTask (workerNodeKey: number, task: Task<Data>): void {
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ }
+
private redistributeQueuedTasks (workerNodeKey: number): void {
if (this.workerNodes.length <= 1) {
return
private redistributeQueuedTasks (workerNodeKey: number): void {
if (this.workerNodes.length <= 1) {
return
@@
-1459,12
+1484,10
@@
export abstract class AbstractPool<
},
0
)
},
0
)
- const task = this.dequeueTask(workerNodeKey) as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
+ this.handleTask(
+ destinationWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
}
}
}
@@
-1618,11
+1641,7
@@
export abstract class AbstractPool<
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
@@
-1660,11
+1679,7
@@
export abstract class AbstractPool<
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = sourceWorkerNode.popTask() as Task<Data>
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
@@
-1715,13
+1730,22
@@
export abstract class AbstractPool<
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- const { resolve, reject, workerNodeKey } = promiseResponse
+ const { resolve, reject, workerNodeKey
, asyncResource
} = promiseResponse
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- reject(workerError.message)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
} else {
} else {
- resolve(data as Response)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
}
}
+ asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)