refactor: code cleanups
[poolifier.git] / src / pools / utils.ts
index 36bd8939fdf82155c208d966d502d3098f11ffe3..e69725df102ed2ea083512b76f5dce75139a6894 100644 (file)
@@ -43,7 +43,7 @@ export const getDefaultTasksQueueOptions = (
     size: Math.pow(poolMaxSize, 2),
     concurrency: 1,
     taskStealing: true,
-    tasksStealingOnBackPressure: true,
+    tasksStealingOnBackPressure: false,
     tasksFinishedTimeout: 2000
   }
 }
@@ -89,14 +89,14 @@ export const checkDynamicPoolSize = (
 
 export const checkValidPriority = (priority: number | undefined): void => {
   if (priority != null && !Number.isSafeInteger(priority)) {
-    throw new TypeError(`Invalid priority '${priority}'`)
+    throw new TypeError(`Invalid property 'priority': '${priority}'`)
   }
   if (
     priority != null &&
     Number.isSafeInteger(priority) &&
     (priority < -20 || priority > 19)
   ) {
-    throw new RangeError('Property priority must be between -20 and 19')
+    throw new RangeError("Property 'priority' must be between -20 and 19")
   }
 }
 
@@ -187,6 +187,21 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
+  if (opts.tasksQueueBucketSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue bucket size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBucketSize <= 0) {
+    throw new RangeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+    )
+  }
 }
 
 /**
@@ -211,11 +226,11 @@ const updateMeasurementStatistics = (
       (measurementStatistics.aggregate ?? 0) + measurementValue
     measurementStatistics.minimum = min(
       measurementValue,
-      measurementStatistics.minimum ?? Infinity
+      measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
     )
     measurementStatistics.maximum = max(
       measurementValue,
-      measurementStatistics.maximum ?? -Infinity
+      measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
     )
     if (measurementRequirements.average || measurementRequirements.median) {
       measurementStatistics.history.push(measurementValue)
@@ -242,7 +257,7 @@ export const updateWaitTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -252,7 +267,7 @@ export const updateWaitTimeWorkerUsage = <
   const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
   updateMeasurementStatistics(
     workerUsage.waitTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
     taskWaitTime
   )
 }
@@ -281,7 +296,7 @@ export const updateRunTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -292,7 +307,7 @@ export const updateRunTimeWorkerUsage = <
   }
   updateMeasurementStatistics(
     workerUsage.runTime,
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
     message.taskPerformance?.runTime ?? 0
   )
 }
@@ -302,7 +317,7 @@ export const updateEluWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext:
+    workerChoiceStrategiesContext:
     | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
@@ -312,7 +327,7 @@ export const updateEluWorkerUsage = <
     return
   }
   const eluTaskStatisticsRequirements =
-    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
   updateMeasurementStatistics(
     workerUsage.elu.active,
     eluTaskStatisticsRequirements,
@@ -401,12 +416,20 @@ export const waitWorkerNodeEvents = async <
       resolve(events)
       return
     }
-    workerNode.on(workerNodeEvent, () => {
-      ++events
-      if (events === numberOfEventsToWait) {
-        resolve(events)
-      }
-    })
+    switch (workerNodeEvent) {
+      case 'idle':
+      case 'backPressure':
+      case 'taskFinished':
+        workerNode.on(workerNodeEvent, () => {
+          ++events
+          if (events === numberOfEventsToWait) {
+            resolve(events)
+          }
+        })
+        break
+      default:
+        throw new Error('Invalid worker node event')
+    }
     if (timeout >= 0) {
       setTimeout(() => {
         resolve(events)