import { existsSync } from 'node:fs'
import cluster from 'node:cluster'
import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { env } from 'node:process'
import { average, isPlainObject, max, median, min } from '../utils'
+import type { MessageValue, Task } from '../utility-types'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
import type { TasksQueueOptions } from './pool'
import {
type IWorker,
+ type IWorkerNode,
type MeasurementStatistics,
type WorkerNodeOptions,
type WorkerType,
- WorkerTypes
+ WorkerTypes,
+ type WorkerUsage
} from './worker'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+
+export const getDefaultTasksQueueOptions = (
+ poolMaxSize: number
+): Required<TasksQueueOptions> => {
+ return {
+ size: Math.pow(poolMaxSize, 2),
+ concurrency: 1,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
+ }
+}
export const checkFilePath = (filePath: string): void => {
if (filePath == null) {
* @param measurementStatistics - The measurement statistics to update.
* @param measurementRequirements - The measurement statistics requirements.
* @param measurementValue - The measurement value.
- * @param numberOfMeasurements - The number of measurements.
* @internal
*/
-export const updateMeasurementStatistics = (
+const updateMeasurementStatistics = (
measurementStatistics: MeasurementStatistics,
measurementRequirements: MeasurementStatisticsRequirements,
measurementValue: number
}
}
}
+if (env.NODE_ENV === 'test') {
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+ exports.updateMeasurementStatistics = updateMeasurementStatistics
+}
+
+export const updateWaitTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ task: Task<Data>
+ ): void => {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ updateMeasurementStatistics(
+ workerUsage.waitTime,
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+ taskWaitTime
+ )
+}
+
+export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+): void => {
+ const workerTaskStatistics = workerUsage.tasks
+ if (
+ workerTaskStatistics.executing != null &&
+ workerTaskStatistics.executing > 0
+ ) {
+ --workerTaskStatistics.executing
+ }
+ if (message.workerError == null) {
+ ++workerTaskStatistics.executed
+ } else {
+ ++workerTaskStatistics.failed
+ }
+}
+
+export const updateRunTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ updateMeasurementStatistics(
+ workerUsage.runTime,
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+ message.taskPerformance?.runTime ?? 0
+ )
+}
+
+export const updateEluWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ updateMeasurementStatistics(
+ workerUsage.elu.active,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.active ?? 0
+ )
+ updateMeasurementStatistics(
+ workerUsage.elu.idle,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.idle ?? 0
+ )
+ if (eluTaskStatisticsRequirements.aggregate) {
+ if (message.taskPerformance?.elu != null) {
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ }
+ }
+}
export const createWorker = <Worker extends IWorker>(
type: WorkerType,
throw new Error(`Unknown worker type '${type}'`)
}
}
+
+export const waitWorkerNodeEvents = async <
+ Worker extends IWorker,
+ Data = unknown
+>(
+ workerNode: IWorkerNode<Worker, Data>,
+ workerNodeEvent: string,
+ numberOfEventsToWait: number,
+ timeout: number
+): Promise<number> => {
+ return await new Promise<number>(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ return
+ }
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ if (timeout >= 0) {
+ setTimeout(() => {
+ resolve(events)
+ }, timeout)
+ }
+ })
+}