repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix: fix build after merge with main
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index ac7da967eccda63f3cebc2e27d16d6f991b82226..e4947cdc6b6caeca4c77d485d8b15d83c3591272 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-21,7
+21,7
@@
import {
type TasksQueueOptions,
type WorkerType
} from './pool'
type TasksQueueOptions,
type WorkerType
} from './pool'
-import type { IWorker, Task,
TasksUsage, WorkerNod
e } from './worker'
+import type { IWorker, Task,
WorkerNode, WorkerUsag
e } from './worker'
import {
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
import {
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
@@
-222,17
+222,26
@@
export abstract class AbstractPool<
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing === 0
+ ? accumulator + 1
+ : accumulator,
0
),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
0
),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing > 0
+ ? accumulator + 1
+ : accumulator,
0
),
0
),
-
running
Tasks: this.workerNodes.reduce(
+
executed
Tasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
(accumulator, workerNode) =>
- accumulator + workerNode.tasksUsage.running,
+ accumulator + workerNode.workerUsage.tasks.executed,
+ 0
+ ),
+ executingTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.executing,
0
),
queuedTasks: this.workerNodes.reduce(
0
),
queuedTasks: this.workerNodes.reduce(
@@
-243,6
+252,11
@@
export abstract class AbstractPool<
(accumulator, workerNode) =>
accumulator + workerNode.tasksQueue.maxSize,
0
(accumulator, workerNode) =>
accumulator + workerNode.tasksQueue.maxSize,
0
+ ),
+ failedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.failed,
+ 0
)
}
}
)
}
}
@@
-296,17
+310,27
@@
export abstract class AbstractPool<
}
for (const workerNode of this.workerNodes) {
this.setWorkerNodeTasksUsage(workerNode, {
}
for (const workerNode of this.workerNodes) {
this.setWorkerNodeTasksUsage(workerNode, {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued:
+ this.opts.enableTasksQueue === true
+ ? workerNode.tasksQueue.size
+ : 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
elu: undefined
})
this.setWorkerStatistics(workerNode.worker)
elu: undefined
})
this.setWorkerStatistics(workerNode.worker)
@@
-374,7
+398,7
@@
export abstract class AbstractPool<
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.
tasksUsage.runn
ing === 0
+ return workerNode.
workerUsage.tasks.execut
ing === 0
}) === -1
)
}
}) === -1
)
}
@@
-400,7
+424,7
@@
export abstract class AbstractPool<
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
- this.workerNodes[workerNodeKey].
tasksUsage.runn
ing >=
+ this.workerNodes[workerNodeKey].
workerUsage.tasks.execut
ing >=
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
@@
-454,7
+478,11
@@
export abstract class AbstractPool<
* @param workerNodeKey - The worker node key.
*/
protected beforeTaskExecutionHook (workerNodeKey: number): void {
* @param workerNodeKey - The worker node key.
*/
protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].tasksUsage.running
+ ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ if (this.opts.enableTasksQueue === true) {
+ this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
+ this.tasksQueueSize(workerNodeKey)
+ }
}
/**
}
/**
@@
-468,66
+496,68
@@
export abstract class AbstractPool<
worker: Worker,
message: MessageValue<Response>
): void {
worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage =
- this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
- --workerTasksUsage.running
- ++workerTasksUsage.ran
+ const workerUsage =
+ this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ const workerTaskStatistics = workerUsage.tasks
+ --workerTaskStatistics.executing
+ ++workerTaskStatistics.executed
if (message.taskError != null) {
if (message.taskError != null) {
- ++workerTask
sUsage.error
+ ++workerTask
Statistics.failed
}
}
- this.updateRunTimeTasksUsage(workerTasksUsage, message)
- this.updateWaitTimeTasksUsage(workerTasksUsage, message)
- this.updateEluTasksUsage(workerTasksUsage, message)
+
+ this.updateRunTimeWorkerUsage(workerUsage, message)
+ this.updateWaitTimeWorkerUsage(workerUsage, message)
+ this.updateEluWorkerUsage(workerUsage, message)
}
}
- private updateRunTime
Tasks
Usage (
- worker
TasksUsage: Tasks
Usage,
+ private updateRunTime
Worker
Usage (
+ worker
Usage: Worker
Usage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
- worker
TasksUsage.runTime
+= message.taskPerformance?.runTime ?? 0
+ worker
Usage.runTime.aggregation
+= message.taskPerformance?.runTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
- worker
TasksUsage.ran
!== 0
+ worker
Usage.tasks.executed
!== 0
) {
) {
- worker
TasksUsage.avgRunTim
e =
- worker
TasksUsage.runTime / workerTasksUsage.ran
+ worker
Usage.runTime.averag
e =
+ worker
Usage.runTime.aggregation / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
message.taskPerformance?.runTime != null
) {
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
message.taskPerformance?.runTime != null
) {
- worker
TasksUsage.runTimeH
istory.push(message.taskPerformance.runTime)
- worker
TasksUsage.medRunTime = median(workerTasksUsage.runTimeH
istory)
+ worker
Usage.runTime.h
istory.push(message.taskPerformance.runTime)
+ worker
Usage.runTime.median = median(workerUsage.runTime.h
istory)
}
}
}
}
}
}
- private updateWaitTime
Tasks
Usage (
- worker
TasksUsage: Tasks
Usage,
+ private updateWaitTime
Worker
Usage (
+ worker
Usage: Worker
Usage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
- worker
TasksUsage.waitTime
+= message.taskPerformance?.waitTime ?? 0
+ worker
Usage.waitTime.aggregation
+= message.taskPerformance?.waitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
- worker
TasksUsage.ran
!== 0
+ worker
Usage.tasks.executed
!== 0
) {
) {
- worker
TasksUsage.avgWaitTim
e =
- worker
TasksUsage.waitTime / workerTasksUsage.ran
+ worker
Usage.waitTime.averag
e =
+ worker
Usage.waitTime.aggregation / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
message.taskPerformance?.waitTime != null
) {
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
message.taskPerformance?.waitTime != null
) {
- worker
TasksUsage.waitTimeH
istory.push(message.taskPerformance.waitTime)
- worker
TasksUsage.medWaitTime = median(workerTasksUsage.waitTimeH
istory)
+ worker
Usage.waitTime.h
istory.push(message.taskPerformance.waitTime)
+ worker
Usage.waitTime.median = median(workerUsage.waitTime.h
istory)
}
}
}
}
}
}
- private updateElu
Tasks
Usage (
- workerTasksUsage:
Tasks
Usage,
+ private updateElu
Worker
Usage (
+ workerTasksUsage:
Worker
Usage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
@@
-566,7
+596,8
@@
export abstract class AbstractPool<
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
+ this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+ .executing === 0)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
this.flushTasksQueue(currentWorkerNodeKey)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
this.flushTasksQueue(currentWorkerNodeKey)
@@
-702,13
+733,13
@@
export abstract class AbstractPool<
* Sets the given worker node its tasks usage in the pool.
*
* @param workerNode - The worker node.
* Sets the given worker node its tasks usage in the pool.
*
* @param workerNode - The worker node.
- * @param
tasksUsage - The worker node tasks
usage.
+ * @param
workerUsage - The worker
usage.
*/
private setWorkerNodeTasksUsage (
workerNode: WorkerNode<Worker, Data>,
*/
private setWorkerNodeTasksUsage (
workerNode: WorkerNode<Worker, Data>,
-
tasksUsage: Tasks
Usage
+
workerUsage: Worker
Usage
): void {
): void {
- workerNode.
tasksUsage = tasks
Usage
+ workerNode.
workerUsage = worker
Usage
}
/**
}
/**
@@
-720,18
+751,26
@@
export abstract class AbstractPool<
private pushWorkerNode (worker: Worker): number {
return this.workerNodes.push({
worker,
private pushWorkerNode (worker: Worker): number {
return this.workerNodes.push({
worker,
- tasksUsage: {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ workerUsage: {
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
elu: undefined
},
tasksQueue: new Queue<Task<Data>>()
elu: undefined
},
tasksQueue: new Queue<Task<Data>>()
@@
-743,18
+782,18
@@
export abstract class AbstractPool<
*
* @param workerNodeKey - The worker node key.
* @param worker - The worker.
*
* @param workerNodeKey - The worker node key.
* @param worker - The worker.
- * @param
tasksUsage - The worker tasks
usage.
+ * @param
workerUsage - The worker
usage.
* @param tasksQueue - The worker task queue.
*/
private setWorkerNode (
workerNodeKey: number,
worker: Worker,
* @param tasksQueue - The worker task queue.
*/
private setWorkerNode (
workerNodeKey: number,
worker: Worker,
-
tasksUsage: Tasks
Usage,
+
workerUsage: Worker
Usage,
tasksQueue: Queue<Task<Data>>
): void {
this.workerNodes[workerNodeKey] = {
worker,
tasksQueue: Queue<Task<Data>>
): void {
this.workerNodes[workerNodeKey] = {
worker,
-
tasks
Usage,
+
worker
Usage,
tasksQueue
}
}
tasksQueue
}
}