type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
/**
* Base class that implements some shared logic for all poolifier pools.
/** @inheritDoc */
public get info (): PoolInfo {
return {
+ version,
type: this.type,
worker: this.worker,
minSize: this.minSize,
maxSize: this.maxSize,
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
- .aggregate && { utilization: round(this.utilization) }),
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
}
}
- /**
- * Gets the pool run time.
- *
- * @returns The pool run time in milliseconds.
- */
- private get runTime (): number {
- return performance.now() - this.startTimestamp
- }
-
/**
* Gets the approximate pool utilization.
*
* @returns The pool utilization.
*/
private get utilization (): number {
- const poolRunTimeCapacity = this.runTime * this.maxSize
+ const poolRunTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.runTime.aggregate,
this.workerNodes.map(async (workerNode, workerNodeKey) => {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
+ const workerExitPromise = new Promise<void>(resolve => {
+ workerNode.worker.on('exit', () => {
+ resolve()
+ })
+ })
await this.destroyWorker(workerNode.worker)
+ await workerExitPromise
})
)
}
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.aggregate
) {
- workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
+ const taskRunTime = message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregate += taskRunTime
+ workerUsage.runTime.minimum = Math.min(
+ taskRunTime,
+ workerUsage.runTime.minimum
+ )
+ workerUsage.runTime.maximum = Math.max(
+ taskRunTime,
+ workerUsage.runTime.maximum
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.average &&
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
.aggregate
) {
- workerUsage.waitTime.aggregate += taskWaitTime ?? 0
+ workerUsage.waitTime.aggregate += taskWaitTime
+ workerUsage.waitTime.minimum = Math.min(
+ taskWaitTime,
+ workerUsage.waitTime.minimum
+ )
+ workerUsage.waitTime.maximum = Math.max(
+ taskWaitTime,
+ workerUsage.waitTime.maximum
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average &&
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
.aggregate
) {
- if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+ if (message.taskPerformance?.elu != null) {
workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else if (message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ workerUsage.elu.idle.minimum = Math.min(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle.minimum
+ )
+ workerUsage.elu.idle.maximum = Math.max(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle.maximum
+ )
+ workerUsage.elu.active.minimum = Math.min(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active.minimum
+ )
+ workerUsage.elu.active.maximum = Math.max(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active.maximum
+ )
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
+ if (this.opts.enableTasksQueue === true) {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
if (this.opts.restartWorkerOnError === true) {
this.createAndSetupWorker()
}
}
private flushTasksQueue (workerNodeKey: number): void {
- if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
this.workerNodes[workerNodeKey].tasksQueue.clear()
}
},
runTime: {
aggregate: 0,
+ maximum: 0,
+ minimum: 0,
average: 0,
median: 0,
history: new CircularArray()
},
waitTime: {
aggregate: 0,
+ maximum: 0,
+ minimum: 0,
average: 0,
median: 0,
history: new CircularArray()
elu: {
idle: {
aggregate: 0,
+ maximum: 0,
+ minimum: 0,
average: 0,
median: 0,
history: new CircularArray()
},
active: {
aggregate: 0,
+ maximum: 0,
+ minimum: 0,
average: 0,
median: 0,
history: new CircularArray()
- },
- utilization: 0
+ }
}
}
}