max,
median,
min,
- round,
- updateMeasurementStatistics
+ round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
import type { TaskFunction } from '../worker/task-functions'
import {
checkFilePath,
checkValidTasksQueueOptions,
- checkValidWorkerChoiceStrategy
+ checkValidWorkerChoiceStrategy,
+ updateMeasurementStatistics
} from './utils'
/**
import { existsSync } from 'node:fs'
-import { isPlainObject } from '../utils'
+import { average, isPlainObject, max, median, min } from '../utils'
import {
+ type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy
} from './selection-strategies/selection-strategies-types'
import type { TasksQueueOptions } from './pool'
+import type { IWorker, MeasurementStatistics } from './worker'
export const checkFilePath = (filePath: string): void => {
if (
)
}
}
+
+export const checkWorkerNodeArguments = <Worker extends IWorker>(
+ worker: Worker,
+ tasksQueueBackPressureSize: number
+): void => {
+ if (worker == null) {
+ throw new TypeError('Cannot construct a worker node without a worker')
+ }
+ if (tasksQueueBackPressureSize == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue back pressure size'
+ )
+ }
+ if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+ )
+ }
+ if (tasksQueueBackPressureSize <= 0) {
+ throw new RangeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+ )
+ }
+}
+
+/**
+ * Updates the given measurement statistics.
+ *
+ * @param measurementStatistics - The measurement statistics to update.
+ * @param measurementRequirements - The measurement statistics requirements.
+ * @param measurementValue - The measurement value.
+ * @param numberOfMeasurements - The number of measurements.
+ * @internal
+ */
+export const updateMeasurementStatistics = (
+ measurementStatistics: MeasurementStatistics,
+ measurementRequirements: MeasurementStatisticsRequirements,
+ measurementValue: number
+): void => {
+ if (measurementRequirements.aggregate) {
+ measurementStatistics.aggregate =
+ (measurementStatistics.aggregate ?? 0) + measurementValue
+ measurementStatistics.minimum = min(
+ measurementValue,
+ measurementStatistics.minimum ?? Infinity
+ )
+ measurementStatistics.maximum = max(
+ measurementValue,
+ measurementStatistics.maximum ?? -Infinity
+ )
+ if (
+ (measurementRequirements.average || measurementRequirements.median) &&
+ measurementValue != null
+ ) {
+ measurementStatistics.history.push(measurementValue)
+ if (measurementRequirements.average) {
+ measurementStatistics.average = average(measurementStatistics.history)
+ } else if (measurementStatistics.average != null) {
+ delete measurementStatistics.average
+ }
+ if (measurementRequirements.median) {
+ measurementStatistics.median = median(measurementStatistics.history)
+ } else if (measurementStatistics.median != null) {
+ delete measurementStatistics.median
+ }
+ }
+ }
+}
WorkerTypes,
type WorkerUsage
} from './worker'
+import { checkWorkerNodeArguments } from './utils'
/**
* Worker node.
* @param tasksQueueBackPressureSize - The tasks queue back pressure size.
*/
constructor (worker: Worker, tasksQueueBackPressureSize: number) {
- this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
+ checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
this.worker = worker
this.info = this.initWorkerInfo(worker)
this.usage = this.initWorkerUsage()
}
}
}
-
- private checkWorkerNodeArguments (
- worker: Worker,
- tasksQueueBackPressureSize: number
- ): void {
- if (worker == null) {
- throw new TypeError('Cannot construct a worker node without a worker')
- }
- if (tasksQueueBackPressureSize == null) {
- throw new TypeError(
- 'Cannot construct a worker node without a tasks queue back pressure size'
- )
- }
- if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
- throw new TypeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
- )
- }
- if (tasksQueueBackPressureSize <= 0) {
- throw new RangeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
- )
- }
- }
}
WorkerChoiceStrategyOptions
} from './pools/selection-strategies/selection-strategies-types'
import type { KillBehavior } from './worker/worker-options'
-import {
- type IWorker,
- type MeasurementStatistics,
- type WorkerType,
- WorkerTypes
-} from './pools/worker'
+import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker'
/**
* Default task name.
return typeof fn === 'function' && fn.constructor.name === 'AsyncFunction'
}
-/**
- * Updates the given measurement statistics.
- *
- * @param measurementStatistics - The measurement statistics to update.
- * @param measurementRequirements - The measurement statistics requirements.
- * @param measurementValue - The measurement value.
- * @param numberOfMeasurements - The number of measurements.
- * @internal
- */
-export const updateMeasurementStatistics = (
- measurementStatistics: MeasurementStatistics,
- measurementRequirements: MeasurementStatisticsRequirements,
- measurementValue: number
-): void => {
- if (measurementRequirements.aggregate) {
- measurementStatistics.aggregate =
- (measurementStatistics.aggregate ?? 0) + measurementValue
- measurementStatistics.minimum = min(
- measurementValue,
- measurementStatistics.minimum ?? Infinity
- )
- measurementStatistics.maximum = max(
- measurementValue,
- measurementStatistics.maximum ?? -Infinity
- )
- if (
- (measurementRequirements.average || measurementRequirements.median) &&
- measurementValue != null
- ) {
- measurementStatistics.history.push(measurementValue)
- if (measurementRequirements.average) {
- measurementStatistics.average = average(measurementStatistics.history)
- } else if (measurementStatistics.average != null) {
- delete measurementStatistics.average
- }
- if (measurementRequirements.median) {
- measurementStatistics.median = median(measurementStatistics.history)
- } else if (measurementStatistics.median != null) {
- delete measurementStatistics.median
- }
- }
- }
-}
-
/**
* Generates a cryptographically secure random number in the [0,1[ range
*
TaskFunctions,
TaskSyncFunction
} from './task-functions'
+import {
+ checkTaskFunctionName,
+ checkValidTaskFunctionEntry,
+ checkValidWorkerOptions
+} from './utils'
const DEFAULT_MAX_INACTIVE_TIME = 60000
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
}
private checkWorkerOptions (opts: WorkerOptions): void {
- if (opts != null && !isPlainObject(opts)) {
- throw new TypeError('opts worker options parameter is not a plain object')
- }
- if (
- opts?.killBehavior != null &&
- !Object.values(KillBehaviors).includes(opts.killBehavior)
- ) {
- throw new TypeError(
- `killBehavior option '${opts.killBehavior}' is not valid`
- )
- }
- if (
- opts?.maxInactiveTime != null &&
- !Number.isSafeInteger(opts.maxInactiveTime)
- ) {
- throw new TypeError('maxInactiveTime option is not an integer')
- }
- if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) {
- throw new TypeError(
- 'maxInactiveTime option is not a positive integer greater or equal than 5'
- )
- }
- if (opts?.killHandler != null && typeof opts.killHandler !== 'function') {
- throw new TypeError('killHandler option is not a function')
- }
- if (opts?.async != null) {
- throw new Error('async option is deprecated')
- }
+ checkValidWorkerOptions(opts)
this.opts = { ...DEFAULT_WORKER_OPTIONS, ...opts }
}
- private checkValidTaskFunctionEntry (
- name: string,
- fn: TaskFunction<Data, Response>
- ): void {
- if (typeof name !== 'string') {
- throw new TypeError(
- 'A taskFunctions parameter object key is not a string'
- )
- }
- if (typeof name === 'string' && name.trim().length === 0) {
- throw new TypeError(
- 'A taskFunctions parameter object key is an empty string'
- )
- }
- if (typeof fn !== 'function') {
- throw new TypeError(
- 'A taskFunctions parameter object value is not a function'
- )
- }
- }
-
/**
* Checks if the `taskFunctions` parameter is passed to the constructor and valid.
*
} else if (isPlainObject(taskFunctions)) {
let firstEntry = true
for (const [name, fn] of Object.entries(taskFunctions)) {
- this.checkValidTaskFunctionEntry(name, fn)
+ checkValidTaskFunctionEntry<Data, Response>(name, fn)
const boundFn = fn.bind(this)
if (firstEntry) {
this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
*/
public hasTaskFunction (name: string): TaskFunctionOperationResult {
try {
- this.checkTaskFunctionName(name)
+ checkTaskFunctionName(name)
} catch (error) {
return { status: false, error: error as Error }
}
fn: TaskFunction<Data, Response>
): TaskFunctionOperationResult {
try {
- this.checkTaskFunctionName(name)
+ checkTaskFunctionName(name)
if (name === DEFAULT_TASK_NAME) {
throw new Error(
'Cannot add a task function with the default reserved name'
*/
public removeTaskFunction (name: string): TaskFunctionOperationResult {
try {
- this.checkTaskFunctionName(name)
+ checkTaskFunctionName(name)
if (name === DEFAULT_TASK_NAME) {
throw new Error(
'Cannot remove the task function with the default reserved name'
*/
public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
try {
- this.checkTaskFunctionName(name)
+ checkTaskFunctionName(name)
if (name === DEFAULT_TASK_NAME) {
throw new Error(
'Cannot set the default task function reserved name as the default task function'
}
}
- private checkTaskFunctionName (name: string): void {
- if (typeof name !== 'string') {
- throw new TypeError('name parameter is not a string')
- }
- if (typeof name === 'string' && name.trim().length === 0) {
- throw new TypeError('name parameter is an empty string')
- }
- }
-
/**
* Handles the ready message sent by the main worker.
*
* Runs the given task.
*
* @param task - The task to execute.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
*/
protected run (task: Task<Data>): void {
const { name, taskId, data } = task
--- /dev/null
+import { isPlainObject } from '../utils'
+import type { TaskFunction } from './task-functions'
+import { KillBehaviors, type WorkerOptions } from './worker-options'
+
+export const checkValidWorkerOptions = (opts: WorkerOptions): void => {
+ if (opts != null && !isPlainObject(opts)) {
+ throw new TypeError('opts worker options parameter is not a plain object')
+ }
+ if (
+ opts?.killBehavior != null &&
+ !Object.values(KillBehaviors).includes(opts.killBehavior)
+ ) {
+ throw new TypeError(
+ `killBehavior option '${opts.killBehavior}' is not valid`
+ )
+ }
+ if (
+ opts?.maxInactiveTime != null &&
+ !Number.isSafeInteger(opts.maxInactiveTime)
+ ) {
+ throw new TypeError('maxInactiveTime option is not an integer')
+ }
+ if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) {
+ throw new TypeError(
+ 'maxInactiveTime option is not a positive integer greater or equal than 5'
+ )
+ }
+ if (opts?.killHandler != null && typeof opts.killHandler !== 'function') {
+ throw new TypeError('killHandler option is not a function')
+ }
+ if (opts?.async != null) {
+ throw new Error('async option is deprecated')
+ }
+}
+
+export const checkValidTaskFunctionEntry = <Data = unknown, Response = unknown>(
+ name: string,
+ fn: TaskFunction<Data, Response>
+): void => {
+ if (typeof name !== 'string') {
+ throw new TypeError('A taskFunctions parameter object key is not a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError(
+ 'A taskFunctions parameter object key is an empty string'
+ )
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError(
+ 'A taskFunctions parameter object value is not a function'
+ )
+ }
+}
+
+export const checkTaskFunctionName = (name: string): void => {
+ if (typeof name !== 'string') {
+ throw new TypeError('name parameter is not a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError('name parameter is an empty string')
+ }
+}
PoolTypes,
WorkerChoiceStrategies,
WorkerTypes
-} = require('../../../lib')
-const { CircularArray } = require('../../../lib/circular-array')
-const { Deque } = require('../../../lib/deque')
-const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
-const { version } = require('../../../package.json')
-const { waitPoolEvents } = require('../../test-utils')
-const { WorkerNode } = require('../../../lib/pools/worker-node')
+} = require('../../lib')
+const { CircularArray } = require('../../lib/circular-array')
+const { Deque } = require('../../lib/deque')
+const { DEFAULT_TASK_NAME } = require('../../lib/utils')
+const { version } = require('../../package.json')
+const { waitPoolEvents } = require('../test-utils')
+const { WorkerNode } = require('../../lib/pools/worker-node')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
--- /dev/null
+const { expect } = require('expect')
+const {
+ DEFAULT_CIRCULAR_ARRAY_SIZE,
+ CircularArray
+} = require('../../lib/circular-array')
+const { updateMeasurementStatistics } = require('../../lib/pools/utils')
+
+describe('Pool utils test suite', () => {
+ it('Verify updateMeasurementStatistics() behavior', () => {
+ const measurementStatistics = {
+ history: new CircularArray()
+ }
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: false, median: false },
+ 0.01
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.01,
+ maximum: 0.01,
+ minimum: 0.01,
+ history: new CircularArray()
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: false, median: false },
+ 0.02
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.03,
+ maximum: 0.02,
+ minimum: 0.01,
+ history: new CircularArray()
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: true, median: false },
+ 0.001
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.031,
+ maximum: 0.02,
+ minimum: 0.001,
+ average: 0.001,
+ history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001)
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: true, median: false },
+ 0.003
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.034,
+ maximum: 0.02,
+ minimum: 0.001,
+ average: 0.002,
+ history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001, 0.003)
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: false, median: true },
+ 0.006
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.04,
+ maximum: 0.02,
+ minimum: 0.001,
+ median: 0.003,
+ history: new CircularArray(
+ DEFAULT_CIRCULAR_ARRAY_SIZE,
+ 0.001,
+ 0.003,
+ 0.006
+ )
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: true, median: false },
+ 0.01
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.05,
+ maximum: 0.02,
+ minimum: 0.001,
+ average: 0.005,
+ history: new CircularArray(
+ DEFAULT_CIRCULAR_ARRAY_SIZE,
+ 0.001,
+ 0.003,
+ 0.006,
+ 0.01
+ )
+ })
+ })
+})
const { MessageChannel, Worker } = require('node:worker_threads')
const cluster = require('node:cluster')
const { expect } = require('expect')
-const { WorkerNode } = require('../../../lib/pools/worker-node')
-const { WorkerTypes } = require('../../../lib')
-const { CircularArray } = require('../../../lib/circular-array')
-const { Deque } = require('../../../lib/deque')
-const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
+const { WorkerNode } = require('../../lib/pools/worker-node')
+const { WorkerTypes } = require('../../lib')
+const { CircularArray } = require('../../lib/circular-array')
+const { Deque } = require('../../lib/deque')
+const { DEFAULT_TASK_NAME } = require('../../lib/utils')
describe('Worker node test suite', () => {
const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js')
const cluster = require('node:cluster')
const os = require('node:os')
const { expect } = require('expect')
-const {
- CircularArray,
- DEFAULT_CIRCULAR_ARRAY_SIZE
-} = require('../lib/circular-array')
const {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
DEFAULT_TASK_NAME,
min,
round,
secureRandom,
- sleep,
- updateMeasurementStatistics
+ sleep
} = require('../lib/utils')
const { KillBehaviors, WorkerTypes } = require('../lib')
expect(isAsyncFunction(async function named () {})).toBe(true)
})
- it('Verify updateMeasurementStatistics() behavior', () => {
- const measurementStatistics = {
- history: new CircularArray()
- }
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: false, median: false },
- 0.01
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.01,
- maximum: 0.01,
- minimum: 0.01,
- history: new CircularArray()
- })
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: false, median: false },
- 0.02
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.03,
- maximum: 0.02,
- minimum: 0.01,
- history: new CircularArray()
- })
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: true, median: false },
- 0.001
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.031,
- maximum: 0.02,
- minimum: 0.001,
- average: 0.001,
- history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001)
- })
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: true, median: false },
- 0.003
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.034,
- maximum: 0.02,
- minimum: 0.001,
- average: 0.002,
- history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001, 0.003)
- })
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: false, median: true },
- 0.006
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.04,
- maximum: 0.02,
- minimum: 0.001,
- median: 0.003,
- history: new CircularArray(
- DEFAULT_CIRCULAR_ARRAY_SIZE,
- 0.001,
- 0.003,
- 0.006
- )
- })
- updateMeasurementStatistics(
- measurementStatistics,
- { aggregate: true, average: true, median: false },
- 0.01
- )
- expect(measurementStatistics).toStrictEqual({
- aggregate: 0.05,
- maximum: 0.02,
- minimum: 0.001,
- average: 0.005,
- history: new CircularArray(
- DEFAULT_CIRCULAR_ARRAY_SIZE,
- 0.001,
- 0.003,
- 0.006,
- 0.01
- )
- })
- })
-
it('Verify secureRandom() behavior', () => {
const randomNumber = secureRandom()
expect(typeof randomNumber === 'number').toBe(true)