-import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
import type { IWorker } from './abstract-pool'
import type { IPoolInternal } from './pool-internal'
+import { PoolType } from './pool-internal'
/**
* Enumeration of worker choice strategies.
* @template Response Type of response of execution. This can only be serializable data.
*/
class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
- implements IWorkerChoiceStrategy<Worker> {
+ implements IWorkerChoiceStrategy<Worker>
+{
/**
* Index for the next worker.
*/
Worker extends IWorker,
Data,
Response
-> implements IWorkerChoiceStrategy<Worker> {
+> implements IWorkerChoiceStrategy<Worker>
+{
/**
* Constructs a worker choice strategy that selects based on less recently used.
*
/** @inheritdoc */
public choose (): Worker {
+ const isPoolDynamic = this.pool.type === PoolType.DYNAMIC
let minNumberOfTasks = Infinity
// A worker is always found because it picks the one with fewer tasks
let lessRecentlyUsedWorker!: Worker
for (const [worker, numberOfTasks] of this.pool.tasks) {
- if (numberOfTasks === 0) {
+ if (!isPoolDynamic && numberOfTasks === 0) {
return worker
} else if (numberOfTasks < minNumberOfTasks) {
- minNumberOfTasks = numberOfTasks
lessRecentlyUsedWorker = worker
+ minNumberOfTasks = numberOfTasks
}
}
return lessRecentlyUsedWorker
}
}
-/**
- * Get the worker choice strategy instance.
- *
- * @param pool The pool instance.
- * @param workerChoiceStrategy The worker choice strategy.
- * @returns The worker choice strategy instance.
- */
-function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
- pool: IPoolInternal<Worker, Data, Response>,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-): IWorkerChoiceStrategy<Worker> {
- switch (workerChoiceStrategy) {
- case WorkerChoiceStrategies.ROUND_ROBIN:
- return new RoundRobinWorkerChoiceStrategy(pool)
- case WorkerChoiceStrategies.LESS_RECENTLY_USED:
- return new LessRecentlyUsedWorkerChoiceStrategy(pool)
- default:
- throw new Error(
- `Worker choice strategy '${workerChoiceStrategy}' not found`
- )
- }
-}
-
/**
* Dynamically choose a worker.
*
* @template Response Type of response of execution. This can only be serializable data.
*/
class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
- implements IWorkerChoiceStrategy<Worker> {
+ implements IWorkerChoiceStrategy<Worker>
+{
private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
/**
* Constructs a worker choice strategy for dynamical pools.
*
* @param pool The pool instance.
- * @param workerChoiceStrategy The worker choice strategy when the pull is full.
+ * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
+ * @param workerChoiceStrategy The worker choice strategy when the pull is busy.
*/
public constructor (
private readonly pool: IPoolInternal<Worker, Data, Response>,
+ private createDynamicallyWorkerCallback: () => Worker,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
) {
- this.workerChoiceStrategy = getWorkerChoiceStrategy(
- this.pool,
- workerChoiceStrategy
- )
- }
-
- /**
- * Find a free worker based on number of tasks the worker has applied.
- *
- * If a worker was found that has `0` tasks, it is detected as free and will be returned.
- *
- * If no free worker was found, `null` will be returned.
- *
- * @returns A free worker if there was one, otherwise `null`.
- */
- private findFreeWorkerBasedOnTasks (): Worker | null {
- for (const [worker, numberOfTasks] of this.pool.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, use it
- return worker
- }
- }
- return null
+ this.workerChoiceStrategy =
+ SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
}
/** @inheritdoc */
public choose (): Worker {
- const freeWorker = this.findFreeWorkerBasedOnTasks()
- if (freeWorker) {
- return freeWorker
+ const freeTaskMapEntry = this.pool.findFreeTasksMapEntry()
+ if (freeTaskMapEntry) {
+ return freeTaskMapEntry[0]
}
- if (this.pool.workers.length === this.pool.max) {
- this.pool.emitter.emit('FullPool')
+ if (this.pool.busy) {
return this.workerChoiceStrategy.choose()
}
// All workers are busy, create a new worker
- const workerCreated = this.pool.createAndSetupWorker()
- this.pool.registerWorkerMessageListener(workerCreated, message => {
- const tasksInProgress = this.pool.tasks.get(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- tasksInProgress === 0
- ) {
- // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.pool.destroyWorker(workerCreated)
- }
- })
- return workerCreated
+ return this.createDynamicallyWorkerCallback()
}
}
* Worker choice strategy context constructor.
*
* @param pool The pool instance.
+ * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
* @param workerChoiceStrategy The worker choice strategy.
*/
public constructor (
private readonly pool: IPoolInternal<Worker, Data, Response>,
+ private createDynamicallyWorkerCallback: () => Worker,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
) {
this.setWorkerChoiceStrategy(workerChoiceStrategy)
private getPoolWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
): IWorkerChoiceStrategy<Worker> {
- if (this.pool.isDynamic()) {
+ if (this.pool.type === PoolType.DYNAMIC) {
return new DynamicPoolWorkerChoiceStrategy(
this.pool,
+ this.createDynamicallyWorkerCallback,
workerChoiceStrategy
)
}
- return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
+ return SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
}
/**
public setWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
): void {
- this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
- workerChoiceStrategy
- )
+ this.workerChoiceStrategy =
+ this.getPoolWorkerChoiceStrategy(workerChoiceStrategy)
}
/**
return this.workerChoiceStrategy.choose()
}
}
+
+/**
+ * Worker selection strategies helpers class.
+ */
+class SelectionStrategiesUtils {
+ /**
+ * Get the worker choice strategy instance.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance.
+ */
+ public static getWorkerChoiceStrategy<
+ Worker extends IWorker,
+ Data,
+ Response
+ > (
+ pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ): IWorkerChoiceStrategy<Worker> {
+ switch (workerChoiceStrategy) {
+ case WorkerChoiceStrategies.ROUND_ROBIN:
+ return new RoundRobinWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+ return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+ default:
+ throw new Error(
+ `Worker choice strategy '${workerChoiceStrategy}' not found`
+ )
+ }
+ }
+}