Merge pull request #2174 from poolifier/task-functions-properties
[poolifier.git] / src / pools / utils.ts
index a97f3e6e121d1cb3e366aa277a4bcbd43b6d6429..3fa17312d1fe24f54fb121eeda4d9dee6124e0c9 100644 (file)
@@ -1,15 +1,21 @@
+import cluster, { Worker as ClusterWorker } from 'node:cluster'
 import { existsSync } from 'node:fs'
-import cluster from 'node:cluster'
-import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
 import { env } from 'node:process'
-import { average, isPlainObject, max, median, min } from '../utils.js'
+import {
+  SHARE_ENV,
+  Worker as ThreadWorker,
+  type WorkerOptions
+} from 'node:worker_threads'
+
 import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types.js'
-import type { TasksQueueOptions } from './pool.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   type IWorker,
   type IWorkerNode,
@@ -19,7 +25,16 @@ import {
   WorkerTypes,
   type WorkerUsage
 } from './worker.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+
+/**
+ * Default measurement statistics requirements.
+ */
+export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
+  {
+    aggregate: false,
+    average: false,
+    median: false
+  }
 
 export const getDefaultTasksQueueOptions = (
   poolMaxSize: number
@@ -72,6 +87,19 @@ export const checkDynamicPoolSize = (
   }
 }
 
+export const checkValidPriority = (priority: number | undefined): void => {
+  if (priority != null && !Number.isSafeInteger(priority)) {
+    throw new TypeError(`Invalid property 'priority': '${priority}'`)
+  }
+  if (
+    priority != null &&
+    Number.isSafeInteger(priority) &&
+    (priority < -20 || priority > 19)
+  ) {
+    throw new RangeError("Property 'priority' must be between -20 and 19")
+  }
+}
+
 export const checkValidWorkerChoiceStrategy = (
   workerChoiceStrategy: WorkerChoiceStrategy | undefined
 ): void => {
@@ -215,7 +243,7 @@ export const updateWaitTimeWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     task: Task<Data>
@@ -254,7 +282,7 @@ export const updateRunTimeWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
@@ -275,7 +303,7 @@ export const updateEluWorkerUsage = <
   Response = unknown
 >(
     workerChoiceStrategyContext:
-    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
     | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
@@ -316,7 +344,7 @@ export const createWorker = <Worker extends IWorker>(
 ): Worker => {
   switch (type) {
     case WorkerTypes.thread:
-      return new Worker(filePath, {
+      return new ThreadWorker(filePath, {
         env: SHARE_ENV,
         ...opts.workerOptions
       }) as unknown as Worker
@@ -328,6 +356,36 @@ export const createWorker = <Worker extends IWorker>(
   }
 }
 
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return WorkerTypes.thread
+  } else if (worker instanceof ClusterWorker) {
+    return WorkerTypes.cluster
+  }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return worker.threadId
+  } else if (worker instanceof ClusterWorker) {
+    return worker.id
+  }
+}
+
 export const waitWorkerNodeEvents = async <
   Worker extends IWorker,
   Data = unknown