X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=4f60dff0279643a864b8c570eaddf3e3af9e7546;hb=0c7b9e90e091ac71ede7a376236e947c7a81464f;hp=63f1f8595c928a6db6e8daf2fe8180b9685b459a;hpb=bcfb06ce041a682baf396a099c633a848d6a4045;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 63f1f859..4f60dff0 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -47,6 +47,7 @@ import { import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, getDefaultTasksQueueOptions, @@ -136,7 +137,7 @@ export abstract class AbstractPool< /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. @@ -192,8 +193,6 @@ export abstract class AbstractPool< if (this.opts.startWorkers === true) { this.start() } - - this.startTimestamp = performance.now() } private checkPoolType (): void { @@ -486,6 +485,9 @@ export abstract class AbstractPool< * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) @@ -547,28 +549,52 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) - this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( - this.opts.workerChoiceStrategy, - workerChoiceStrategyOptions - ) - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.sendStatisticsMessageToWorker(workerNodeKey) + if (workerChoiceStrategyOptions != null) { + requireSync = !this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) + } + if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { + this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( + this.opts.workerChoiceStrategy, + this.opts.workerChoiceStrategyOptions + ) + requireSync = true + } + if (requireSync) { + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } } } /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined - ): void { + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategiesContext?.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -630,13 +656,13 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'idle', this.handleWorkerNodeIdleEvent @@ -645,7 +671,7 @@ export abstract class AbstractPool< } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -654,7 +680,7 @@ export abstract class AbstractPool< } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -797,7 +823,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -809,17 +835,9 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - for (const workerNode of this.workerNodes) { - if ( - Array.isArray(workerNode.info.taskFunctionsProperties) && - workerNode.info.taskFunctionsProperties.some( - taskFunctionProperties => taskFunctionProperties.name === name - ) - ) { - return true - } - } - return false + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.name === name + ) } /** @inheritDoc */ @@ -839,12 +857,17 @@ export abstract class AbstractPool< if (typeof fn.taskFunction !== 'function') { throw new TypeError('taskFunction property must be a function') } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), taskFunction: fn.taskFunction.toString() }) this.taskFunctions.set(name, fn) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies() + ) return opResult } @@ -864,6 +887,9 @@ export abstract class AbstractPool< }) this.deleteTaskFunctionWorkerUsages(name) this.taskFunctions.delete(name) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies() + ) return opResult } @@ -897,6 +923,46 @@ export abstract class AbstractPool< } } + /** + * Gets worker node task function priority, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + if (name != null) { + return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority + } + } + + /** + * Gets the worker choice strategies registered in this pool. + * + * @returns The worker choice strategies. + */ + private readonly getWorkerWorkerChoiceStrategies = + (): Set => { + return new Set([ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.workerChoiceStrategy!, + ...(this.listTaskFunctionsProperties() + .map( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.strategy + ) + .filter( + (strategy: WorkerChoiceStrategy | undefined) => strategy != null + ) as WorkerChoiceStrategy[]) + ]) + } + /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { return await this.sendTaskFunctionOperationToWorkers({ @@ -955,13 +1021,15 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode( + const taskFunctionStrategy = this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - ) + const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: taskFunctionStrategy, transferList, timestamp, taskId: randomUUID() @@ -1020,6 +1088,7 @@ export abstract class AbstractPool< } this.starting = true this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } @@ -1044,6 +1113,7 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() this.readyEventEmitted = false + delete this.startTimestamp this.destroying = false this.started = false } @@ -1227,7 +1297,7 @@ export abstract class AbstractPool< } /** - * Chooses a worker node for the next task. + * Chooses a worker node for the next task given the worker choice strategy. * * @param workerChoiceStrategy - The worker choice strategy. * @returns The chosen worker node key @@ -1699,7 +1769,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1750,7 +1820,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1903,7 +1973,9 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions?.size ?? getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers - ).size + ).size, + tasksQueueBucketSize: + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 } ) // Flag the worker node as ready at pool startup. @@ -1984,8 +2056,11 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask() + private dequeueTask ( + workerNodeKey: number, + bucket?: number + ): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask(bucket) } private tasksQueueSize (workerNodeKey: number): number { @@ -2004,7 +2079,7 @@ export abstract class AbstractPool< } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } }