repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
build(deps-dev): apply updates
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index f14c28e9925117ae6d73a51cb09b0fe811cad3f9..bc77874ff423bb3ee112f6150db81962ae5d6e69 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-283,10
+283,11
@@
export abstract class AbstractPool<
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
strategy: this.opts.workerChoiceStrategy!,
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
strategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
-
?
.runTime.aggregate === true &&
+ .runTime.aggregate === true &&
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
utilization: round(this.utilization)
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
utilization: round(this.utilization)
@@
-351,7
+352,7
@@
export abstract class AbstractPool<
0
),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
0
),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
-
?
.runTime.aggregate === true && {
+ .runTime.aggregate === true && {
runTime: {
minimum: round(
min(
runTime: {
minimum: round(
min(
@@
-394,7
+395,7
@@
export abstract class AbstractPool<
}
}),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
}
}),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
-
?
.waitTime.aggregate === true && {
+ .waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
waitTime: {
minimum: round(
min(
@@
-443,6
+444,9
@@
export abstract class AbstractPool<
* The pool readiness boolean status.
*/
private get ready (): boolean {
* The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
@@
-454,6
+458,13
@@
export abstract class AbstractPool<
)
}
)
}
+ /**
+ * The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ }
+
/**
* The approximate pool utilization.
*
/**
* The approximate pool utilization.
*
@@
-1273,7
+1284,7
@@
export abstract class AbstractPool<
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerUsage = this.workerNodes[localWorkerNodeKey]
?
.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
@@
-1407,9
+1418,9
@@
export abstract class AbstractPool<
statistics: {
runTime:
this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
statistics: {
runTime:
this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
-
?
.runTime.aggregate ?? false,
+ .runTime.aggregate ?? false,
elu:
elu:
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
?
.elu
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
.aggregate ?? false
}
})
.aggregate ?? false
}
})
@@
-1711,6
+1722,13
@@
export abstract class AbstractPool<
}
}
}
}
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
+ }
+ }
+
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
@@
-1720,10
+1738,7
@@
export abstract class AbstractPool<
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
- this.readyEventEmitted = true
- }
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
@@
-1753,7
+1768,12
@@
export abstract class AbstractPool<
this.promiseResponseMap.delete(taskId!)
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
this.promiseResponseMap.delete(taskId!)
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
- if (this.opts.enableTasksQueue === true && !this.destroying) {
+ if (
+ this.opts.enableTasksQueue === true &&
+ !this.destroying &&
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode != null
+ ) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
@@
-1847,6
+1867,13
@@
export abstract class AbstractPool<
return workerNodeKey
}
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
*
/**
* Removes the worker node from the pool worker nodes.
*
@@
-1858,6
+1885,7
@@
export abstract class AbstractPool<
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {