"benchmark": "pnpm build && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs",
"benchmark:prod": "pnpm build:prod && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs",
"benchmark:debug": "pnpm build && node --max-old-space-size=8192 --enable-source-maps --inspect benchmarks/internal/bench.mjs",
- "test": "pnpm build --environment SOURCEMAP:false && c8 mocha 'tests/**/*.test.mjs'",
- "test:debug": "pnpm build && mocha --no-parallel --inspect 'tests/**/*.test.mjs'",
+ "test": "pnpm build --environment SOURCEMAP:false && cross-env NODE_ENV=test c8 mocha 'tests/**/*.test.mjs'",
+ "test:debug": "pnpm build && cross-env NODE_ENV=test mocha --no-parallel --inspect 'tests/**/*.test.mjs'",
"coverage": "c8 report --reporter=lcov",
"coverage:html": "c8 report --reporter=html",
"format": "biome format . --write; ts-standard . --fix",
"@release-it/keep-a-changelog": "^5.0.0",
"@rollup/plugin-terser": "^0.4.4",
"@rollup/plugin-typescript": "^11.1.5",
- "@types/node": "^20.10.4",
+ "@types/node": "^20.10.5",
"@typescript-eslint/eslint-plugin": "^6.14.0",
"@typescript-eslint/parser": "^6.14.0",
"benchmark": "^2.1.4",
"c8": "^8.0.1",
+ "cross-env": "^7.0.3",
"eslint": "^8.56.0",
"eslint-config-standard": "^17.1.0",
"eslint-config-standard-with-typescript": "^43.0.0",
specifier: ^11.1.5
version: 11.1.5(rollup@4.9.1)(typescript@5.3.3)
'@types/node':
- specifier: ^20.10.4
- version: 20.10.4
+ specifier: ^20.10.5
+ version: 20.10.5
'@typescript-eslint/eslint-plugin':
specifier: ^6.14.0
version: 6.14.0(@typescript-eslint/parser@6.14.0)(eslint@8.56.0)(typescript@5.3.3)
c8:
specifier: ^8.0.1
version: 8.0.1
+ cross-env:
+ specifier: ^7.0.3
+ version: 7.0.3
eslint:
specifier: ^8.56.0
version: 8.56.0
'@jest/schemas': 29.6.3
'@types/istanbul-lib-coverage': 2.0.6
'@types/istanbul-reports': 3.0.4
- '@types/node': 20.10.4
+ '@types/node': 20.10.5
'@types/yargs': 17.0.32
chalk: 4.1.2
dev: true
resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
dependencies:
'@types/minimatch': 5.1.2
- '@types/node': 20.10.4
+ '@types/node': 20.10.5
dev: true
/@types/http-cache-semantics@4.0.4:
undici-types: 5.26.5
dev: true
- /@types/node@20.10.4:
- resolution: {integrity: sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==}
+ /@types/node@20.10.5:
+ resolution: {integrity: sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw==}
dependencies:
undici-types: 5.26.5
dev: true
typescript: 5.3.3
dev: true
+ /cross-env@7.0.3:
+ resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==}
+ engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'}
+ hasBin: true
+ dependencies:
+ cross-spawn: 7.0.3
+ dev: true
+
/cross-spawn@7.0.3:
resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==}
engines: {node: '>= 8'}
engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
dependencies:
'@jest/types': 29.6.3
- '@types/node': 20.10.4
+ '@types/node': 20.10.5
chalk: 4.1.2
ci-info: 3.9.0
graceful-fs: 4.2.11
WorkerUsage
} from './worker'
import {
- type MeasurementStatisticsRequirements,
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
- updateMeasurementStatistics,
+ updateEluWorkerUsage,
+ updateRunTimeWorkerUsage,
+ updateTaskStatisticsWorkerUsage,
+ updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
} from './utils'
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(workerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ task
+ )
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNodeKey
].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
++taskFunctionWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ task
+ )
}
}
workerNodeKey: number,
message: MessageValue<Response>
): void {
+ let needWorkerChoiceStrategyUpdate = false
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
- this.updateTaskStatisticsWorkerUsage(workerUsage, message)
- this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateEluWorkerUsage(workerUsage, message)
+ updateTaskStatisticsWorkerUsage(workerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
].getTaskFunctionWorkerUsage(
message.taskPerformance?.name as string
) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+ updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ needWorkerChoiceStrategyUpdate = true
+ }
+ if (needWorkerChoiceStrategyUpdate) {
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
)
}
- private updateTaskStatisticsWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- const workerTaskStatistics = workerUsage.tasks
- if (
- workerTaskStatistics.executing != null &&
- workerTaskStatistics.executing > 0
- ) {
- --workerTaskStatistics.executing
- }
- if (message.workerError == null) {
- ++workerTaskStatistics.executed
- } else {
- ++workerTaskStatistics.failed
- }
- }
-
- private updateRunTimeWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- updateMeasurementStatistics(
- workerUsage.runTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
- message.taskPerformance?.runTime ?? 0
- )
- }
-
- private updateWaitTimeWorkerUsage (
- workerUsage: WorkerUsage,
- task: Task<Data>
- ): void {
- const timestamp = performance.now()
- const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
- updateMeasurementStatistics(
- workerUsage.waitTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
- taskWaitTime
- )
- }
-
- private updateEluWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- updateMeasurementStatistics(
- workerUsage.elu.active,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.active ?? 0
- )
- updateMeasurementStatistics(
- workerUsage.elu.idle,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.idle ?? 0
- )
- if (eluTaskStatisticsRequirements.aggregate) {
- if (message.taskPerformance?.elu != null) {
- if (workerUsage.elu.utilization != null) {
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else {
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- }
- }
- }
-
/**
* Chooses a worker node for the next task.
*
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true) {
import { existsSync } from 'node:fs'
import cluster from 'node:cluster'
import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { env } from 'node:process'
import { average, isPlainObject, max, median, min } from '../utils'
+import type { MessageValue, Task } from '../utility-types'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
type MeasurementStatistics,
type WorkerNodeOptions,
type WorkerType,
- WorkerTypes
+ WorkerTypes,
+ type WorkerUsage
} from './worker'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
export const checkFilePath = (filePath: string): void => {
if (filePath == null) {
* @param numberOfMeasurements - The number of measurements.
* @internal
*/
-export const updateMeasurementStatistics = (
+const updateMeasurementStatistics = (
measurementStatistics: MeasurementStatistics,
measurementRequirements: MeasurementStatisticsRequirements,
measurementValue: number
}
}
}
+if (env.NODE_ENV === 'test') {
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+ exports.updateMeasurementStatistics = updateMeasurementStatistics
+}
+
+export const updateWaitTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ task: Task<Data>
+ ): void => {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ updateMeasurementStatistics(
+ workerUsage.waitTime,
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+ taskWaitTime
+ )
+}
+
+export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+): void => {
+ const workerTaskStatistics = workerUsage.tasks
+ if (
+ workerTaskStatistics.executing != null &&
+ workerTaskStatistics.executing > 0
+ ) {
+ --workerTaskStatistics.executing
+ }
+ if (message.workerError == null) {
+ ++workerTaskStatistics.executed
+ } else {
+ ++workerTaskStatistics.failed
+ }
+}
+
+export const updateRunTimeWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ updateMeasurementStatistics(
+ workerUsage.runTime,
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+ message.taskPerformance?.runTime ?? 0
+ )
+}
+
+export const updateEluWorkerUsage = <
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+>(
+ workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >,
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void => {
+ if (message.workerError != null) {
+ return
+ }
+ const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
+ workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ updateMeasurementStatistics(
+ workerUsage.elu.active,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.active ?? 0
+ )
+ updateMeasurementStatistics(
+ workerUsage.elu.idle,
+ eluTaskStatisticsRequirements,
+ message.taskPerformance?.elu?.idle ?? 0
+ )
+ if (eluTaskStatisticsRequirements.aggregate) {
+ if (message.taskPerformance?.elu != null) {
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ }
+ }
+}
export const createWorker = <Worker extends IWorker>(
type: WorkerType,