feat: add per task function strategy support
[poolifier.git] / src / pools / utils.ts
index 724c4ab960c691e8f4a5d8661f9c442ff14d5b3d..6a6c1fa9d3aa9d14946d760e09301ad878ce901a 100644 (file)
@@ -1,6 +1,5 @@
 import cluster, { Worker as ClusterWorker } from 'node:cluster'
 import { existsSync } from 'node:fs'
-import { cpus } from 'node:os'
 import { env } from 'node:process'
 import {
   SHARE_ENV,
@@ -10,14 +9,13 @@ import {
 
 import type { MessageValue, Task } from '../utility-types.js'
 import { average, isPlainObject, max, median, min } from '../utils.js'
-import type { IPool, TasksQueueOptions } from './pool.js'
+import type { TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy,
-  type WorkerChoiceStrategyOptions
+  type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   type IWorker,
   type IWorkerNode,
@@ -50,91 +48,6 @@ export const getDefaultTasksQueueOptions = (
   }
 }
 
-export const getWorkerChoiceStrategyRetries = <
-  Worker extends IWorker,
-  Data,
-  Response
->(
-    pool: IPool<Worker, Data, Response>,
-    opts?: WorkerChoiceStrategyOptions
-  ): number => {
-  return (
-    pool.info.maxSize +
-    Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
-  )
-}
-
-export const buildWorkerChoiceStrategyOptions = <
-  Worker extends IWorker,
-  Data,
-  Response
->(
-    pool: IPool<Worker, Data, Response>,
-    opts?: WorkerChoiceStrategyOptions
-  ): WorkerChoiceStrategyOptions => {
-  opts = clone(opts ?? {})
-  opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
-  return {
-    ...{
-      runTime: { median: false },
-      waitTime: { median: false },
-      elu: { median: false }
-    },
-    ...opts
-  }
-}
-
-const clone = <T>(object: T): T => {
-  return structuredClone<T>(object)
-}
-
-const getDefaultWeights = (
-  poolMaxSize: number,
-  defaultWorkerWeight?: number
-): Record<number, number> => {
-  defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
-  const weights: Record<number, number> = {}
-  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
-    weights[workerNodeKey] = defaultWorkerWeight
-  }
-  return weights
-}
-
-const estimatedCpuSpeed = (): number => {
-  const runs = 150000000
-  const begin = performance.now()
-  // eslint-disable-next-line no-empty
-  for (let i = runs; i > 0; i--) {}
-  const end = performance.now()
-  const duration = end - begin
-  return Math.trunc(runs / duration / 1000) // in MHz
-}
-
-const getDefaultWorkerWeight = (): number => {
-  const currentCpus = cpus()
-  let estCpuSpeed: number | undefined
-  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-  if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
-    estCpuSpeed = estimatedCpuSpeed()
-  }
-  let cpusCycleTimeWeight = 0
-  for (const cpu of currentCpus) {
-    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (cpu.speed == null || cpu.speed === 0) {
-      cpu.speed =
-        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-        currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
-        estCpuSpeed ??
-        2000
-    }
-    // CPU estimated cycle time
-    const numberOfDigits = cpu.speed.toString().length - 1
-    const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
-    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
-  }
-  return Math.round(cpusCycleTimeWeight / currentCpus.length)
-}
-
 export const checkFilePath = (filePath: string | undefined): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
@@ -317,7 +230,7 @@ export const updateWaitTimeWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     task: Task<Data>
@@ -356,7 +269,7 @@ export const updateRunTimeWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
@@ -377,7 +290,7 @@ export const updateEluWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>