X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fselection-strategies.ts;h=64ea7ee58509057d6cae4e235b07350f8f087e41;hb=767e23e80a540dcedab8a2aa5203b8526e3df7b1;hp=5c40a71b404e3dadb74a468e282f9e5accb4ac95;hpb=a35560bac09e829e1e19f88f8fd1d71a64c9d50b;p=poolifier.git diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts index 5c40a71b..64ea7ee5 100644 --- a/src/pools/selection-strategies.ts +++ b/src/pools/selection-strategies.ts @@ -1,6 +1,6 @@ -import { isKillBehavior, KillBehaviors } from '../worker/worker-options' import type { IWorker } from './abstract-pool' import type { IPoolInternal } from './pool-internal' +import { PoolType } from './pool-internal' /** * Enumeration of worker choice strategies. @@ -41,7 +41,8 @@ interface IWorkerChoiceStrategy { * @template Response Type of response of execution. This can only be serializable data. */ class RoundRobinWorkerChoiceStrategy - implements IWorkerChoiceStrategy { + implements IWorkerChoiceStrategy +{ /** * Index for the next worker. */ @@ -78,7 +79,8 @@ class LessRecentlyUsedWorkerChoiceStrategy< Worker extends IWorker, Data, Response -> implements IWorkerChoiceStrategy { +> implements IWorkerChoiceStrategy +{ /** * Constructs a worker choice strategy that selects based on less recently used. * @@ -90,44 +92,22 @@ class LessRecentlyUsedWorkerChoiceStrategy< /** @inheritdoc */ public choose (): Worker { + const isPoolDynamic = this.pool.type === PoolType.DYNAMIC let minNumberOfTasks = Infinity // A worker is always found because it picks the one with fewer tasks let lessRecentlyUsedWorker!: Worker for (const [worker, numberOfTasks] of this.pool.tasks) { - if (numberOfTasks === 0) { + if (!isPoolDynamic && numberOfTasks === 0) { return worker } else if (numberOfTasks < minNumberOfTasks) { - minNumberOfTasks = numberOfTasks lessRecentlyUsedWorker = worker + minNumberOfTasks = numberOfTasks } } return lessRecentlyUsedWorker } } -/** - * Get the worker choice strategy instance. - * - * @param pool The pool instance. - * @param workerChoiceStrategy The worker choice strategy. - * @returns The worker choice strategy instance. - */ -function getWorkerChoiceStrategy ( - pool: IPoolInternal, - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN -): IWorkerChoiceStrategy { - switch (workerChoiceStrategy) { - case WorkerChoiceStrategies.ROUND_ROBIN: - return new RoundRobinWorkerChoiceStrategy(pool) - case WorkerChoiceStrategies.LESS_RECENTLY_USED: - return new LessRecentlyUsedWorkerChoiceStrategy(pool) - default: - throw new Error( - `Worker choice strategy '${workerChoiceStrategy}' not found` - ) - } -} - /** * Dynamically choose a worker. * @@ -136,69 +116,42 @@ function getWorkerChoiceStrategy ( * @template Response Type of response of execution. This can only be serializable data. */ class DynamicPoolWorkerChoiceStrategy - implements IWorkerChoiceStrategy { + implements IWorkerChoiceStrategy +{ private workerChoiceStrategy: IWorkerChoiceStrategy /** * Constructs a worker choice strategy for dynamical pools. * * @param pool The pool instance. - * @param workerChoiceStrategy The worker choice strategy when the pull is full. + * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. + * @param workerChoiceStrategy The worker choice strategy when the pull is busy. */ public constructor ( private readonly pool: IPoolInternal, + private createDynamicallyWorkerCallback: () => Worker, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ) { - this.workerChoiceStrategy = getWorkerChoiceStrategy( - this.pool, - workerChoiceStrategy - ) - } - - /** - * Find a free worker based on number of tasks the worker has applied. - * - * If a worker was found that has `0` tasks, it is detected as free and will be returned. - * - * If no free worker was found, `null` will be returned. - * - * @returns A free worker if there was one, otherwise `null`. - */ - private findFreeWorkerBasedOnTasks (): Worker | null { - for (const [worker, numberOfTasks] of this.pool.tasks) { - if (numberOfTasks === 0) { - // A worker is free, use it - return worker - } - } - return null + this.workerChoiceStrategy = + SelectionStrategiesUtils.getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) } /** @inheritdoc */ public choose (): Worker { - const freeWorker = this.findFreeWorkerBasedOnTasks() - if (freeWorker) { - return freeWorker + const freeTaskMapEntry = this.pool.findFreeTasksMapEntry() + if (freeTaskMapEntry) { + return freeTaskMapEntry[0] } - if (this.pool.workers.length === this.pool.max) { - this.pool.emitter.emit('FullPool') + if (this.pool.busy) { return this.workerChoiceStrategy.choose() } // All workers are busy, create a new worker - const workerCreated = this.pool.createAndSetupWorker() - this.pool.registerWorkerMessageListener(workerCreated, message => { - const tasksInProgress = this.pool.tasks.get(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - tasksInProgress === 0 - ) { - // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.pool.destroyWorker(workerCreated) - } - }) - return workerCreated + return this.createDynamicallyWorkerCallback() } } @@ -221,10 +174,12 @@ export class WorkerChoiceStrategyContext< * Worker choice strategy context constructor. * * @param pool The pool instance. + * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. * @param workerChoiceStrategy The worker choice strategy. */ public constructor ( private readonly pool: IPoolInternal, + private createDynamicallyWorkerCallback: () => Worker, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ) { this.setWorkerChoiceStrategy(workerChoiceStrategy) @@ -239,13 +194,17 @@ export class WorkerChoiceStrategyContext< private getPoolWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ): IWorkerChoiceStrategy { - if (this.pool.isDynamic()) { + if (this.pool.type === PoolType.DYNAMIC) { return new DynamicPoolWorkerChoiceStrategy( this.pool, + this.createDynamicallyWorkerCallback, workerChoiceStrategy ) } - return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy) + return SelectionStrategiesUtils.getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) } /** @@ -256,9 +215,8 @@ export class WorkerChoiceStrategyContext< public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { - this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( - workerChoiceStrategy - ) + this.workerChoiceStrategy = + this.getPoolWorkerChoiceStrategy(workerChoiceStrategy) } /** @@ -270,3 +228,35 @@ export class WorkerChoiceStrategyContext< return this.workerChoiceStrategy.choose() } } + +/** + * Worker selection strategies helpers class. + */ +class SelectionStrategiesUtils { + /** + * Get the worker choice strategy instance. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance. + */ + public static getWorkerChoiceStrategy< + Worker extends IWorker, + Data, + Response + > ( + pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ): IWorkerChoiceStrategy { + switch (workerChoiceStrategy) { + case WorkerChoiceStrategies.ROUND_ROBIN: + return new RoundRobinWorkerChoiceStrategy(pool) + case WorkerChoiceStrategies.LESS_RECENTLY_USED: + return new LessRecentlyUsedWorkerChoiceStrategy(pool) + default: + throw new Error( + `Worker choice strategy '${workerChoiceStrategy}' not found` + ) + } + } +}