X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=4f60dff0279643a864b8c570eaddf3e3af9e7546;hb=0c7b9e90e091ac71ede7a376236e947c7a81464f;hp=c510b2ad99495226456b880ddd83edd760728b10;hpb=85bbc7ab16c9f69a5dd358b71e3e6d4204dfd630;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c510b2ad..4f60dff0 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -47,6 +47,7 @@ import { import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, getDefaultTasksQueueOptions, @@ -136,7 +137,7 @@ export abstract class AbstractPool< /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. @@ -192,8 +193,6 @@ export abstract class AbstractPool< if (this.opts.startWorkers === true) { this.start() } - - this.startTimestamp = performance.now() } private checkPoolType (): void { @@ -486,6 +485,9 @@ export abstract class AbstractPool< * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = (performance.now() - this.startTimestamp) * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) @@ -547,9 +549,12 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + requireSync = !this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) } if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { this.opts.workerChoiceStrategy = workerChoiceStrategy @@ -557,11 +562,14 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions ) + requireSync = true + } + if (requireSync) { this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -570,14 +578,23 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined - ): void { + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategiesContext?.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -639,13 +656,13 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'idle', this.handleWorkerNodeIdleEvent @@ -654,7 +671,7 @@ export abstract class AbstractPool< } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -663,7 +680,7 @@ export abstract class AbstractPool< } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -806,7 +823,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -840,6 +857,8 @@ export abstract class AbstractPool< if (typeof fn.taskFunction !== 'function') { throw new TypeError('taskFunction property must be a function') } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), @@ -904,6 +923,25 @@ export abstract class AbstractPool< } } + /** + * Gets worker node task function priority, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + if (name != null) { + return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority + } + } + /** * Gets the worker choice strategies registered in this pool. * @@ -983,13 +1021,15 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode( + const taskFunctionStrategy = this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - ) + const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: taskFunctionStrategy, transferList, timestamp, taskId: randomUUID() @@ -1048,6 +1088,7 @@ export abstract class AbstractPool< } this.starting = true this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } @@ -1072,6 +1113,7 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() this.readyEventEmitted = false + delete this.startTimestamp this.destroying = false this.started = false } @@ -1255,7 +1297,7 @@ export abstract class AbstractPool< } /** - * Chooses a worker node for the next task. + * Chooses a worker node for the next task given the worker choice strategy. * * @param workerChoiceStrategy - The worker choice strategy. * @returns The chosen worker node key @@ -1727,7 +1769,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1778,7 +1820,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1931,7 +1973,9 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions?.size ?? getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers - ).size + ).size, + tasksQueueBucketSize: + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 } ) // Flag the worker node as ready at pool startup. @@ -2012,8 +2056,11 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask() + private dequeueTask ( + workerNodeKey: number, + bucket?: number + ): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask(bucket) } private tasksQueueSize (workerNodeKey: number): number { @@ -2032,7 +2079,7 @@ export abstract class AbstractPool< } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } }