X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8f57a9067e0967bf87dd303efabfe694b00bdbc6;hb=937d524da3a5cce4795b85ddd1c430f0d184f731;hp=c510b2ad99495226456b880ddd83edd760728b10;hpb=85bbc7ab16c9f69a5dd358b71e3e6d4204dfd630;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c510b2ad..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' +import { defaultBucketSize } from '../priority-queue.js' import type { MessageValue, PromiseResponseWrapper, @@ -47,6 +48,7 @@ import { import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, getDefaultTasksQueueOptions, @@ -91,8 +93,13 @@ export abstract class AbstractPool< * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + > = new Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + >() /** * Worker choice strategies context referencing worker choice algorithms implementation. @@ -136,7 +143,7 @@ export abstract class AbstractPool< /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. @@ -167,7 +174,7 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.initializeEventEmitter() + this.initEventEmitter() } this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< Worker, @@ -192,8 +199,6 @@ export abstract class AbstractPool< if (this.opts.startWorkers === true) { this.start() } - - this.startTimestamp = performance.now() } private checkPoolType (): void { @@ -282,7 +287,7 @@ export abstract class AbstractPool< } } - private initializeEventEmitter (): void { + private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ name: `poolifier:${this.type}-${this.worker}-pool` }) @@ -323,7 +328,7 @@ export abstract class AbstractPool< ) }), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, _workerNode, workerNodeKey) => + (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), @@ -372,14 +377,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.minimum ?? Infinity + workerNode => + workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime.maximum ?? -Infinity + workerNode => + workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -389,7 +396,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -401,7 +410,9 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.runTime.history), + accumulator.concat( + workerNode.usage.runTime.history.toArray() + ), [] ) ) @@ -415,14 +426,16 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.minimum ?? Infinity + workerNode => + workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime.maximum ?? -Infinity + workerNode => + workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY ) ) ), @@ -432,7 +445,9 @@ export abstract class AbstractPool< average( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) @@ -444,13 +459,132 @@ export abstract class AbstractPool< median( this.workerNodes.reduce( (accumulator, workerNode) => - accumulator.concat(workerNode.usage.waitTime.history), + accumulator.concat( + workerNode.usage.waitTime.history.toArray() + ), [] ) ) ) }) } + }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate === true && { + elu: { + idle: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.idle.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.idle.history.toArray() + ), + [] + ) + ) + ) + }) + }, + active: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.minimum ?? + Number.POSITIVE_INFINITY + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.maximum ?? + Number.NEGATIVE_INFINITY + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat( + workerNode.usage.elu.active.history.toArray() + ), + [] + ) + ) + ) + }) + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ) + } + } }) } } @@ -486,6 +620,9 @@ export abstract class AbstractPool< * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) @@ -547,9 +684,12 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + requireSync = !this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) } if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { this.opts.workerChoiceStrategy = workerChoiceStrategy @@ -557,11 +697,14 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions ) + requireSync = true + } + if (requireSync) { this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies(), + this.getWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -570,14 +713,23 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined - ): void { + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategiesContext?.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -639,13 +791,13 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'idle', this.handleWorkerNodeIdleEvent @@ -654,7 +806,7 @@ export abstract class AbstractPool< } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -663,7 +815,7 @@ export abstract class AbstractPool< } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -806,7 +958,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -840,6 +992,8 @@ export abstract class AbstractPool< if (typeof fn.taskFunction !== 'function') { throw new TypeError('taskFunction property must be a function') } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), @@ -847,8 +1001,11 @@ export abstract class AbstractPool< }) this.taskFunctions.set(name, fn) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -866,11 +1023,16 @@ export abstract class AbstractPool< this.taskFunctions.get(name) ) }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( - this.getWorkerWorkerChoiceStrategies() + this.getWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -888,20 +1050,73 @@ export abstract class AbstractPool< } /** - * Gets task function strategy, if any. + * Gets task function worker choice strategy, if any. * * @param name - The task function name. * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. */ - private readonly getTaskFunctionWorkerWorkerChoiceStrategy = ( + private readonly getTaskFunctionWorkerChoiceStrategy = ( name?: string ): WorkerChoiceStrategy | undefined => { - if (name != null) { - return this.listTaskFunctionsProperties().find( - (taskFunctionProperties: TaskFunctionProperties) => - taskFunctionProperties.name === name - )?.strategy + name = name ?? DEFAULT_TASK_NAME + const taskFunctionsProperties = this.listTaskFunctionsProperties() + if (name === DEFAULT_TASK_NAME) { + name = taskFunctionsProperties[1]?.name } + return taskFunctionsProperties.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function worker choice strategy, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = ( + workerNodeKey: number, + name?: string + ): WorkerChoiceStrategy | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function priority, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority } /** @@ -909,7 +1124,7 @@ export abstract class AbstractPool< * * @returns The worker choice strategies. */ - private readonly getWorkerWorkerChoiceStrategies = + private readonly getWorkerChoiceStrategies = (): Set => { return new Set([ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -936,12 +1151,6 @@ export abstract class AbstractPool< }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -983,13 +1192,16 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode( - this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - ) + const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( + workerNodeKey, + name + ), transferList, timestamp, taskId: randomUUID() @@ -1021,7 +1233,7 @@ export abstract class AbstractPool< /** * Starts the minimum number of workers. */ - private startMinimumNumberOfWorkers (): void { + private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { this.startingMinimumNumberOfWorkers = true while ( this.workerNodes.reduce( @@ -1030,7 +1242,9 @@ export abstract class AbstractPool< 0 ) < this.minimumNumberOfWorkers ) { - this.createAndSetupWorkerNode() + const workerNodeKey = this.createAndSetupWorkerNode() + initWorkerNodeUsage && + this.initWorkerNodeUsage(this.workerNodes[workerNodeKey]) } this.startingMinimumNumberOfWorkers = false } @@ -1048,6 +1262,7 @@ export abstract class AbstractPool< } this.starting = true this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } @@ -1065,13 +1280,14 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_workerNode, workerNodeKey) => { + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() this.readyEventEmitted = false + delete this.startTimestamp this.destroying = false this.started = false } @@ -1192,7 +1408,7 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - let needWorkerChoiceStrategyUpdate = false + let needWorkerChoiceStrategiesUpdate = false // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage @@ -1207,7 +1423,7 @@ export abstract class AbstractPool< workerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1232,9 +1448,9 @@ export abstract class AbstractPool< taskFunctionWorkerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } - if (needWorkerChoiceStrategyUpdate) { + if (needWorkerChoiceStrategiesUpdate) { this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1257,12 +1473,10 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * @param workerChoiceStrategy - The worker choice strategy. - * @returns The chosen worker node key + * @param name - The task function name. + * @returns The chosen worker node key. */ - private chooseWorkerNode ( - workerChoiceStrategy?: WorkerChoiceStrategy - ): number { + private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( @@ -1273,7 +1487,9 @@ export abstract class AbstractPool< } } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy) + return this.workerChoiceStrategiesContext!.execute( + this.getTaskFunctionWorkerChoiceStrategy(name) + ) } /** @@ -1296,6 +1512,47 @@ export abstract class AbstractPool< transferList?: readonly TransferListItem[] ): void + /** + * Initializes the worker node usage with sensible default values gathered during runtime. + * + * @param workerNode - The worker node. + */ + private initWorkerNodeUsage (workerNode: IWorkerNode): void { + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true + ) { + workerNode.usage.runTime.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true + ) { + workerNode.usage.waitTime.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu + .aggregate === true + ) { + workerNode.usage.elu.active.aggregate = min( + ...this.workerNodes.map( + workerNode => + workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY + ) + ) + } + } + /** * Creates a new, completely set up worker node. * @@ -1326,7 +1583,7 @@ export abstract class AbstractPool< if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else if (!this.startingMinimumNumberOfWorkers) { - this.startMinimumNumberOfWorkers() + this.startMinimumNumberOfWorkers(true) } } if ( @@ -1352,7 +1609,7 @@ export abstract class AbstractPool< !this.startingMinimumNumberOfWorkers && !this.destroying ) { - this.startMinimumNumberOfWorkers() + this.startMinimumNumberOfWorkers(true) } }) const workerNodeKey = this.addWorkerNode(workerNode) @@ -1372,6 +1629,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) + const workerInfo = this.getWorkerInfo(localWorkerNodeKey) const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( @@ -1380,6 +1638,8 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && + workerInfo != null && + !workerInfo.stealing && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { @@ -1417,6 +1677,7 @@ export abstract class AbstractPool< ) { workerNode.info.ready = true } + this.initWorkerNodeUsage(workerNode) this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -1529,14 +1790,15 @@ export abstract class AbstractPool< } } - private redistributeQueuedTasks (workerNodeKey: number): void { - if (workerNodeKey === -1 || this.cannotStealTask()) { + private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { + if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) { return } - while (this.tasksQueueSize(workerNodeKey) > 0) { + while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.info.ready && + return sourceWorkerNodeKey !== workerNodeKey && + workerNode.info.ready && workerNode.usage.tasks.queued < workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey @@ -1547,7 +1809,7 @@ export abstract class AbstractPool< this.handleTask( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.dequeueTask(workerNodeKey)! + this.dequeueTask(sourceWorkerNodeKey)! ) } } @@ -1565,28 +1827,21 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.stolen + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen } } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string, + previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } - } - - private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1594,33 +1849,36 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerNode.getTaskFunctionWorkerUsage(taskName)! - ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + (previousTaskName != null && + previousTaskName === taskName && + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } } } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number + workerNodeKey: number, + taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } - } - - private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( - workerNodeKey: number, - taskName: string - ): void { - const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.getTaskFunctionWorkerUsage(taskName)! - taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage( + taskName + )!.tasks.sequentiallyStolen = 0 } } @@ -1635,69 +1893,49 @@ export abstract class AbstractPool< ) } const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } if ( this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { - if (workerInfo != null && previousStolenTask != null) { + if (previousStolenTask != null) { workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) } return } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( - workerInfo != null && previousStolenTask != null && - workerNodeTasksUsage.sequentiallyStolen > 0 && (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { workerInfo.stealing = false - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - taskFunctionProperties.name - ) - } - this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) - return - } - if (workerInfo == null) { - throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! ) + return } workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) - if ( - this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - stolenTask != null - ) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const taskFunctionTasksWorkerUsage = this.workerNodes[ - workerNodeKey + if (stolenTask != null) { + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks - if ( - taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || - (previousStolenTask != null && - previousStolenTask.name === stolenTask.name && - taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) - ) { - this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } else { - this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name! - ) - } + stolenTask.name!, + previousStolenTask?.name + ) } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { @@ -1727,9 +1965,8 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) - this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task @@ -1741,17 +1978,18 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || + this.hasBackPressure() || (this.info.stealingWorkerNodes ?? 0) > Math.floor(this.workerNodes.length / 2) ) { return } - const { workerId } = eventDetail const sizeOffset = 1 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } + const { workerId } = eventDetail const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes @@ -1778,7 +2016,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1787,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -1800,11 +2044,12 @@ export abstract class AbstractPool< this.handleWorkerReadyResponse(message) } else if (taskFunctionsProperties != null) { // Task function properties message received from worker - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1824,10 +2069,12 @@ export abstract class AbstractPool< if (ready == null || !ready) { throw new Error(`Worker ${workerId} failed to initialize`) } - const workerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -1915,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -1931,7 +2184,9 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions?.size ?? getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers - ).size + ).size, + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup. @@ -2032,7 +2287,7 @@ export abstract class AbstractPool< } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } }