From e9ed6eeed0f1c96d89c1506ee342b3000a95b4ba Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 13 Jan 2024 14:40:22 +0100 Subject: [PATCH] refactor: untangle utils purpose MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../abstract-worker-choice-strategy.ts | 4 +- .../fair-share-worker-choice-strategy.ts | 2 +- ...hted-round-robin-worker-choice-strategy.ts | 2 +- .../least-busy-worker-choice-strategy.ts | 2 +- .../least-elu-worker-choice-strategy.ts | 2 +- ...hted-round-robin-worker-choice-strategy.ts | 2 +- .../worker-choice-strategy-context.ts | 2 +- src/pools/utils.ts | 126 ++++++++++++++++- src/pools/worker-node.ts | 9 +- src/utils.ts | 130 +----------------- tests/pools/utils.test.mjs | 94 ++++++++++++- tests/utils.test.mjs | 93 +------------ 12 files changed, 234 insertions(+), 234 deletions(-) diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index f2443742..ff1d4b08 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,8 +1,8 @@ +import type { IPool } from '../pool.js' import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, buildWorkerChoiceStrategyOptions -} from '../../utils.js' -import type { IPool } from '../pool.js' +} from '../utils.js' import type { IWorker } from '../worker.js' import type { IWorkerChoiceStrategy, diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index f8746558..ee617d36 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -1,5 +1,5 @@ -import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js' import type { IPool } from '../pool.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import type { IWorker } from '../worker.js' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js' import { diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 681870cb..6bbcc5f5 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -1,6 +1,6 @@ import type { IWorker } from '../worker.js' import type { IPool } from '../pool.js' -import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js' import type { IWorkerChoiceStrategy, diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 5ef438f6..8db32ac7 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -1,5 +1,5 @@ -import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js' import type { IPool } from '../pool.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import type { IWorker } from '../worker.js' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js' import type { diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index c3fe32d1..9cb28bac 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -1,5 +1,5 @@ -import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js' import type { IPool } from '../pool.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import type { IWorker } from '../worker.js' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js' import type { diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 434b1f86..c86682cb 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -1,6 +1,6 @@ import type { IWorker } from '../worker.js' import type { IPool } from '../pool.js' -import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js' import type { IWorkerChoiceStrategy, diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index d8d6f9c4..fc940a2f 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -1,6 +1,6 @@ import type { IPool } from '../pool.js' import type { IWorker } from '../worker.js' -import { getWorkerChoiceStrategyRetries } from '../../utils.js' +import { getWorkerChoiceStrategyRetries } from '../utils.js' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js' import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js' import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js' diff --git a/src/pools/utils.ts b/src/pools/utils.ts index a97f3e6e..ff270982 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,15 +1,22 @@ import { existsSync } from 'node:fs' -import cluster from 'node:cluster' -import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' +import cluster, { Worker as ClusterWorker } from 'node:cluster' +import { + SHARE_ENV, + Worker as ThreadWorker, + type WorkerOptions +} from 'node:worker_threads' import { env } from 'node:process' +import { randomInt } from 'node:crypto' +import { cpus } from 'node:os' import { average, isPlainObject, max, median, min } from '../utils.js' import type { MessageValue, Task } from '../utility-types.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy + type WorkerChoiceStrategy, + type WorkerChoiceStrategyOptions } from './selection-strategies/selection-strategies-types.js' -import type { TasksQueueOptions } from './pool.js' +import type { IPool, TasksQueueOptions } from './pool.js' import { type IWorker, type IWorkerNode, @@ -21,6 +28,16 @@ import { } from './worker.js' import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +/** + * Default measurement statistics requirements. + */ +export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = + { + aggregate: false, + average: false, + median: false + } + export const getDefaultTasksQueueOptions = ( poolMaxSize: number ): Required => { @@ -33,6 +50,75 @@ export const getDefaultTasksQueueOptions = ( } } +export const getWorkerChoiceStrategyRetries = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): number => { + return ( + pool.info.maxSize + + Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length + ) +} + +export const buildWorkerChoiceStrategyOptions = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): WorkerChoiceStrategyOptions => { + opts = clone(opts ?? {}) + opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) + return { + ...{ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }, + ...opts + } +} + +const clone = (object: T): T => { + return structuredClone(object) +} + +const getDefaultWeights = ( + poolMaxSize: number, + defaultWorkerWeight?: number +): Record => { + defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() + const weights: Record = {} + for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { + weights[workerNodeKey] = defaultWorkerWeight + } + return weights +} + +const getDefaultWorkerWeight = (): number => { + const cpuSpeed = randomInt(500, 2500) + let cpusCycleTimeWeight = 0 + for (const cpu of cpus()) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (cpu.speed == null || cpu.speed === 0) { + cpu.speed = + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? + cpuSpeed + } + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / cpus().length) +} + export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') @@ -316,7 +402,7 @@ export const createWorker = ( ): Worker => { switch (type) { case WorkerTypes.thread: - return new Worker(filePath, { + return new ThreadWorker(filePath, { env: SHARE_ENV, ...opts.workerOptions }) as unknown as Worker @@ -328,6 +414,36 @@ export const createWorker = ( } } +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = (worker: IWorker): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } else if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = (worker: IWorker): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + export const waitWorkerNodeEvents = async < Worker extends IWorker, Data = unknown diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index a9f9ca5f..30821517 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -2,7 +2,7 @@ import { MessageChannel } from 'node:worker_threads' import { EventEmitter } from 'node:events' import { CircularArray } from '../circular-array.js' import type { Task } from '../utility-types.js' -import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js' +import { DEFAULT_TASK_NAME } from '../utils.js' import { Deque } from '../deque.js' import { type EventHandler, @@ -15,7 +15,12 @@ import { WorkerTypes, type WorkerUsage } from './worker.js' -import { checkWorkerNodeArguments, createWorker } from './utils.js' +import { + checkWorkerNodeArguments, + createWorker, + getWorkerId, + getWorkerType +} from './utils.js' /** * Worker node. diff --git a/src/utils.ts b/src/utils.ts index b38cf948..d74cfbfd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,15 +1,6 @@ import * as os from 'node:os' -import { getRandomValues, randomInt } from 'node:crypto' -import { Worker as ClusterWorker } from 'node:cluster' -import { Worker as ThreadWorker } from 'node:worker_threads' -import { cpus } from 'node:os' -import type { - MeasurementStatisticsRequirements, - WorkerChoiceStrategyOptions -} from './pools/selection-strategies/selection-strategies-types.js' +import { getRandomValues } from 'node:crypto' import type { KillBehavior } from './worker/worker-options.js' -import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker.js' -import type { IPool } from './pools/pool.js' /** * Default task name. @@ -23,16 +14,6 @@ export const EMPTY_FUNCTION: () => void = Object.freeze(() => { /* Intentionally empty */ }) -/** - * Default measurement statistics requirements. - */ -export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = - { - aggregate: false, - average: false, - median: false - } - /** * Returns safe host OS optimized estimate of the default amount of parallelism a pool should use. * Always returns a value greater than zero. @@ -52,36 +33,6 @@ export const availableParallelism = (): number => { return availableParallelism } -/** - * Returns the worker type of the given worker. - * - * @param worker - The worker to get the type of. - * @returns The worker type of the given worker. - * @internal - */ -export const getWorkerType = (worker: IWorker): WorkerType | undefined => { - if (worker instanceof ThreadWorker) { - return WorkerTypes.thread - } else if (worker instanceof ClusterWorker) { - return WorkerTypes.cluster - } -} - -/** - * Returns the worker id of the given worker. - * - * @param worker - The worker to get the id of. - * @returns The worker id of the given worker. - * @internal - */ -export const getWorkerId = (worker: IWorker): number | undefined => { - if (worker instanceof ThreadWorker) { - return worker.threadId - } else if (worker instanceof ClusterWorker) { - return worker.id - } -} - /** * Sleeps for the given amount of milliseconds. * @@ -122,8 +73,7 @@ export const exponentialDelay = ( export const average = (dataSet: number[]): number => { if (Array.isArray(dataSet) && dataSet.length === 0) { return 0 - } - if (Array.isArray(dataSet) && dataSet.length === 1) { + } else if (Array.isArray(dataSet) && dataSet.length === 1) { return dataSet[0] } return ( @@ -142,8 +92,7 @@ export const average = (dataSet: number[]): number => { export const median = (dataSet: number[]): number => { if (Array.isArray(dataSet) && dataSet.length === 0) { return 0 - } - if (Array.isArray(dataSet) && dataSet.length === 1) { + } else if (Array.isArray(dataSet) && dataSet.length === 1) { return dataSet[0] } const sortedDataSet = dataSet.slice().sort((a, b) => a - b) @@ -175,7 +124,7 @@ export const round = (num: number, scale = 2): number => { * @returns `true` if the given object is a plain object, `false` otherwise. * @internal */ -export const isPlainObject = (obj: unknown): boolean => +export const isPlainObject = (obj: unknown): obj is object => typeof obj === 'object' && obj !== null && obj.constructor === Object && @@ -200,7 +149,7 @@ export const isKillBehavior = ( /** * Detects whether the given value is an asynchronous function or not. * - * @param fn - Any value. + * @param fn - Unknown value. * @returns `true` if `fn` was an asynchronous function, otherwise `false`. * @internal */ @@ -266,72 +215,3 @@ export const once = ( return result } } - -export const getWorkerChoiceStrategyRetries = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): number => { - return ( - pool.info.maxSize + - Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length - ) -} - -const clone = (object: T): T => { - return structuredClone(object) -} - -export const buildWorkerChoiceStrategyOptions = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): WorkerChoiceStrategyOptions => { - opts = clone(opts ?? {}) - opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) - return { - ...{ - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }, - ...opts - } -} - -const getDefaultWeights = ( - poolMaxSize: number, - defaultWorkerWeight?: number -): Record => { - defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() - const weights: Record = {} - for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { - weights[workerNodeKey] = defaultWorkerWeight - } - return weights -} - -const getDefaultWorkerWeight = (): number => { - const cpuSpeed = randomInt(500, 2500) - let cpusCycleTimeWeight = 0 - for (const cpu of cpus()) { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (cpu.speed == null || cpu.speed === 0) { - cpu.speed = - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? - cpuSpeed - } - // CPU estimated cycle time - const numberOfDigits = cpu.speed.toString().length - 1 - const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) - cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) - } - return Math.round(cpusCycleTimeWeight / cpus().length) -} diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 610b92c3..6869bda6 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -1,18 +1,35 @@ import { Worker as ThreadWorker } from 'node:worker_threads' -import { Worker as ClusterWorker } from 'node:cluster' +import cluster, { Worker as ClusterWorker } from 'node:cluster' import { expect } from 'expect' import { CircularArray, DEFAULT_CIRCULAR_ARRAY_SIZE } from '../../lib/circular-array.cjs' import { + DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, + buildWorkerChoiceStrategyOptions, createWorker, getDefaultTasksQueueOptions, + getWorkerChoiceStrategyRetries, + getWorkerId, + getWorkerType, updateMeasurementStatistics } from '../../lib/pools/utils.cjs' -import { WorkerTypes } from '../../lib/index.cjs' +import { + FixedClusterPool, + FixedThreadPool, + WorkerTypes +} from '../../lib/index.cjs' describe('Pool utils test suite', () => { + it('Verify DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS values', () => { + expect(DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS).toStrictEqual({ + aggregate: false, + average: false, + median: false + }) + }) + it('Verify getDefaultTasksQueueOptions() behavior', () => { const poolMaxSize = 4 expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({ @@ -24,6 +41,61 @@ describe('Pool utils test suite', () => { }) }) + it('Verify getWorkerChoiceStrategyRetries() behavior', async () => { + const numberOfThreads = 4 + const pool = new FixedThreadPool( + numberOfThreads, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2) + const workerChoiceStrategyOptions = { + runTime: { median: true }, + waitTime: { median: true }, + elu: { median: true }, + weights: { + 0: 100, + 1: 100 + } + } + expect( + getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions) + ).toBe( + pool.info.maxSize + + Object.keys(workerChoiceStrategyOptions.weights).length + ) + await pool.destroy() + }) + + it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => { + const numberOfWorkers = 4 + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.cjs' + ) + expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) + }) + const workerChoiceStrategyOptions = { + runTime: { median: true }, + waitTime: { median: true }, + elu: { median: true }, + weights: { + 0: 100, + 1: 100 + } + } + expect( + buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions) + ).toStrictEqual(workerChoiceStrategyOptions) + await pool.destroy() + }) + it('Verify updateMeasurementStatistics() behavior', () => { const measurementStatistics = { history: new CircularArray() @@ -127,4 +199,22 @@ describe('Pool utils test suite', () => { ) ).toBeInstanceOf(ClusterWorker) }) + + it('Verify getWorkerType() behavior', () => { + expect( + getWorkerType( + new ThreadWorker('./tests/worker-files/thread/testWorker.mjs') + ) + ).toBe(WorkerTypes.thread) + expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster) + }) + + it('Verify getWorkerId() behavior', () => { + const threadWorker = new ThreadWorker( + './tests/worker-files/thread/testWorker.mjs' + ) + const clusterWorker = cluster.fork() + expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId) + expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id) + }) }) diff --git a/tests/utils.test.mjs b/tests/utils.test.mjs index 5f54f6fc..1078c1fd 100644 --- a/tests/utils.test.mjs +++ b/tests/utils.test.mjs @@ -1,19 +1,12 @@ -import { Worker } from 'node:worker_threads' -import cluster from 'node:cluster' import os from 'node:os' import { randomInt } from 'node:crypto' import { expect } from 'expect' import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, DEFAULT_TASK_NAME, EMPTY_FUNCTION, availableParallelism, average, - buildWorkerChoiceStrategyOptions, exponentialDelay, - getWorkerChoiceStrategyRetries, - getWorkerId, - getWorkerType, isAsyncFunction, isKillBehavior, isPlainObject, @@ -25,12 +18,7 @@ import { secureRandom, sleep } from '../lib/utils.cjs' -import { - FixedClusterPool, - FixedThreadPool, - KillBehaviors, - WorkerTypes -} from '../lib/index.cjs' +import { KillBehaviors } from '../lib/index.cjs' describe('Utils test suite', () => { it('Verify DEFAULT_TASK_NAME value', () => { @@ -41,14 +29,6 @@ describe('Utils test suite', () => { expect(EMPTY_FUNCTION).toStrictEqual(expect.any(Function)) }) - it('Verify DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS values', () => { - expect(DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS).toStrictEqual({ - aggregate: false, - average: false, - median: false - }) - }) - it('Verify availableParallelism() behavior', () => { const parallelism = availableParallelism() expect(typeof parallelism === 'number').toBe(true) @@ -62,22 +42,6 @@ describe('Utils test suite', () => { expect(parallelism).toBe(expectedParallelism) }) - it('Verify getWorkerType() behavior', () => { - expect( - getWorkerType(new Worker('./tests/worker-files/thread/testWorker.mjs')) - ).toBe(WorkerTypes.thread) - expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster) - }) - - it('Verify getWorkerId() behavior', () => { - const threadWorker = new Worker( - './tests/worker-files/thread/testWorker.mjs' - ) - const clusterWorker = cluster.fork() - expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId) - expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id) - }) - it('Verify sleep() behavior', async () => { const start = performance.now() const sleepMs = 1000 @@ -237,61 +201,6 @@ describe('Utils test suite', () => { expect(max(1, 1)).toBe(1) }) - it('Verify getWorkerChoiceStrategyRetries() behavior', async () => { - const numberOfThreads = 4 - const pool = new FixedThreadPool( - numberOfThreads, - './tests/worker-files/thread/testWorker.mjs' - ) - expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2) - const workerChoiceStrategyOptions = { - runTime: { median: true }, - waitTime: { median: true }, - elu: { median: true }, - weights: { - 0: 100, - 1: 100 - } - } - expect( - getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions) - ).toBe( - pool.info.maxSize + - Object.keys(workerChoiceStrategyOptions.weights).length - ) - await pool.destroy() - }) - - it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => { - const numberOfWorkers = 4 - const pool = new FixedClusterPool( - numberOfWorkers, - './tests/worker-files/cluster/testWorker.cjs' - ) - expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({ - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false }, - weights: expect.objectContaining({ - 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) - }) - const workerChoiceStrategyOptions = { - runTime: { median: true }, - waitTime: { median: true }, - elu: { median: true }, - weights: { - 0: 100, - 1: 100 - } - } - expect( - buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions) - ).toStrictEqual(workerChoiceStrategyOptions) - await pool.destroy() - }) - // it('Verify once()', () => { // let called = 0 // const fn = () => ++called -- 2.34.1