+import cluster, { Worker as ClusterWorker } from 'node:cluster'
import { existsSync } from 'node:fs'
-import cluster from 'node:cluster'
-import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { cpus } from 'node:os'
import { env } from 'node:process'
-import { average, isPlainObject, max, median, min } from '../utils'
-import type { MessageValue, Task } from '../utility-types'
+import {
+ SHARE_ENV,
+ Worker as ThreadWorker,
+ type WorkerOptions
+} from 'node:worker_threads'
+
+import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { IPool, TasksQueueOptions } from './pool.js'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
- type WorkerChoiceStrategy
-} from './selection-strategies/selection-strategies-types'
-import type { TasksQueueOptions } from './pool'
+ type WorkerChoiceStrategy,
+ type WorkerChoiceStrategyOptions
+} from './selection-strategies/selection-strategies-types.js'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
import {
type IWorker,
type IWorkerNode,
type WorkerType,
WorkerTypes,
type WorkerUsage
-} from './worker'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+} from './worker.js'
+
+/**
+ * Default measurement statistics requirements.
+ */
+export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
+ {
+ aggregate: false,
+ average: false,
+ median: false
+ }
export const getDefaultTasksQueueOptions = (
poolMaxSize: number
}
}
-export const checkFilePath = (filePath: string): void => {
+export const getWorkerChoiceStrategyRetries = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): number => {
+ return (
+ pool.info.maxSize +
+ Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
+ )
+}
+
+export const buildWorkerChoiceStrategyOptions = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): WorkerChoiceStrategyOptions => {
+ opts = clone(opts ?? {})
+ opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
+ return {
+ ...{
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ },
+ ...opts
+ }
+}
+
+const clone = <T>(object: T): T => {
+ return structuredClone<T>(object)
+}
+
+const getDefaultWeights = (
+ poolMaxSize: number,
+ defaultWorkerWeight?: number
+): Record<number, number> => {
+ defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
+ const weights: Record<number, number> = {}
+ for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
+ weights[workerNodeKey] = defaultWorkerWeight
+ }
+ return weights
+}
+
+const estimatedCpuSpeed = (): number => {
+ const runs = 150000000
+ const begin = performance.now()
+ // eslint-disable-next-line no-empty
+ for (let i = runs; i > 0; i--) {}
+ const end = performance.now()
+ const duration = end - begin
+ return Math.trunc(runs / duration / 1000) // in MHz
+}
+
+const getDefaultWorkerWeight = (): number => {
+ const currentCpus = cpus()
+ let estCpuSpeed: number | undefined
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
+ estCpuSpeed = estimatedCpuSpeed()
+ }
+ let cpusCycleTimeWeight = 0
+ for (const cpu of currentCpus) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (cpu.speed == null || cpu.speed === 0) {
+ cpu.speed =
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
+ estCpuSpeed ??
+ 2000
+ }
+ // CPU estimated cycle time
+ const numberOfDigits = cpu.speed.toString().length - 1
+ const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+ cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+ }
+ return Math.round(cpusCycleTimeWeight / currentCpus.length)
+}
+
+export const checkFilePath = (filePath: string | undefined): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
}
}
}
-export const checkDynamicPoolSize = (min: number, max: number): void => {
+export const checkDynamicPoolSize = (
+ min: number,
+ max: number | undefined
+): void => {
if (max == null) {
throw new TypeError(
'Cannot instantiate a dynamic pool without specifying the maximum pool size'
}
export const checkValidWorkerChoiceStrategy = (
- workerChoiceStrategy: WorkerChoiceStrategy
+ workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
if (
workerChoiceStrategy != null &&
}
export const checkValidTasksQueueOptions = (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): void => {
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
export const checkWorkerNodeArguments = (
- type: WorkerType,
- filePath: string,
- opts: WorkerNodeOptions
+ type: WorkerType | undefined,
+ filePath: string | undefined,
+ opts: WorkerNodeOptions | undefined
): void => {
if (type == null) {
throw new TypeError('Cannot construct a worker node without a worker type')
* @param measurementStatistics - The measurement statistics to update.
* @param measurementRequirements - The measurement statistics requirements.
* @param measurementValue - The measurement value.
- * @param numberOfMeasurements - The number of measurements.
* @internal
*/
const updateMeasurementStatistics = (
measurementStatistics: MeasurementStatistics,
- measurementRequirements: MeasurementStatisticsRequirements,
- measurementValue: number
+ measurementRequirements: MeasurementStatisticsRequirements | undefined,
+ measurementValue: number | undefined
): void => {
- if (measurementRequirements.aggregate) {
+ if (
+ measurementRequirements != null &&
+ measurementValue != null &&
+ measurementRequirements.aggregate
+ ) {
measurementStatistics.aggregate =
(measurementStatistics.aggregate ?? 0) + measurementValue
measurementStatistics.minimum = min(
measurementValue,
measurementStatistics.maximum ?? -Infinity
)
- if (
- (measurementRequirements.average || measurementRequirements.median) &&
- measurementValue != null
- ) {
+ if (measurementRequirements.average || measurementRequirements.median) {
measurementStatistics.history.push(measurementValue)
if (measurementRequirements.average) {
measurementStatistics.average = average(measurementStatistics.history)
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
task: Task<Data>
): void => {
const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
updateMeasurementStatistics(
workerUsage.waitTime,
- workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
taskWaitTime
)
}
): void => {
const workerTaskStatistics = workerUsage.tasks
if (
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerTaskStatistics.executing != null &&
workerTaskStatistics.executing > 0
) {
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void => {
}
updateMeasurementStatistics(
workerUsage.runTime,
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
message.taskPerformance?.runTime ?? 0
)
}
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void => {
if (message.workerError != null) {
return
}
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ const eluTaskStatisticsRequirements =
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
updateMeasurementStatistics(
workerUsage.elu.active,
eluTaskStatisticsRequirements,
eluTaskStatisticsRequirements,
message.taskPerformance?.elu?.idle ?? 0
)
- if (eluTaskStatisticsRequirements.aggregate) {
+ if (eluTaskStatisticsRequirements?.aggregate === true) {
if (message.taskPerformance?.elu != null) {
if (workerUsage.elu.utilization != null) {
workerUsage.elu.utilization =
): Worker => {
switch (type) {
case WorkerTypes.thread:
- return new Worker(filePath, {
+ return new ThreadWorker(filePath, {
env: SHARE_ENV,
- ...opts?.workerOptions
+ ...opts.workerOptions
}) as unknown as Worker
case WorkerTypes.cluster:
- return cluster.fork(opts?.env) as unknown as Worker
+ return cluster.fork(opts.env) as unknown as Worker
default:
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new Error(`Unknown worker type '${type}'`)
}
}
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return WorkerTypes.thread
+ } else if (worker instanceof ClusterWorker) {
+ return WorkerTypes.cluster
+ }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return worker.threadId
+ } else if (worker instanceof ClusterWorker) {
+ return worker.id
+ }
+}
+
export const waitWorkerNodeEvents = async <
Worker extends IWorker,
Data = unknown
resolve(events)
}
})
- if (timeout > 0) {
+ if (timeout >= 0) {
setTimeout(() => {
resolve(events)
}, timeout)