repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
chore: v4.0.13
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index d71b724446ff150d983329233668e19c2e63f6b5..8f57a9067e0967bf87dd303efabfe694b00bdbc6 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-4,6
+4,7
@@
import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
import type {
MessageValue,
PromiseResponseWrapper,
@@
-376,14
+377,16
@@
export abstract class AbstractPool<
minimum: round(
min(
...this.workerNodes.map(
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
)
)
),
@@
-393,7
+396,9
@@
export abstract class AbstractPool<
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-405,7
+410,9
@@
export abstract class AbstractPool<
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-419,14
+426,16
@@
export abstract class AbstractPool<
minimum: round(
min(
...this.workerNodes.map(
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
)
)
),
@@
-436,7
+445,9
@@
export abstract class AbstractPool<
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-448,7
+459,9
@@
export abstract class AbstractPool<
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-463,14
+476,18
@@
export abstract class AbstractPool<
minimum: round(
min(
...this.workerNodes.map(
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
)
)
),
@@
-480,7
+497,9
@@
export abstract class AbstractPool<
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-492,7
+511,9
@@
export abstract class AbstractPool<
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.idle.history),
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-503,14
+524,18
@@
export abstract class AbstractPool<
minimum: round(
min(
...this.workerNodes.map(
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
)
)
),
)
)
),
@@
-520,7
+545,9
@@
export abstract class AbstractPool<
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
[]
)
)
@@
-532,12
+559,30
@@
export abstract class AbstractPool<
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.elu.active.history),
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
[]
)
)
)
})
[]
)
)
)
})
+ },
+ utilization: {
+ average: round(
+ average(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ )
}
}
})
}
}
})
@@
-1054,7
+1099,7
@@
export abstract class AbstractPool<
*
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
*
* @param workerNodeKey - The worker node key.
* @param name - The task function name.
- * @returns The
task function worker choice priority if the task function worker choice
priority is defined, `undefined` otherwise.
+ * @returns The
worker node task function priority if the worker node task function
priority is defined, `undefined` otherwise.
*/
private readonly getWorkerNodeTaskFunctionPriority = (
workerNodeKey: number,
*/
private readonly getWorkerNodeTaskFunctionPriority = (
workerNodeKey: number,
@@
-1429,7
+1474,7
@@
export abstract class AbstractPool<
* Chooses a worker node for the next task.
*
* @param name - The task function name.
* Chooses a worker node for the next task.
*
* @param name - The task function name.
- * @returns The chosen worker node key
+ * @returns The chosen worker node key
.
*/
private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
*/
private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
@@
-1479,7
+1524,8
@@
export abstract class AbstractPool<
) {
workerNode.usage.runTime.aggregate = min(
...this.workerNodes.map(
) {
workerNode.usage.runTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
)
)
}
@@
-1489,7
+1535,8
@@
export abstract class AbstractPool<
) {
workerNode.usage.waitTime.aggregate = min(
...this.workerNodes.map(
) {
workerNode.usage.waitTime.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
)
)
}
@@
-1499,7
+1546,8
@@
export abstract class AbstractPool<
) {
workerNode.usage.elu.active.aggregate = min(
...this.workerNodes.map(
) {
workerNode.usage.elu.active.aggregate = min(
...this.workerNodes.map(
- workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
)
)
}
)
)
}
@@
-1581,6
+1629,7
@@
export abstract class AbstractPool<
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
@@
-1589,6
+1638,8
@@
export abstract class AbstractPool<
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
@@
-1974,6
+2025,12
@@
export abstract class AbstractPool<
}
}
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
*/
/**
* This method is the message listener registered on each worker.
*/
@@
-1992,6
+2049,7
@@
export abstract class AbstractPool<
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
}
} else if (taskId != null) {
// Task execution response received from worker
@@
-2016,6
+2074,7
@@
export abstract class AbstractPool<
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
this.checkAndEmitReadyEvent()
}
this.checkAndEmitReadyEvent()
}
@@
-2103,6
+2162,12
@@
export abstract class AbstractPool<
return this.workerNodes[workerNodeKey]?.info
}
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
*
/**
* Creates a worker node.
*
@@
-2120,8
+2185,8
@@
export abstract class AbstractPool<
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
- tasksQueueBucketSize:
- (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+ tasksQueueBucketSize:
defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority()
}
)
// Flag the worker node as ready at pool startup.
}
)
// Flag the worker node as ready at pool startup.