+ if (opts.tasksQueueBucketSize <= 0) {
+ throw new RangeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+ )
+ }
+ if (opts.tasksQueuePriority == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue priority option'
+ )
+ }
+ if (typeof opts.tasksQueuePriority !== 'boolean') {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+ )
+ }
+}
+
+/**
+ * Updates the given measurement statistics.
+ * @param measurementStatistics - The measurement statistics to update.
+ * @param measurementRequirements - The measurement statistics requirements.
+ * @param measurementValue - The measurement value.
+ * @internal
+ */
+const updateMeasurementStatistics = (
+ measurementStatistics: MeasurementStatistics,
+ measurementRequirements: MeasurementStatisticsRequirements | undefined,
+ measurementValue: number | undefined
+): void => {
+ if (
+ measurementRequirements != null &&
+ measurementValue != null &&
+ measurementRequirements.aggregate
+ ) {
+ measurementStatistics.aggregate =
+ (measurementStatistics.aggregate ?? 0) + measurementValue
+ measurementStatistics.minimum = min(
+ measurementValue,
+ measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
+ )
+ measurementStatistics.maximum = max(
+ measurementValue,
+ measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
+ )
+ if (measurementRequirements.average || measurementRequirements.median) {
+ measurementStatistics.history.put(measurementValue)
+ if (measurementRequirements.average) {
+ measurementStatistics.average = average(
+ measurementStatistics.history.toArray()
+ )
+ } else if (measurementStatistics.average != null) {
+ delete measurementStatistics.average
+ }
+ if (measurementRequirements.median) {
+ measurementStatistics.median = median(
+ measurementStatistics.history.toArray()
+ )
+ } else if (measurementStatistics.median != null) {
+ delete measurementStatistics.median
+ }
+ }
+ }
+}
+if (env.NODE_ENV === 'test') {
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+ exports.updateMeasurementStatistics = updateMeasurementStatistics
+}
+
+export const updateWaitTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
+ | undefined,
+ workerUsage: WorkerUsage,
+ task: Task<Data>
+ ): void => {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ updateMeasurementStatistics(
+ workerUsage.waitTime,
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
+ taskWaitTime
+ )
+}
+
+export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+): void => {
+ const workerTaskStatistics = workerUsage.tasks
+ if (
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerTaskStatistics.executing != null &&
+ workerTaskStatistics.executing > 0
+ ) {
+ --workerTaskStatistics.executing
+ }
+ if (message.workerError == null) {
+ ++workerTaskStatistics.executed
+ } else {
+ ++workerTaskStatistics.failed
+ }
+}
+
+export const updateRunTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
+ | undefined,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ updateMeasurementStatistics(
+ workerUsage.runTime,
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
+ message.taskPerformance?.runTime ?? 0
+ )
+}
+
+export const updateEluWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
+ | undefined,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ const eluTaskStatisticsRequirements =
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ updateMeasurementStatistics(
+ workerUsage.elu.active,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.active ?? 0
+ )
+ updateMeasurementStatistics(
+ workerUsage.elu.idle,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.idle ?? 0
+ )
+ if (eluTaskStatisticsRequirements?.aggregate === true) {
+ if (message.taskPerformance?.elu != null) {
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ }
+ }
+}
+
+export const createWorker = <Worker extends IWorker>(
+ type: WorkerType,
+ filePath: string,
+ opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
+): Worker => {
+ switch (type) {
+ case WorkerTypes.thread:
+ return new ThreadWorker(filePath, {
+ env: SHARE_ENV,
+ ...opts.workerOptions,
+ }) as unknown as Worker
+ case WorkerTypes.cluster:
+ return cluster.fork(opts.env) as unknown as Worker
+ default:
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Unknown worker type '${type}'`)
+ }
+}
+
+/**
+ * Returns the worker type of the given worker.
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return WorkerTypes.thread
+ } else if (worker instanceof ClusterWorker) {
+ return WorkerTypes.cluster
+ }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return worker.threadId
+ } else if (worker instanceof ClusterWorker) {
+ return worker.id
+ }
+}
+
+export const waitWorkerNodeEvents = async <
+ Worker extends IWorker,
+ Data = unknown
+>(
+ workerNode: IWorkerNode<Worker, Data>,
+ workerNodeEvent: string,
+ numberOfEventsToWait: number,
+ timeout: number
+): Promise<number> => {
+ return await new Promise<number>(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ return
+ }
+ switch (workerNodeEvent) {
+ case 'idle':
+ case 'backPressure':
+ case 'taskFinished':
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ break
+ default:
+ throw new Error('Invalid worker node event')
+ }
+ if (timeout >= 0) {
+ setTimeout(() => {
+ resolve(events)
+ }, timeout)
+ }
+ })