repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix: refine pool statuses handling
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 9045b063675a7071d77097ae51d3aa4d7bb9cfcb..b72d8cbcb4c3354294dda16718a6d229e64d04e0 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-298,6
+298,13
@@
export abstract class AbstractPool<
: accumulator,
0
),
: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@
-1178,9
+1185,7
@@
export abstract class AbstractPool<
*
* @returns Whether to create a dynamic worker or not.
*/
*
* @returns Whether to create a dynamic worker or not.
*/
- private shallCreateDynamicWorker (): boolean {
- return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
- }
+ protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
/**
* Sends a message to worker given its worker node key.
@@
-1219,7
+1224,6
@@
export abstract class AbstractPool<
this.emitter?.emit(PoolEvents.error, error)
if (
this.started &&
this.emitter?.emit(PoolEvents.error, error)
if (
this.started &&
- !this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
@@
-1229,7
+1233,11
@@
export abstract class AbstractPool<
this.createAndSetupWorkerNode()
}
}
this.createAndSetupWorkerNode()
}
}
- if (this.started && this.opts.enableTasksQueue === true) {
+ if (
+ this.started &&
+ !this.destroying &&
+ this.opts.enableTasksQueue === true
+ ) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode?.terminate().catch(error => {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode?.terminate().catch(error => {
@@
-1399,6
+1407,10
@@
export abstract class AbstractPool<
})
}
})
}
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
@@
-1411,7
+1423,7
@@
export abstract class AbstractPool<
if (workerNodeKey === -1) {
return
}
if (workerNodeKey === -1) {
return
}
- if (this.
workerNodes.length <= 1
) {
+ if (this.
cannotStealTask()
) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
@@
-1505,15
+1517,22
@@
export abstract class AbstractPool<
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey
attribute
must be defined'
+ 'WorkerNode event detail workerNodeKey
property
must be defined'
)
}
)
}
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
@@
-1521,6
+1540,7
@@
export abstract class AbstractPool<
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
@@
-1531,6
+1551,7
@@
export abstract class AbstractPool<
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ this.getWorkerInfo(workerNodeKey).stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@
-1577,6
+1598,7
@@
export abstract class AbstractPool<
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
@@
-1595,7
+1617,11
@@
export abstract class AbstractPool<
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
const { workerId } = eventDetail
return
}
const { workerId } = eventDetail
@@
-1615,16
+1641,19
@@
export abstract class AbstractPool<
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
+ this.getWorkerInfo(workerNodeKey).stealing = true
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
+ this.getWorkerInfo(workerNodeKey).stealing = false
}
}
}
}
}
}
@@
-1729,13
+1758,10
@@
export abstract class AbstractPool<
}
}
}
}
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
* Gets the worker information given its worker node key.
/**
* Gets the worker information given its worker node key.