+import type { IPool } from '../pool.js'
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
buildWorkerChoiceStrategyOptions
-} from '../../utils.js'
-import type { IPool } from '../pool.js'
+} from '../utils.js'
import type { IWorker } from '../worker.js'
import type {
IWorkerChoiceStrategy,
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js'
import type { IPool } from '../pool.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import type { IWorker } from '../worker.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import {
import type { IWorker } from '../worker.js'
import type { IPool } from '../pool.js'
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import type {
IWorkerChoiceStrategy,
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js'
import type { IPool } from '../pool.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import type { IWorker } from '../worker.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import type {
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js'
import type { IPool } from '../pool.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import type { IWorker } from '../worker.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import type {
import type { IWorker } from '../worker.js'
import type { IPool } from '../pool.js'
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import type {
IWorkerChoiceStrategy,
import type { IPool } from '../pool.js'
import type { IWorker } from '../worker.js'
-import { getWorkerChoiceStrategyRetries } from '../../utils.js'
+import { getWorkerChoiceStrategyRetries } from '../utils.js'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
import { existsSync } from 'node:fs'
-import cluster from 'node:cluster'
-import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import cluster, { Worker as ClusterWorker } from 'node:cluster'
+import {
+ SHARE_ENV,
+ Worker as ThreadWorker,
+ type WorkerOptions
+} from 'node:worker_threads'
import { env } from 'node:process'
+import { randomInt } from 'node:crypto'
+import { cpus } from 'node:os'
import { average, isPlainObject, max, median, min } from '../utils.js'
import type { MessageValue, Task } from '../utility-types.js'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
- type WorkerChoiceStrategy
+ type WorkerChoiceStrategy,
+ type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
-import type { TasksQueueOptions } from './pool.js'
+import type { IPool, TasksQueueOptions } from './pool.js'
import {
type IWorker,
type IWorkerNode,
} from './worker.js'
import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+/**
+ * Default measurement statistics requirements.
+ */
+export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
+ {
+ aggregate: false,
+ average: false,
+ median: false
+ }
+
export const getDefaultTasksQueueOptions = (
poolMaxSize: number
): Required<TasksQueueOptions> => {
}
}
+export const getWorkerChoiceStrategyRetries = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): number => {
+ return (
+ pool.info.maxSize +
+ Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
+ )
+}
+
+export const buildWorkerChoiceStrategyOptions = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): WorkerChoiceStrategyOptions => {
+ opts = clone(opts ?? {})
+ opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
+ return {
+ ...{
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ },
+ ...opts
+ }
+}
+
+const clone = <T>(object: T): T => {
+ return structuredClone<T>(object)
+}
+
+const getDefaultWeights = (
+ poolMaxSize: number,
+ defaultWorkerWeight?: number
+): Record<number, number> => {
+ defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
+ const weights: Record<number, number> = {}
+ for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
+ weights[workerNodeKey] = defaultWorkerWeight
+ }
+ return weights
+}
+
+const getDefaultWorkerWeight = (): number => {
+ const cpuSpeed = randomInt(500, 2500)
+ let cpusCycleTimeWeight = 0
+ for (const cpu of cpus()) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (cpu.speed == null || cpu.speed === 0) {
+ cpu.speed =
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
+ cpuSpeed
+ }
+ // CPU estimated cycle time
+ const numberOfDigits = cpu.speed.toString().length - 1
+ const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+ cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+ }
+ return Math.round(cpusCycleTimeWeight / cpus().length)
+}
+
export const checkFilePath = (filePath: string | undefined): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
): Worker => {
switch (type) {
case WorkerTypes.thread:
- return new Worker(filePath, {
+ return new ThreadWorker(filePath, {
env: SHARE_ENV,
...opts.workerOptions
}) as unknown as Worker
}
}
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return WorkerTypes.thread
+ } else if (worker instanceof ClusterWorker) {
+ return WorkerTypes.cluster
+ }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return worker.threadId
+ } else if (worker instanceof ClusterWorker) {
+ return worker.id
+ }
+}
+
export const waitWorkerNodeEvents = async <
Worker extends IWorker,
Data = unknown
import { EventEmitter } from 'node:events'
import { CircularArray } from '../circular-array.js'
import type { Task } from '../utility-types.js'
-import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
+import { DEFAULT_TASK_NAME } from '../utils.js'
import { Deque } from '../deque.js'
import {
type EventHandler,
WorkerTypes,
type WorkerUsage
} from './worker.js'
-import { checkWorkerNodeArguments, createWorker } from './utils.js'
+import {
+ checkWorkerNodeArguments,
+ createWorker,
+ getWorkerId,
+ getWorkerType
+} from './utils.js'
/**
* Worker node.
import * as os from 'node:os'
-import { getRandomValues, randomInt } from 'node:crypto'
-import { Worker as ClusterWorker } from 'node:cluster'
-import { Worker as ThreadWorker } from 'node:worker_threads'
-import { cpus } from 'node:os'
-import type {
- MeasurementStatisticsRequirements,
- WorkerChoiceStrategyOptions
-} from './pools/selection-strategies/selection-strategies-types.js'
+import { getRandomValues } from 'node:crypto'
import type { KillBehavior } from './worker/worker-options.js'
-import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker.js'
-import type { IPool } from './pools/pool.js'
/**
* Default task name.
/* Intentionally empty */
})
-/**
- * Default measurement statistics requirements.
- */
-export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
- {
- aggregate: false,
- average: false,
- median: false
- }
-
/**
* Returns safe host OS optimized estimate of the default amount of parallelism a pool should use.
* Always returns a value greater than zero.
return availableParallelism
}
-/**
- * Returns the worker type of the given worker.
- *
- * @param worker - The worker to get the type of.
- * @returns The worker type of the given worker.
- * @internal
- */
-export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
- if (worker instanceof ThreadWorker) {
- return WorkerTypes.thread
- } else if (worker instanceof ClusterWorker) {
- return WorkerTypes.cluster
- }
-}
-
-/**
- * Returns the worker id of the given worker.
- *
- * @param worker - The worker to get the id of.
- * @returns The worker id of the given worker.
- * @internal
- */
-export const getWorkerId = (worker: IWorker): number | undefined => {
- if (worker instanceof ThreadWorker) {
- return worker.threadId
- } else if (worker instanceof ClusterWorker) {
- return worker.id
- }
-}
-
/**
* Sleeps for the given amount of milliseconds.
*
export const average = (dataSet: number[]): number => {
if (Array.isArray(dataSet) && dataSet.length === 0) {
return 0
- }
- if (Array.isArray(dataSet) && dataSet.length === 1) {
+ } else if (Array.isArray(dataSet) && dataSet.length === 1) {
return dataSet[0]
}
return (
export const median = (dataSet: number[]): number => {
if (Array.isArray(dataSet) && dataSet.length === 0) {
return 0
- }
- if (Array.isArray(dataSet) && dataSet.length === 1) {
+ } else if (Array.isArray(dataSet) && dataSet.length === 1) {
return dataSet[0]
}
const sortedDataSet = dataSet.slice().sort((a, b) => a - b)
* @returns `true` if the given object is a plain object, `false` otherwise.
* @internal
*/
-export const isPlainObject = (obj: unknown): boolean =>
+export const isPlainObject = (obj: unknown): obj is object =>
typeof obj === 'object' &&
obj !== null &&
obj.constructor === Object &&
/**
* Detects whether the given value is an asynchronous function or not.
*
- * @param fn - Any value.
+ * @param fn - Unknown value.
* @returns `true` if `fn` was an asynchronous function, otherwise `false`.
* @internal
*/
return result
}
}
-
-export const getWorkerChoiceStrategyRetries = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): number => {
- return (
- pool.info.maxSize +
- Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
- )
-}
-
-const clone = <T>(object: T): T => {
- return structuredClone<T>(object)
-}
-
-export const buildWorkerChoiceStrategyOptions = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): WorkerChoiceStrategyOptions => {
- opts = clone(opts ?? {})
- opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
- return {
- ...{
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- },
- ...opts
- }
-}
-
-const getDefaultWeights = (
- poolMaxSize: number,
- defaultWorkerWeight?: number
-): Record<number, number> => {
- defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
- const weights: Record<number, number> = {}
- for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
- weights[workerNodeKey] = defaultWorkerWeight
- }
- return weights
-}
-
-const getDefaultWorkerWeight = (): number => {
- const cpuSpeed = randomInt(500, 2500)
- let cpusCycleTimeWeight = 0
- for (const cpu of cpus()) {
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (cpu.speed == null || cpu.speed === 0) {
- cpu.speed =
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
- cpuSpeed
- }
- // CPU estimated cycle time
- const numberOfDigits = cpu.speed.toString().length - 1
- const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
- cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
- }
- return Math.round(cpusCycleTimeWeight / cpus().length)
-}
import { Worker as ThreadWorker } from 'node:worker_threads'
-import { Worker as ClusterWorker } from 'node:cluster'
+import cluster, { Worker as ClusterWorker } from 'node:cluster'
import { expect } from 'expect'
import {
CircularArray,
DEFAULT_CIRCULAR_ARRAY_SIZE
} from '../../lib/circular-array.cjs'
import {
+ DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ buildWorkerChoiceStrategyOptions,
createWorker,
getDefaultTasksQueueOptions,
+ getWorkerChoiceStrategyRetries,
+ getWorkerId,
+ getWorkerType,
updateMeasurementStatistics
} from '../../lib/pools/utils.cjs'
-import { WorkerTypes } from '../../lib/index.cjs'
+import {
+ FixedClusterPool,
+ FixedThreadPool,
+ WorkerTypes
+} from '../../lib/index.cjs'
describe('Pool utils test suite', () => {
+ it('Verify DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS values', () => {
+ expect(DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS).toStrictEqual({
+ aggregate: false,
+ average: false,
+ median: false
+ })
+ })
+
it('Verify getDefaultTasksQueueOptions() behavior', () => {
const poolMaxSize = 4
expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({
})
})
+ it('Verify getWorkerChoiceStrategyRetries() behavior', async () => {
+ const numberOfThreads = 4
+ const pool = new FixedThreadPool(
+ numberOfThreads,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2)
+ const workerChoiceStrategyOptions = {
+ runTime: { median: true },
+ waitTime: { median: true },
+ elu: { median: true },
+ weights: {
+ 0: 100,
+ 1: 100
+ }
+ }
+ expect(
+ getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions)
+ ).toBe(
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategyOptions.weights).length
+ )
+ await pool.destroy()
+ })
+
+ it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => {
+ const numberOfWorkers = 4
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.cjs'
+ )
+ expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
+ })
+ const workerChoiceStrategyOptions = {
+ runTime: { median: true },
+ waitTime: { median: true },
+ elu: { median: true },
+ weights: {
+ 0: 100,
+ 1: 100
+ }
+ }
+ expect(
+ buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions)
+ ).toStrictEqual(workerChoiceStrategyOptions)
+ await pool.destroy()
+ })
+
it('Verify updateMeasurementStatistics() behavior', () => {
const measurementStatistics = {
history: new CircularArray()
)
).toBeInstanceOf(ClusterWorker)
})
+
+ it('Verify getWorkerType() behavior', () => {
+ expect(
+ getWorkerType(
+ new ThreadWorker('./tests/worker-files/thread/testWorker.mjs')
+ )
+ ).toBe(WorkerTypes.thread)
+ expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster)
+ })
+
+ it('Verify getWorkerId() behavior', () => {
+ const threadWorker = new ThreadWorker(
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ const clusterWorker = cluster.fork()
+ expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId)
+ expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id)
+ })
})
-import { Worker } from 'node:worker_threads'
-import cluster from 'node:cluster'
import os from 'node:os'
import { randomInt } from 'node:crypto'
import { expect } from 'expect'
import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
availableParallelism,
average,
- buildWorkerChoiceStrategyOptions,
exponentialDelay,
- getWorkerChoiceStrategyRetries,
- getWorkerId,
- getWorkerType,
isAsyncFunction,
isKillBehavior,
isPlainObject,
secureRandom,
sleep
} from '../lib/utils.cjs'
-import {
- FixedClusterPool,
- FixedThreadPool,
- KillBehaviors,
- WorkerTypes
-} from '../lib/index.cjs'
+import { KillBehaviors } from '../lib/index.cjs'
describe('Utils test suite', () => {
it('Verify DEFAULT_TASK_NAME value', () => {
expect(EMPTY_FUNCTION).toStrictEqual(expect.any(Function))
})
- it('Verify DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS values', () => {
- expect(DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS).toStrictEqual({
- aggregate: false,
- average: false,
- median: false
- })
- })
-
it('Verify availableParallelism() behavior', () => {
const parallelism = availableParallelism()
expect(typeof parallelism === 'number').toBe(true)
expect(parallelism).toBe(expectedParallelism)
})
- it('Verify getWorkerType() behavior', () => {
- expect(
- getWorkerType(new Worker('./tests/worker-files/thread/testWorker.mjs'))
- ).toBe(WorkerTypes.thread)
- expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster)
- })
-
- it('Verify getWorkerId() behavior', () => {
- const threadWorker = new Worker(
- './tests/worker-files/thread/testWorker.mjs'
- )
- const clusterWorker = cluster.fork()
- expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId)
- expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id)
- })
-
it('Verify sleep() behavior', async () => {
const start = performance.now()
const sleepMs = 1000
expect(max(1, 1)).toBe(1)
})
- it('Verify getWorkerChoiceStrategyRetries() behavior', async () => {
- const numberOfThreads = 4
- const pool = new FixedThreadPool(
- numberOfThreads,
- './tests/worker-files/thread/testWorker.mjs'
- )
- expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2)
- const workerChoiceStrategyOptions = {
- runTime: { median: true },
- waitTime: { median: true },
- elu: { median: true },
- weights: {
- 0: 100,
- 1: 100
- }
- }
- expect(
- getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions)
- ).toBe(
- pool.info.maxSize +
- Object.keys(workerChoiceStrategyOptions.weights).length
- )
- await pool.destroy()
- })
-
- it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => {
- const numberOfWorkers = 4
- const pool = new FixedClusterPool(
- numberOfWorkers,
- './tests/worker-files/cluster/testWorker.cjs'
- )
- expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- const workerChoiceStrategyOptions = {
- runTime: { median: true },
- waitTime: { median: true },
- elu: { median: true },
- weights: {
- 0: 100,
- 1: 100
- }
- }
- expect(
- buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions)
- ).toStrictEqual(workerChoiceStrategyOptions)
- await pool.destroy()
- })
-
// it('Verify once()', () => {
// let called = 0
// const fn = () => ++called