repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
refactor: methods namepace cleanup
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 10a5ce42af8d58e183761f454069d00f6b76a593..b43d44a59eea1cb1c7990e660b96cb4cd5483a03 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-128,7
+128,13
@@
export abstract class AbstractPool<
this.setupHook()
this.setupHook()
- while (this.workerNodes.length < this.numberOfWorkers) {
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
this.createAndSetupWorker()
}
this.createAndSetupWorker()
}
@@
-172,9
+178,9
@@
export abstract class AbstractPool<
throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
- } else if (m
in === 0 && m
ax === 0) {
+ } else if (max === 0) {
throw new RangeError(
throw new RangeError(
- 'Cannot instantiate a dynamic pool with a
minimum pool size and a maximum
pool size equal to zero'
+ 'Cannot instantiate a dynamic pool with a pool size equal to zero'
)
} else if (min === max) {
throw new RangeError(
)
} else if (min === max) {
throw new RangeError(
@@
-411,16
+417,24
@@
export abstract class AbstractPool<
}
private get starting (): boolean {
}
private get starting (): boolean {
- return this.workerNodes.length < this.minSize
+ return (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minSize
+ )
}
private get ready (): boolean {
return (
}
private get ready (): boolean {
return (
- this.workerNodes.length >= this.minSize &&
- this.workerNodes.every(
- (workerNode, workerNodeKey) =>
- workerNodeKey < this.minSize && workerNode.info.ready
- )
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic && workerNode.info.ready
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ) >= this.minSize
)
}
)
}
@@
-949,7
+963,7
@@
export abstract class AbstractPool<
this.removeWorkerNode(worker)
})
this.removeWorkerNode(worker)
})
- this.
push
WorkerNode(worker)
+ this.
add
WorkerNode(worker)
this.afterWorkerSetup(worker)
this.afterWorkerSetup(worker)
@@
-979,10
+993,12
@@
export abstract class AbstractPool<
}
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
}
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
- workerInfo.ready = true
workerInfo.dynamic = true
workerInfo.dynamic = true
+ if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+ workerInfo.ready = true
+ }
this.sendToWorker(worker, {
this.sendToWorker(worker, {
- checkA
l
ive: true,
+ checkA
ct
ive: true,
workerId: workerInfo.id as number
})
return worker
workerId: workerInfo.id as number
})
return worker
@@
-1062,8
+1078,8
@@
export abstract class AbstractPool<
return message => {
this.checkMessageWorkerId(message)
if (message.ready != null) {
return message => {
this.checkMessageWorkerId(message)
if (message.ready != null) {
- // Worker ready
messag
e received
- this.handleWorkerReady
Messag
e(message)
+ // Worker ready
respons
e received
+ this.handleWorkerReady
Respons
e(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
@@
-1071,7
+1087,7
@@
export abstract class AbstractPool<
}
}
}
}
- private handleWorkerReady
Messag
e (message: MessageValue<Response>): void {
+ private handleWorkerReady
Respons
e (message: MessageValue<Response>): void {
const worker = this.getWorkerById(message.workerId)
this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
message.ready as boolean
const worker = this.getWorkerById(message.workerId)
this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
message.ready as boolean
@@
-1128,17
+1144,18
@@
export abstract class AbstractPool<
}
/**
}
/**
- *
Pushe
s the given worker in the pool worker nodes.
+ *
Add
s the given worker in the pool worker nodes.
*
* @param worker - The worker.
* @returns The worker nodes length.
*/
*
* @param worker - The worker.
* @returns The worker nodes length.
*/
- private pushWorkerNode (worker: Worker): number {
- const workerNode = new WorkerNode(worker, this.worker)
+ private addWorkerNode (worker: Worker): number {
+ const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ // Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
if (this.starting) {
workerNode.info.ready = true
}
- return this.workerNodes.push(
new WorkerNode(worker, this.worker)
)
+ return this.workerNodes.push(
workerNode
)
}
/**
}
/**
@@
-1154,6
+1171,12
@@
export abstract class AbstractPool<
}
}
}
}
+ /**
+ * Executes the given task on the given worker.
+ *
+ * @param worker - The worker.
+ * @param task - The task to execute.
+ */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)