repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix: fix worker node removal handling in worker choice strategies
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 65ace6d907a20ca5a88d820d638c1aaa1a6f8bf1..ab114e1a3fbdf6a5ae1575d7ba6dbf43615ef0b9 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-443,6
+443,9
@@
export abstract class AbstractPool<
* The pool readiness boolean status.
*/
private get ready (): boolean {
* The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
@@
-454,6
+457,16
@@
export abstract class AbstractPool<
)
}
)
}
+ /**
+ * The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ if (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) {
+ return true
+ }
+ return false
+ }
+
/**
* The approximate pool utilization.
*
/**
* The approximate pool utilization.
*
@@
-1244,7
+1257,8
@@
export abstract class AbstractPool<
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
this.emitter?.emit(PoolEvents.error, error)
})
})
@@
-1272,7
+1286,7
@@
export abstract class AbstractPool<
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerUsage = this.workerNodes[localWorkerNodeKey]
?
.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
@@
-1602,7
+1616,9
@@
export abstract class AbstractPool<
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(EMPTY_FUNCTION)
+ .catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
}
private readonly workerNodeStealTask = (
}
private readonly workerNodeStealTask = (
@@
-1708,6
+1724,13
@@
export abstract class AbstractPool<
}
}
}
}
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
+ }
+ }
+
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
@@
-1717,10
+1740,7
@@
export abstract class AbstractPool<
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
- this.readyEventEmitted = true
- }
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
@@
-1767,8
+1787,7
@@
export abstract class AbstractPool<
workerNodeTasksUsage.sequentiallyStolen === 0
) {
workerNode.emit('idle', {
workerNodeTasksUsage.sequentiallyStolen === 0
) {
workerNode.emit('idle', {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerId: workerId!,
+ workerId,
workerNodeKey
})
}
workerNodeKey
})
}
@@
-1845,6
+1864,13
@@
export abstract class AbstractPool<
return workerNodeKey
}
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
*
/**
* Removes the worker node from the pool worker nodes.
*
@@
-1856,6
+1882,7
@@
export abstract class AbstractPool<
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {