X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=2f5c61896658311c96c551deee3fb99552273be2;hb=8b80810a81554f0e6442dd681e68f59fdfd23312;hp=1aa1a13413f807019352907d2dac7a635ab665d1;hpb=1079e6ed099b03189005b33702c1e9fa912c74d1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1aa1a134..2f5c6189 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,12 +5,14 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + average, isKillBehavior, isPlainObject, median, @@ -93,6 +95,10 @@ export abstract class AbstractPool< * Whether the pool is starting or not. */ private readonly starting: boolean + /** + * Whether the pool is started or not. + */ + private started: boolean /** * The start timestamp of the pool. */ @@ -141,6 +147,7 @@ export abstract class AbstractPool< this.starting = true this.startPool() this.starting = false + this.started = true this.startTimestamp = performance.now() } @@ -179,7 +186,7 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { if (max == null) { - throw new Error( + throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) } else if (!Number.isSafeInteger(max)) { @@ -258,10 +265,10 @@ export abstract class AbstractPool< } if ( workerChoiceStrategyOptions.choiceRetries != null && - workerChoiceStrategyOptions.choiceRetries <= 0 + workerChoiceStrategyOptions.choiceRetries < 0 ) { throw new RangeError( - `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero` ) } if ( @@ -285,7 +292,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -295,15 +302,33 @@ export abstract class AbstractPool< !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + 'Invalid worker node tasks concurrency: must be an integer' ) } if ( tasksQueueOptions?.concurrency != null && tasksQueueOptions.concurrency <= 0 ) { + throw new RangeError( + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + ) + } + if (tasksQueueOptions?.queueMaxSize != null) { throw new Error( - `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' + ) + } + if ( + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) + ) { + throw new TypeError( + 'Invalid worker node tasks queue size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { + throw new RangeError( + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -374,6 +399,13 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { backPressure: this.hasBackPressure() }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -396,24 +428,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.runTime.history), + [] ) ) ) @@ -437,24 +471,26 @@ export abstract class AbstractPool< ) ) ), - average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] + ) ) - ), + ) + }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( - this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.median ?? 0 + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.waitTime.history), + [] ) ) ) @@ -620,16 +656,27 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } + private setTasksQueueMaxSize (size: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = size + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...{ + size: Math.pow(this.maxSize, 2), + concurrency: 1 + }, + ...tasksQueueOptions } } @@ -694,8 +741,13 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on destroyed pool')) + return + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) + return } if ( name != null && @@ -703,22 +755,15 @@ export abstract class AbstractPool< name.trim().length === 0 ) { reject(new TypeError('name argument must not be an empty string')) + return } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) + return } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo - if ( - name != null && - Array.isArray(workerInfo.taskFunctions) && - !workerInfo.taskFunctions.includes(name) - ) { - reject( - new Error(`Task function '${name}' is not registered in the pool`) - ) - } const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -736,6 +781,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && + this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { @@ -754,6 +800,7 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + this.started = false } protected async sendKillMessageToWorker ( @@ -786,7 +833,7 @@ export abstract class AbstractPool< * @virtual */ protected setupHook (): void { - // Intentionally empty + /** Intentionally empty */ } /** @@ -850,7 +897,7 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME + message.taskPerformance?.name as string ) as WorkerUsage this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) @@ -883,13 +930,6 @@ export abstract class AbstractPool< workerTaskStatistics.executing > 0 ) { --workerTaskStatistics.executing - } else if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing < 0 - ) { - throw new Error( - 'Worker usage statistic for tasks executing cannot be negative' - ) } if (message.taskError == null) { ++workerTaskStatistics.executed @@ -902,11 +942,13 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } updateMeasurementStatistics( workerUsage.runTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.runTime ?? 0 ) } @@ -919,8 +961,7 @@ export abstract class AbstractPool< updateMeasurementStatistics( workerUsage.waitTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime, - workerUsage.tasks.executed + taskWaitTime ) } @@ -928,19 +969,20 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.active ?? 0 ) updateMeasurementStatistics( workerUsage.elu.idle, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.idle ?? 0 ) if (eluTaskStatisticsRequirements.aggregate) { if (message.taskPerformance?.elu != null) { @@ -1021,7 +1063,11 @@ export abstract class AbstractPool< workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { + if ( + this.opts.restartWorkerOnError === true && + !this.starting && + this.started + ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() } else { @@ -1113,6 +1159,12 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1142,44 +1194,131 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey!: number let minQueuedTasks = Infinity - let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo + if (workerNode.info.ready && workerNodeId !== workerNodeKey) { + if (workerNode.usage.tasks.queued === 0) { + destinationWorkerNodeKey = workerNodeId + break + } + if (workerNode.usage.tasks.queued < minQueuedTasks) { + minQueuedTasks = workerNode.usage.tasks.queued + destinationWorkerNodeKey = workerNodeId + } + } + } + if (destinationWorkerNodeKey != null) { + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: destinationWorkerNode.info.id as number + } if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued === 0 + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if (sourceWorkerNode.usage.tasks.queued === 0) { + break + } + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + if ( + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && + destinationWorkerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } - targetWorkerNodeKey = workerNodeId - break + ) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + if (destinationWorkerNode?.usage != null) { + ++destinationWorkerNode.usage.tasks.stolen } if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued < minQueuedTasks + this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) && + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) != null ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + const taskFunctionWorkerUsage = + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen } + break } - if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + if ((this.opts.tasksQueueOptions?.size as number) <= 1) { + return + } + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + sourceWorkerNode.usage.tasks.queued > 0 && + workerNode.info.ready && + workerNode.info.id !== workerId && + workerNode.usage.tasks.queued < + (this.opts.tasksQueueOptions?.size as number) - 1 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } + if (this.tasksQueueSize(workerNodeKey) === 0) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(task.name as string) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } } } } @@ -1292,7 +1431,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.maxSize + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) {