From c329fd41c48904770df633b6d5ea2b3d37f3eafd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 18 Dec 2023 14:07:38 +0100 Subject: [PATCH] perf: update worker choice strategies internal if needed MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- package.json | 7 +- pnpm-lock.yaml | 25 +++++-- src/pools/abstract-pool.ts | 131 ++++++++++++------------------------- src/pools/utils.ts | 117 ++++++++++++++++++++++++++++++++- 4 files changed, 179 insertions(+), 101 deletions(-) diff --git a/package.json b/package.json index 80bf3adf..4679669c 100644 --- a/package.json +++ b/package.json @@ -22,8 +22,8 @@ "benchmark": "pnpm build && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs", "benchmark:prod": "pnpm build:prod && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs", "benchmark:debug": "pnpm build && node --max-old-space-size=8192 --enable-source-maps --inspect benchmarks/internal/bench.mjs", - "test": "pnpm build --environment SOURCEMAP:false && c8 mocha 'tests/**/*.test.mjs'", - "test:debug": "pnpm build && mocha --no-parallel --inspect 'tests/**/*.test.mjs'", + "test": "pnpm build --environment SOURCEMAP:false && cross-env NODE_ENV=test c8 mocha 'tests/**/*.test.mjs'", + "test:debug": "pnpm build && cross-env NODE_ENV=test mocha --no-parallel --inspect 'tests/**/*.test.mjs'", "coverage": "c8 report --reporter=lcov", "coverage:html": "c8 report --reporter=html", "format": "biome format . --write; ts-standard . --fix", @@ -117,11 +117,12 @@ "@release-it/keep-a-changelog": "^5.0.0", "@rollup/plugin-terser": "^0.4.4", "@rollup/plugin-typescript": "^11.1.5", - "@types/node": "^20.10.4", + "@types/node": "^20.10.5", "@typescript-eslint/eslint-plugin": "^6.14.0", "@typescript-eslint/parser": "^6.14.0", "benchmark": "^2.1.4", "c8": "^8.0.1", + "cross-env": "^7.0.3", "eslint": "^8.56.0", "eslint-config-standard": "^17.1.0", "eslint-config-standard-with-typescript": "^43.0.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 91a52991..d0c70337 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,8 +30,8 @@ devDependencies: specifier: ^11.1.5 version: 11.1.5(rollup@4.9.1)(typescript@5.3.3) '@types/node': - specifier: ^20.10.4 - version: 20.10.4 + specifier: ^20.10.5 + version: 20.10.5 '@typescript-eslint/eslint-plugin': specifier: ^6.14.0 version: 6.14.0(@typescript-eslint/parser@6.14.0)(eslint@8.56.0)(typescript@5.3.3) @@ -44,6 +44,9 @@ devDependencies: c8: specifier: ^8.0.1 version: 8.0.1 + cross-env: + specifier: ^7.0.3 + version: 7.0.3 eslint: specifier: ^8.56.0 version: 8.56.0 @@ -492,7 +495,7 @@ packages: '@jest/schemas': 29.6.3 '@types/istanbul-lib-coverage': 2.0.6 '@types/istanbul-reports': 3.0.4 - '@types/node': 20.10.4 + '@types/node': 20.10.5 '@types/yargs': 17.0.32 chalk: 4.1.2 dev: true @@ -948,7 +951,7 @@ packages: resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==} dependencies: '@types/minimatch': 5.1.2 - '@types/node': 20.10.4 + '@types/node': 20.10.5 dev: true /@types/http-cache-semantics@4.0.4: @@ -993,8 +996,8 @@ packages: undici-types: 5.26.5 dev: true - /@types/node@20.10.4: - resolution: {integrity: sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==} + /@types/node@20.10.5: + resolution: {integrity: sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw==} dependencies: undici-types: 5.26.5 dev: true @@ -1990,6 +1993,14 @@ packages: typescript: 5.3.3 dev: true + /cross-env@7.0.3: + resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==} + engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'} + hasBin: true + dependencies: + cross-spawn: 7.0.3 + dev: true + /cross-spawn@7.0.3: resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==} engines: {node: '>= 8'} @@ -4105,7 +4116,7 @@ packages: engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} dependencies: '@jest/types': 29.6.3 - '@types/node': 20.10.4 + '@types/node': 20.10.5 chalk: 4.1.2 ci-info: 3.9.0 graceful-fs: 4.2.11 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ea07dc8f..670d490d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -43,7 +43,6 @@ import type { WorkerUsage } from './worker' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -56,7 +55,10 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, waitWorkerNodeEvents } from './utils' @@ -1082,7 +1084,11 @@ export abstract class AbstractPool< if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + task + ) } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1094,7 +1100,11 @@ export abstract class AbstractPool< workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + task + ) } } @@ -1109,11 +1119,21 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { + let needWorkerChoiceStrategyUpdate = false if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1126,9 +1146,21 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage( message.taskPerformance?.name as string ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) - this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if (needWorkerChoiceStrategyUpdate) { + this.workerChoiceStrategyContext.update(workerNodeKey) } } @@ -1147,84 +1179,6 @@ export abstract class AbstractPool< ) } - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing > 0 - ) { - --workerTaskStatistics.executing - } - if (message.workerError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed - } - } - - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0 - ) - } - - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime - ) - } - - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0 - ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0 - ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization - } - } - } - } - /** * Chooses a worker node for the next task. * @@ -1757,7 +1711,6 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) workerNode?.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true) { diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 5e21c38c..ae09e8a9 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,7 +1,9 @@ import { existsSync } from 'node:fs' import cluster from 'node:cluster' import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' +import { env } from 'node:process' import { average, isPlainObject, max, median, min } from '../utils' +import type { MessageValue, Task } from '../utility-types' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, @@ -14,8 +16,10 @@ import { type MeasurementStatistics, type WorkerNodeOptions, type WorkerType, - WorkerTypes + WorkerTypes, + type WorkerUsage } from './worker' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' export const checkFilePath = (filePath: string): void => { if (filePath == null) { @@ -151,7 +155,7 @@ export const checkWorkerNodeArguments = ( * @param numberOfMeasurements - The number of measurements. * @internal */ -export const updateMeasurementStatistics = ( +const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, measurementRequirements: MeasurementStatisticsRequirements, measurementValue: number @@ -185,6 +189,115 @@ export const updateMeasurementStatistics = ( } } } +if (env.NODE_ENV === 'test') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + exports.updateMeasurementStatistics = updateMeasurementStatistics +} + +export const updateWaitTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + task: Task + ): void => { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) + updateMeasurementStatistics( + workerUsage.waitTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, + taskWaitTime + ) +} + +export const updateTaskStatisticsWorkerUsage = ( + workerUsage: WorkerUsage, + message: MessageValue +): void => { + const workerTaskStatistics = workerUsage.tasks + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } + if (message.workerError == null) { + ++workerTaskStatistics.executed + } else { + ++workerTaskStatistics.failed + } +} + +export const updateRunTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + updateMeasurementStatistics( + workerUsage.runTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, + message.taskPerformance?.runTime ?? 0 + ) +} + +export const updateEluWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = + workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + updateMeasurementStatistics( + workerUsage.elu.active, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.active ?? 0 + ) + updateMeasurementStatistics( + workerUsage.elu.idle, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.idle ?? 0 + ) + if (eluTaskStatisticsRequirements.aggregate) { + if (message.taskPerformance?.elu != null) { + if (workerUsage.elu.utilization != null) { + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else { + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + } + } +} export const createWorker = ( type: WorkerType, -- 2.34.1