fix: use estimated cpu speed instead of random one
[poolifier.git] / src / pools / utils.ts
index a97f3e6e121d1cb3e366aa277a4bcbd43b6d6429..7c1b7866e6d958687ba6dc2e2c29827706caab4f 100644 (file)
@@ -1,15 +1,23 @@
+import cluster, { Worker as ClusterWorker } from 'node:cluster'
 import { existsSync } from 'node:fs'
-import cluster from 'node:cluster'
-import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { cpus } from 'node:os'
 import { env } from 'node:process'
-import { average, isPlainObject, max, median, min } from '../utils.js'
+import {
+  SHARE_ENV,
+  Worker as ThreadWorker,
+  type WorkerOptions
+} from 'node:worker_threads'
+
 import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { IPool, TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types.js'
-import type { TasksQueueOptions } from './pool.js'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
 import {
   type IWorker,
   type IWorkerNode,
@@ -19,7 +27,16 @@ import {
   WorkerTypes,
   type WorkerUsage
 } from './worker.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+
+/**
+ * Default measurement statistics requirements.
+ */
+export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
+  {
+    aggregate: false,
+    average: false,
+    median: false
+  }
 
 export const getDefaultTasksQueueOptions = (
   poolMaxSize: number
@@ -33,6 +50,86 @@ export const getDefaultTasksQueueOptions = (
   }
 }
 
+export const getWorkerChoiceStrategyRetries = <
+  Worker extends IWorker,
+  Data,
+  Response
+>(
+    pool: IPool<Worker, Data, Response>,
+    opts?: WorkerChoiceStrategyOptions
+  ): number => {
+  return (
+    pool.info.maxSize +
+    Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
+  )
+}
+
+export const buildWorkerChoiceStrategyOptions = <
+  Worker extends IWorker,
+  Data,
+  Response
+>(
+    pool: IPool<Worker, Data, Response>,
+    opts?: WorkerChoiceStrategyOptions
+  ): WorkerChoiceStrategyOptions => {
+  opts = clone(opts ?? {})
+  opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
+  return {
+    ...{
+      runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    },
+    ...opts
+  }
+}
+
+const clone = <T>(object: T): T => {
+  return structuredClone<T>(object)
+}
+
+const getDefaultWeights = (
+  poolMaxSize: number,
+  defaultWorkerWeight?: number
+): Record<number, number> => {
+  defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
+  const weights: Record<number, number> = {}
+  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
+    weights[workerNodeKey] = defaultWorkerWeight
+  }
+  return weights
+}
+
+const estimatedCpuSpeed = (): number => {
+  const runs = 150000000
+  const begin = performance.now()
+  // eslint-disable-next-line no-empty
+  for (let i = runs; i > 0; i--) {}
+  const end = performance.now()
+  const duration = end - begin
+  return Math.trunc(runs / duration / 1000) // in MHz
+}
+
+const estCpuSpeed = estimatedCpuSpeed()
+
+const getDefaultWorkerWeight = (estimatedCpuSpeed = estCpuSpeed): number => {
+  let cpusCycleTimeWeight = 0
+  for (const cpu of cpus()) {
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (cpu.speed == null || cpu.speed === 0) {
+      cpu.speed =
+        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+        cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
+        estimatedCpuSpeed
+    }
+    // CPU estimated cycle time
+    const numberOfDigits = cpu.speed.toString().length - 1
+    const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+  }
+  return Math.round(cpusCycleTimeWeight / cpus().length)
+}
+
 export const checkFilePath = (filePath: string | undefined): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
@@ -316,7 +413,7 @@ export const createWorker = <Worker extends IWorker>(
 ): Worker => {
   switch (type) {
     case WorkerTypes.thread:
-      return new Worker(filePath, {
+      return new ThreadWorker(filePath, {
         env: SHARE_ENV,
         ...opts.workerOptions
       }) as unknown as Worker
@@ -328,6 +425,36 @@ export const createWorker = <Worker extends IWorker>(
   }
 }
 
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return WorkerTypes.thread
+  } else if (worker instanceof ClusterWorker) {
+    return WorkerTypes.cluster
+  }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return worker.threadId
+  } else if (worker instanceof ClusterWorker) {
+    return worker.id
+  }
+}
+
 export const waitWorkerNodeEvents = async <
   Worker extends IWorker,
   Data = unknown