fix: disable `tasksStealingOnBackPressure` by default
[poolifier.git] / src / pools / utils.ts
index ff2709826076459a8ef4ee417221b333b3ca70e3..df9c256e95390d0731b730ae6726678d5bcbdfb7 100644 (file)
@@ -1,22 +1,21 @@
-import { existsSync } from 'node:fs'
 import cluster, { Worker as ClusterWorker } from 'node:cluster'
+import { existsSync } from 'node:fs'
+import { env } from 'node:process'
 import {
   SHARE_ENV,
   Worker as ThreadWorker,
   type WorkerOptions
 } from 'node:worker_threads'
-import { env } from 'node:process'
-import { randomInt } from 'node:crypto'
-import { cpus } from 'node:os'
-import { average, isPlainObject, max, median, min } from '../utils.js'
+
 import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy,
-  type WorkerChoiceStrategyOptions
+  type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types.js'
-import type { IPool, TasksQueueOptions } from './pool.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   type IWorker,
   type IWorkerNode,
@@ -26,7 +25,6 @@ import {
   WorkerTypes,
   type WorkerUsage
 } from './worker.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
 
 /**
  * Default measurement statistics requirements.
@@ -45,80 +43,11 @@ export const getDefaultTasksQueueOptions = (
     size: Math.pow(poolMaxSize, 2),
     concurrency: 1,
     taskStealing: true,
-    tasksStealingOnBackPressure: true,
+    tasksStealingOnBackPressure: false,
     tasksFinishedTimeout: 2000
   }
 }
 
-export const getWorkerChoiceStrategyRetries = <
-  Worker extends IWorker,
-  Data,
-  Response
->(
-    pool: IPool<Worker, Data, Response>,
-    opts?: WorkerChoiceStrategyOptions
-  ): number => {
-  return (
-    pool.info.maxSize +
-    Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
-  )
-}
-
-export const buildWorkerChoiceStrategyOptions = <
-  Worker extends IWorker,
-  Data,
-  Response
->(
-    pool: IPool<Worker, Data, Response>,
-    opts?: WorkerChoiceStrategyOptions
-  ): WorkerChoiceStrategyOptions => {
-  opts = clone(opts ?? {})
-  opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
-  return {
-    ...{
-      runTime: { median: false },
-      waitTime: { median: false },
-      elu: { median: false }
-    },
-    ...opts
-  }
-}
-
-const clone = <T>(object: T): T => {
-  return structuredClone<T>(object)
-}
-
-const getDefaultWeights = (
-  poolMaxSize: number,
-  defaultWorkerWeight?: number
-): Record<number, number> => {
-  defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
-  const weights: Record<number, number> = {}
-  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
-    weights[workerNodeKey] = defaultWorkerWeight
-  }
-  return weights
-}
-
-const getDefaultWorkerWeight = (): number => {
-  const cpuSpeed = randomInt(500, 2500)
-  let cpusCycleTimeWeight = 0
-  for (const cpu of cpus()) {
-    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (cpu.speed == null || cpu.speed === 0) {
-      cpu.speed =
-        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-        cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
-        cpuSpeed
-    }
-    // CPU estimated cycle time
-    const numberOfDigits = cpu.speed.toString().length - 1
-    const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
-    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
-  }
-  return Math.round(cpusCycleTimeWeight / cpus().length)
-}
-
 export const checkFilePath = (filePath: string | undefined): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
@@ -158,6 +87,19 @@ export const checkDynamicPoolSize = (
   }
 }
 
+export const checkValidPriority = (priority: number | undefined): void => {
+  if (priority != null && !Number.isSafeInteger(priority)) {
+    throw new TypeError(`Invalid property 'priority': '${priority}'`)
+  }
+  if (
+    priority != null &&
+    Number.isSafeInteger(priority) &&
+    (priority < -20 || priority > 19)
+  ) {
+    throw new RangeError("Property 'priority' must be between -20 and 19")
+  }
+}
+
 export const checkValidWorkerChoiceStrategy = (
   workerChoiceStrategy: WorkerChoiceStrategy | undefined
 ): void => {
@@ -245,6 +187,21 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
+  if (opts.tasksQueueBucketSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue bucket size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBucketSize <= 0) {
+    throw new RangeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+    )
+  }
 }
 
 /**
@@ -300,8 +257,8 @@ export const updateWaitTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     task: Task<Data>
@@ -310,7 +267,7 @@ export const updateWaitTimeWorkerUsage = <
   const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
   updateMeasurementStatistics(
     workerUsage.waitTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
     taskWaitTime
   )
 }
@@ -339,8 +296,8 @@ export const updateRunTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
@@ -350,7 +307,7 @@ export const updateRunTimeWorkerUsage = <
   }
   updateMeasurementStatistics(
     workerUsage.runTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
     message.taskPerformance?.runTime ?? 0
   )
 }
@@ -360,8 +317,8 @@ export const updateEluWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
@@ -370,7 +327,7 @@ export const updateEluWorkerUsage = <
     return
   }
   const eluTaskStatisticsRequirements =
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
   updateMeasurementStatistics(
     workerUsage.elu.active,
     eluTaskStatisticsRequirements,