X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f862bbbecfff8c4068c7243e362035470f00b392;hb=6cd5248f7b289828220cac63f8fe77d021c572e9;hp=507fcc10e91e809b13948b85325e413c1ccac621;hpb=3f25036f6dab9916ac40916ca1dc5358ec1f88ba;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 507fcc10..f862bbbe 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -12,6 +12,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + average, isKillBehavior, isPlainObject, median, @@ -264,10 +265,10 @@ export abstract class AbstractPool< } if ( workerChoiceStrategyOptions.choiceRetries != null && - workerChoiceStrategyOptions.choiceRetries <= 0 + workerChoiceStrategyOptions.choiceRetries < 0 ) { throw new RangeError( - `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero` ) } if ( @@ -312,28 +313,22 @@ export abstract class AbstractPool< `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } - if ( - tasksQueueOptions?.queueMaxSize != null && - tasksQueueOptions?.size != null - ) { + if (tasksQueueOptions?.queueMaxSize != null) { throw new Error( - 'Invalid tasks queue options: cannot specify both queueMaxSize and size' + 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' ) } - if (tasksQueueOptions?.queueMaxSize != null) { - tasksQueueOptions.size = tasksQueueOptions.queueMaxSize - } if ( tasksQueueOptions?.size != null && !Number.isSafeInteger(tasksQueueOptions.size) ) { throw new TypeError( - 'Invalid worker node tasks queue max size: must be an integer' + 'Invalid worker node tasks queue size: must be an integer' ) } if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -404,6 +399,13 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { backPressure: this.hasBackPressure() }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -426,24 +428,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] ) ) ) @@ -467,24 +471,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] ) ) ) @@ -737,9 +743,11 @@ export abstract class AbstractPool< return await new Promise((resolve, reject) => { if (!this.started) { reject(new Error('Cannot execute a task on destroyed pool')) + return } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) + return } if ( name != null && @@ -747,22 +755,15 @@ export abstract class AbstractPool< name.trim().length === 0 ) { reject(new TypeError('name argument must not be an empty string')) + return } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) + return } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo - if ( - name != null && - Array.isArray(workerInfo.taskFunctions) && - !workerInfo.taskFunctions.includes(name) - ) { - reject( - new Error(`Task function '${name}' is not registered in the pool`) - ) - } const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -780,6 +781,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && + this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { @@ -831,7 +833,7 @@ export abstract class AbstractPool< * @virtual */ protected setupHook (): void { - // Intentionally empty + /** Intentionally empty */ } /** @@ -940,11 +942,13 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } updateMeasurementStatistics( workerUsage.runTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.runTime ?? 0 ) } @@ -957,8 +961,7 @@ export abstract class AbstractPool< updateMeasurementStatistics( workerUsage.waitTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime, - workerUsage.tasks.executed + taskWaitTime ) } @@ -966,19 +969,20 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.active ?? 0 ) updateMeasurementStatistics( workerUsage.elu.idle, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.idle ?? 0 ) if (eluTaskStatisticsRequirements.aggregate) { if (message.taskPerformance?.elu != null) { @@ -1192,15 +1196,8 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let destinationWorkerNodeKey!: number let minQueuedTasks = Infinity - let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if (workerNode.info.ready && workerNodeId !== workerNodeKey) { - if ( - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } if (workerNode.usage.tasks.queued === 0) { destinationWorkerNodeKey = workerNodeId break @@ -1212,12 +1209,16 @@ export abstract class AbstractPool< } } if (destinationWorkerNodeKey != null) { + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const task = { ...(this.dequeueTask(workerNodeKey) as Task), - workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) - .id as number + workerId: destinationWorkerNode.info.id as number } - if (executeTask) { + if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { this.executeTask(destinationWorkerNodeKey, task) } else { this.enqueueTask(destinationWorkerNodeKey, task) @@ -1249,19 +1250,38 @@ export abstract class AbstractPool< workerId: destinationWorkerNode.info.id as number } if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && destinationWorkerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(destinationWorkerNodeKey, task) } else { this.enqueueTask(destinationWorkerNodeKey, task) } + if (destinationWorkerNode?.usage != null) { + ++destinationWorkerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) && + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { + const taskFunctionWorkerUsage = + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } break } } } private tasksStealingOnBackPressure (workerId: number): void { + if ((this.opts.tasksQueueOptions?.size as number) <= 1) { + return + } const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes @@ -1283,13 +1303,26 @@ export abstract class AbstractPool< workerId: workerNode.info.id as number } if ( + this.tasksQueueSize(workerNodeKey) === 0 && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask(workerNodeKey, task) } else { this.enqueueTask(workerNodeKey, task) } + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(task.name as string) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } } } }