repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
chore: v3.0.13
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 5da69ca1cfd8190cdf39e78c0d5867e4218b140d..da0eb335b26f41092fb57560d255b5b0b624ad6e 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-2,6
+2,7
@@
import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
import type {
MessageValue,
PromiseResponseWrapper,
import type {
MessageValue,
PromiseResponseWrapper,
@@
-594,11
+595,13
@@
export abstract class AbstractPool<
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.unsetTaskStealing()
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
@@
-630,36
+633,36
@@
export abstract class AbstractPool<
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].
addEventListener
(
+ this.workerNodes[workerNodeKey].
on
(
'idleWorkerNode',
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].
removeEventListener
(
+ this.workerNodes[workerNodeKey].
off
(
'idleWorkerNode',
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
)
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].
addEventListener
(
+ this.workerNodes[workerNodeKey].
on
(
'backPressure',
'backPressure',
- this.handleBackPressureEvent
as EventListener
+ this.handleBackPressureEvent
)
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
)
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].
removeEventListener
(
+ this.workerNodes[workerNodeKey].
off
(
'backPressure',
'backPressure',
- this.handleBackPressureEvent
as EventListener
+ this.handleBackPressureEvent
)
}
}
)
}
}
@@
-931,7
+934,13
@@
export abstract class AbstractPool<
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
this.promiseResponseMap.set(task.taskId as string, {
resolve,
reject,
- workerNodeKey
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
})
if (
this.opts.enableTasksQueue === false ||
})
if (
this.opts.enableTasksQueue === false ||
@@
-983,12
+992,13
@@
export abstract class AbstractPool<
}
this.destroying = true
await Promise.all(
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_, workerNodeKey) => {
+ this.workerNodes.map(async (_
workerNode
, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
this.readyEventEmitted = false
this.destroying = false
this.started = false
this.readyEventEmitted = false
this.destroying = false
this.started = false
@@
-1395,7
+1405,7
@@
export abstract class AbstractPool<
// Listen to worker messages.
this.registerWorkerMessageListener(
workerNodeKey,
// Listen to worker messages.
this.registerWorkerMessageListener(
workerNodeKey,
- this.workerMessageListener
.bind(this)
+ this.workerMessageListener
)
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
)
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
@@
-1403,15
+1413,15
@@
export abstract class AbstractPool<
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].
addEventListener
(
+ this.workerNodes[workerNodeKey].
on
(
'idleWorkerNode',
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].
addEventListener
(
+ this.workerNodes[workerNodeKey].
on
(
'backPressure',
'backPressure',
- this.handleBackPressureEvent
as EventListener
+ this.handleBackPressureEvent
)
}
}
)
}
}
@@
-1442,6
+1452,9
@@
export abstract class AbstractPool<
}
private redistributeQueuedTasks (workerNodeKey: number): void {
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
@@
-1532,10
+1545,13
@@
export abstract class AbstractPool<
}
private readonly handleIdleWorkerNodeEvent = (
}
private readonly handleIdleWorkerNodeEvent = (
- event
: CustomEvent<WorkerNodeEventDetail>
,
+ event
Detail: WorkerNodeEventDetail
,
previousStolenTask?: Task<Data>
): void => {
previousStolenTask?: Task<Data>
): void => {
- const { workerNodeKey } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
'WorkerNode event detail workerNodeKey attribute must be defined'
if (workerNodeKey == null) {
throw new Error(
'WorkerNode event detail workerNodeKey attribute must be defined'
@@
-1586,7
+1602,7
@@
export abstract class AbstractPool<
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(event, stolenTask)
+ this.handleIdleWorkerNodeEvent(event
Detail
, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
return undefined
})
.catch(EMPTY_FUNCTION)
@@
-1624,9
+1640,12
@@
export abstract class AbstractPool<
}
private readonly handleBackPressureEvent = (
}
private readonly handleBackPressureEvent = (
- event
: CustomEvent<WorkerNodeEventDetail>
+ event
Detail: WorkerNodeEventDetail
): void => {
): void => {
- const { workerId } = event.detail
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
@@
-1664,7
+1683,9
@@
export abstract class AbstractPool<
/**
* This method is the message listener registered on each worker.
*/
/**
* This method is the message listener registered on each worker.
*/
- protected workerMessageListener (message: MessageValue<Response>): void {
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
this.checkMessageWorkerId(message)
const { workerId, ready, taskId, taskFunctionNames } = message
if (ready != null && taskFunctionNames != null) {
this.checkMessageWorkerId(message)
const { workerId, ready, taskId, taskFunctionNames } = message
if (ready != null && taskFunctionNames != null) {
@@
-1701,13
+1722,22
@@
export abstract class AbstractPool<
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- const { resolve, reject, workerNodeKey } = promiseResponse
+ const { resolve, reject, workerNodeKey
, asyncResource
} = promiseResponse
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- reject(workerError.message)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
} else {
} else {
- resolve(data as Response)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
}
}
+ asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
@@
-1728,11
+1758,10
@@
export abstract class AbstractPool<
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
- detail: { workerId: workerId as number, workerNodeKey }
- })
- )
+ this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
}
}
}
}
}
}