-import { existsSync } from 'node:fs'
import cluster, { Worker as ClusterWorker } from 'node:cluster'
+import { existsSync } from 'node:fs'
+import { env } from 'node:process'
import {
SHARE_ENV,
Worker as ThreadWorker,
- type WorkerOptions
+ type WorkerOptions,
} from 'node:worker_threads'
-import { env } from 'node:process'
-import { randomInt } from 'node:crypto'
-import { cpus } from 'node:os'
-import { average, isPlainObject, max, median, min } from '../utils.js'
+
import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { TasksQueueOptions } from './pool.js'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
-import type { IPool, TasksQueueOptions } from './pool.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
type IWorker,
type IWorkerNode,
type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
- type WorkerUsage
+ type WorkerUsage,
} from './worker.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
/**
* Default measurement statistics requirements.
{
aggregate: false,
average: false,
- median: false
+ median: false,
}
export const getDefaultTasksQueueOptions = (
size: Math.pow(poolMaxSize, 2),
concurrency: 1,
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
- }
-}
-
-export const getWorkerChoiceStrategyRetries = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): number => {
- return (
- pool.info.maxSize +
- Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
- )
-}
-
-export const buildWorkerChoiceStrategyOptions = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): WorkerChoiceStrategyOptions => {
- opts = clone(opts ?? {})
- opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
- return {
- ...{
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- },
- ...opts
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
}
}
-const clone = <T>(object: T): T => {
- return structuredClone<T>(object)
-}
-
-const getDefaultWeights = (
- poolMaxSize: number,
- defaultWorkerWeight?: number
-): Record<number, number> => {
- defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
- const weights: Record<number, number> = {}
- for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
- weights[workerNodeKey] = defaultWorkerWeight
- }
- return weights
-}
-
-const getDefaultWorkerWeight = (): number => {
- const cpuSpeed = randomInt(500, 2500)
- let cpusCycleTimeWeight = 0
- for (const cpu of cpus()) {
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (cpu.speed == null || cpu.speed === 0) {
- cpu.speed =
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
- cpuSpeed
- }
- // CPU estimated cycle time
- const numberOfDigits = cpu.speed.toString().length - 1
- const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
- cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
- }
- return Math.round(cpusCycleTimeWeight / cpus().length)
-}
-
export const checkFilePath = (filePath: string | undefined): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
}
}
+export const checkValidPriority = (priority: number | undefined): void => {
+ if (priority != null && !Number.isSafeInteger(priority)) {
+ throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
+ }
+ if (
+ priority != null &&
+ Number.isSafeInteger(priority) &&
+ (priority < -20 || priority > 19)
+ ) {
+ throw new RangeError("Property 'priority' must be between -20 and 19")
+ }
+}
+
export const checkValidWorkerChoiceStrategy = (
workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero`
)
}
if (
}
if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
+ `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
)
}
}
}
if (!isPlainObject(opts)) {
throw new TypeError(
- 'Cannot construct a worker node with invalid options: must be a plain object'
+ 'Cannot construct a worker node with invalid worker node options: must be a plain object'
)
}
if (opts.tasksQueueBackPressureSize == null) {
'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
}
+ if (opts.tasksQueueBucketSize == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue bucket size option'
+ )
+ }
+ if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+ )
+ }
+ if (opts.tasksQueueBucketSize <= 0) {
+ throw new RangeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+ )
+ }
+ if (opts.tasksQueuePriority == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue priority option'
+ )
+ }
+ if (typeof opts.tasksQueuePriority !== 'boolean') {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+ )
+ }
}
/**
* Updates the given measurement statistics.
- *
* @param measurementStatistics - The measurement statistics to update.
* @param measurementRequirements - The measurement statistics requirements.
* @param measurementValue - The measurement value.
(measurementStatistics.aggregate ?? 0) + measurementValue
measurementStatistics.minimum = min(
measurementValue,
- measurementStatistics.minimum ?? Infinity
+ measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
)
measurementStatistics.maximum = max(
measurementValue,
- measurementStatistics.maximum ?? -Infinity
+ measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
)
if (measurementRequirements.average || measurementRequirements.median) {
- measurementStatistics.history.push(measurementValue)
+ measurementStatistics.history.put(measurementValue)
if (measurementRequirements.average) {
- measurementStatistics.average = average(measurementStatistics.history)
+ measurementStatistics.average = average(
+ measurementStatistics.history.toArray()
+ )
} else if (measurementStatistics.average != null) {
delete measurementStatistics.average
}
if (measurementRequirements.median) {
- measurementStatistics.median = median(measurementStatistics.history)
+ measurementStatistics.median = median(
+ measurementStatistics.history.toArray()
+ )
} else if (measurementStatistics.median != null) {
delete measurementStatistics.median
}
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
task: Task<Data>
const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
updateMeasurementStatistics(
workerUsage.waitTime,
- workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
taskWaitTime
)
}
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
}
updateMeasurementStatistics(
workerUsage.runTime,
- workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
message.taskPerformance?.runTime ?? 0
)
}
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ workerChoiceStrategiesContext:
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
return
}
const eluTaskStatisticsRequirements =
- workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+ workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
updateMeasurementStatistics(
workerUsage.elu.active,
eluTaskStatisticsRequirements,
export const createWorker = <Worker extends IWorker>(
type: WorkerType,
filePath: string,
- opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+ opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
): Worker => {
switch (type) {
case WorkerTypes.thread:
return new ThreadWorker(filePath, {
env: SHARE_ENV,
- ...opts.workerOptions
+ ...opts.workerOptions,
}) as unknown as Worker
case WorkerTypes.cluster:
return cluster.fork(opts.env) as unknown as Worker
/**
* Returns the worker type of the given worker.
- *
* @param worker - The worker to get the type of.
* @returns The worker type of the given worker.
* @internal
/**
* Returns the worker id of the given worker.
- *
* @param worker - The worker to get the id of.
* @returns The worker id of the given worker.
* @internal
resolve(events)
return
}
- workerNode.on(workerNodeEvent, () => {
- ++events
- if (events === numberOfEventsToWait) {
- resolve(events)
- }
- })
+ switch (workerNodeEvent) {
+ case 'idle':
+ case 'backPressure':
+ case 'taskFinished':
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ break
+ default:
+ throw new Error('Invalid worker node event')
+ }
if (timeout >= 0) {
setTimeout(() => {
resolve(events)