build: reenable eslint type checking
[poolifier.git] / src / pools / utils.ts
index 6a6c1fa9d3aa9d14946d760e09301ad878ce901a..24f2cfb2a6d075b2133473bef04aa9a650607c5b 100644 (file)
@@ -4,7 +4,7 @@ import { env } from 'node:process'
 import {
   SHARE_ENV,
   Worker as ThreadWorker,
-  type WorkerOptions
+  type WorkerOptions,
 } from 'node:worker_threads'
 
 import type { MessageValue, Task } from '../utility-types.js'
@@ -13,7 +13,7 @@ import type { TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
-  type WorkerChoiceStrategy
+  type WorkerChoiceStrategy,
 } from './selection-strategies/selection-strategies-types.js'
 import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
@@ -23,7 +23,7 @@ import {
   type WorkerNodeOptions,
   type WorkerType,
   WorkerTypes,
-  type WorkerUsage
+  type WorkerUsage,
 } from './worker.js'
 
 /**
@@ -33,7 +33,7 @@ export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsR
   {
     aggregate: false,
     average: false,
-    median: false
+    median: false,
   }
 
 export const getDefaultTasksQueueOptions = (
@@ -43,8 +43,8 @@ export const getDefaultTasksQueueOptions = (
     size: Math.pow(poolMaxSize, 2),
     concurrency: 1,
     taskStealing: true,
-    tasksStealingOnBackPressure: true,
-    tasksFinishedTimeout: 2000
+    tasksStealingOnBackPressure: false,
+    tasksFinishedTimeout: 2000,
   }
 }
 
@@ -87,6 +87,19 @@ export const checkDynamicPoolSize = (
   }
 }
 
+export const checkValidPriority = (priority: number | undefined): void => {
+  if (priority != null && !Number.isSafeInteger(priority)) {
+    throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
+  }
+  if (
+    priority != null &&
+    Number.isSafeInteger(priority) &&
+    (priority < -20 || priority > 19)
+  ) {
+    throw new RangeError("Property 'priority' must be between -20 and 19")
+  }
+}
+
 export const checkValidWorkerChoiceStrategy = (
   workerChoiceStrategy: WorkerChoiceStrategy | undefined
 ): void => {
@@ -117,7 +130,7 @@ export const checkValidTasksQueueOptions = (
     tasksQueueOptions.concurrency <= 0
   ) {
     throw new RangeError(
-      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero`
     )
   }
   if (
@@ -130,7 +143,7 @@ export const checkValidTasksQueueOptions = (
   }
   if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
     throw new RangeError(
-      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
+      `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
     )
   }
 }
@@ -156,7 +169,7 @@ export const checkWorkerNodeArguments = (
   }
   if (!isPlainObject(opts)) {
     throw new TypeError(
-      'Cannot construct a worker node with invalid options: must be a plain object'
+      'Cannot construct a worker node with invalid worker node options: must be a plain object'
     )
   }
   if (opts.tasksQueueBackPressureSize == null) {
@@ -174,11 +187,35 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
+  if (opts.tasksQueueBucketSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue bucket size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBucketSize <= 0) {
+    throw new RangeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+    )
+  }
+  if (opts.tasksQueuePriority == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue priority option'
+    )
+  }
+  if (typeof opts.tasksQueuePriority !== 'boolean') {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+    )
+  }
 }
 
 /**
  * Updates the given measurement statistics.
- *
  * @param measurementStatistics - The measurement statistics to update.
  * @param measurementRequirements - The measurement statistics requirements.
  * @param measurementValue - The measurement value.
@@ -198,21 +235,25 @@ const updateMeasurementStatistics = (
       (measurementStatistics.aggregate ?? 0) + measurementValue
     measurementStatistics.minimum = min(
       measurementValue,
-      measurementStatistics.minimum ?? Infinity
+      measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
     )
     measurementStatistics.maximum = max(
       measurementValue,
-      measurementStatistics.maximum ?? -Infinity
+      measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
     )
     if (measurementRequirements.average || measurementRequirements.median) {
-      measurementStatistics.history.push(measurementValue)
+      measurementStatistics.history.put(measurementValue)
       if (measurementRequirements.average) {
-        measurementStatistics.average = average(measurementStatistics.history)
+        measurementStatistics.average = average(
+          measurementStatistics.history.toArray()
+        )
       } else if (measurementStatistics.average != null) {
         delete measurementStatistics.average
       }
       if (measurementRequirements.median) {
-        measurementStatistics.median = median(measurementStatistics.history)
+        measurementStatistics.median = median(
+          measurementStatistics.history.toArray()
+        )
       } else if (measurementStatistics.median != null) {
         delete measurementStatistics.median
       }
@@ -229,7 +270,7 @@ export const updateWaitTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -239,7 +280,7 @@ export const updateWaitTimeWorkerUsage = <
   const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
   updateMeasurementStatistics(
     workerUsage.waitTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
     taskWaitTime
   )
 }
@@ -268,7 +309,7 @@ export const updateRunTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -279,7 +320,7 @@ export const updateRunTimeWorkerUsage = <
   }
   updateMeasurementStatistics(
     workerUsage.runTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
     message.taskPerformance?.runTime ?? 0
   )
 }
@@ -289,7 +330,7 @@ export const updateEluWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -299,7 +340,7 @@ export const updateEluWorkerUsage = <
     return
   }
   const eluTaskStatisticsRequirements =
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
   updateMeasurementStatistics(
     workerUsage.elu.active,
     eluTaskStatisticsRequirements,
@@ -327,13 +368,13 @@ export const updateEluWorkerUsage = <
 export const createWorker = <Worker extends IWorker>(
   type: WorkerType,
   filePath: string,
-  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+  opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
 ): Worker => {
   switch (type) {
     case WorkerTypes.thread:
       return new ThreadWorker(filePath, {
         env: SHARE_ENV,
-        ...opts.workerOptions
+        ...opts.workerOptions,
       }) as unknown as Worker
     case WorkerTypes.cluster:
       return cluster.fork(opts.env) as unknown as Worker
@@ -345,7 +386,6 @@ export const createWorker = <Worker extends IWorker>(
 
 /**
  * Returns the worker type of the given worker.
- *
  * @param worker - The worker to get the type of.
  * @returns The worker type of the given worker.
  * @internal
@@ -360,7 +400,6 @@ export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
 
 /**
  * Returns the worker id of the given worker.
- *
  * @param worker - The worker to get the id of.
  * @returns The worker id of the given worker.
  * @internal
@@ -388,12 +427,20 @@ export const waitWorkerNodeEvents = async <
       resolve(events)
       return
     }
-    workerNode.on(workerNodeEvent, () => {
-      ++events
-      if (events === numberOfEventsToWait) {
-        resolve(events)
-      }
-    })
+    switch (workerNodeEvent) {
+      case 'idle':
+      case 'backPressure':
+      case 'taskFinished':
+        workerNode.on(workerNodeEvent, () => {
+          ++events
+          if (events === numberOfEventsToWait) {
+            resolve(events)
+          }
+        })
+        break
+      default:
+        throw new Error('Invalid worker node event')
+    }
     if (timeout >= 0) {
       setTimeout(() => {
         resolve(events)