1 import { isKillBehavior
, KillBehaviors
} from
'../worker/worker-options'
2 import type { IWorker
} from
'./abstract-pool'
3 import type { IPoolInternal
} from
'./pool-internal'
/**
 * Enumeration of worker choice strategies.
 *
 * Each key maps to a string value identical to the key. The object is frozen
 * so it cannot be modified at runtime.
 */
export const WorkerChoiceStrategies = Object.freeze({
  /**
   * Round robin worker selection strategy.
   */
  ROUND_ROBIN: 'ROUND_ROBIN',
  /**
   * Less recently used worker selection strategy.
   */
  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
} as const)
/**
 * Worker choice strategy.
 *
 * Union of the key names of `WorkerChoiceStrategies`.
 */
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
/**
 * Worker choice strategy interface.
 *
 * @template Worker Type of worker which manages the strategy.
 */
interface IWorkerChoiceStrategy<Worker extends IWorker> {
  /**
   * Choose a worker in the pool.
   *
   * @returns The chosen worker.
   */
  choose(): Worker
}
/**
 * Selects the next worker in a round robin fashion.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
  implements IWorkerChoiceStrategy<Worker> {
  /**
   * Index for the next worker.
   */
  private nextWorkerIndex: number = 0

  /**
   * Constructs a worker choice strategy that selects in a round robin fashion.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Returns the worker at the current index, then advances the index,
   * wrapping back to the first worker after the last one.
   *
   * @returns The chosen worker.
   */
  public choose (): Worker {
    const workers = this.pool.workers
    // The worker list may have shrunk since the previous call (presumably when
    // a worker gets destroyed). Without this guard a stale index would point
    // past the end, never wrap again, and `undefined` would be returned.
    if (this.nextWorkerIndex >= workers.length) {
      this.nextWorkerIndex = 0
    }
    const chosenWorker = workers[this.nextWorkerIndex]
    const followingIndex = this.nextWorkerIndex + 1
    this.nextWorkerIndex = followingIndex >= workers.length ? 0 : followingIndex
    return chosenWorker
  }
}
/**
 * Selects the less recently used worker.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class LessRecentlyUsedWorkerChoiceStrategy<
  Worker extends IWorker,
  Data,
  Response
> implements IWorkerChoiceStrategy<Worker> {
  /**
   * Constructs a worker choice strategy that selects based on less recently used.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Scans the pool's task counters and returns the worker with the fewest
   * tasks. On a tie, the first worker encountered wins.
   *
   * @returns The chosen worker.
   */
  public choose (): Worker {
    const isPoolDynamic = this.pool.dynamic
    let minNumberOfTasks = Infinity
    // A worker is always found because it picks the one with fewer tasks
    let lessRecentlyUsedWorker!: Worker
    for (const [worker, numberOfTasks] of this.pool.tasks) {
      if (!isPoolDynamic && numberOfTasks === 0) {
        // Non-dynamic pool: a worker without any task cannot be beaten, stop here
        return worker
      } else if (numberOfTasks < minNumberOfTasks) {
        minNumberOfTasks = numberOfTasks
        lessRecentlyUsedWorker = worker
      }
    }
    return lessRecentlyUsedWorker
  }
}
/**
 * Dynamically choose a worker.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
  implements IWorkerChoiceStrategy<Worker> {
  /**
   * Strategy delegated to when the pool is full.
   */
  private readonly workerChoiceStrategy: IWorkerChoiceStrategy<Worker>

  /**
   * Constructs a worker choice strategy for dynamical pools.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy when the pool is full.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
      this.pool,
      workerChoiceStrategy
    )
  }

  /**
   * Chooses a worker in three steps:
   * 1. return a free worker if there is one;
   * 2. otherwise, if the pool already holds `max` workers, emit `'FullPool'`
   *    and delegate to the configured strategy;
   * 3. otherwise create a new worker, register its kill handling, and return it.
   *
   * @returns The chosen worker.
   */
  public choose (): Worker {
    const freeWorker = SelectionStrategiesUtils.findFreeWorkerBasedOnTasks(
      this.pool
    )
    if (freeWorker != null) {
      return freeWorker
    }

    if (this.pool.workers.length === this.pool.max) {
      this.pool.emitter.emit('FullPool')
      return this.workerChoiceStrategy.choose()
    }

    // All workers are busy, create a new worker
    const workerCreated = this.pool.createAndSetupWorker()
    this.pool.registerWorkerMessageListener(workerCreated, message => {
      const tasksInProgress = this.pool.tasks.get(workerCreated)
      // Only a kill request may destroy the worker. A non-kill message arriving
      // while the task counter is 0 must not tear the worker down.
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        (message.kill != null && tasksInProgress === 0)
      ) {
        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        void this.pool.destroyWorker(workerCreated)
      }
    })
    return workerCreated
  }
}
/**
 * The worker choice strategy context.
 *
 * Holds the strategy instance currently in use for a pool and delegates
 * worker selection to it.
 *
 * @template Worker Type of worker.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WorkerChoiceStrategyContext<
  Worker extends IWorker,
  Data,
  Response
> {
  // Will be set by setter in constructor
  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>

  /**
   * Worker choice strategy context constructor.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.setWorkerChoiceStrategy(workerChoiceStrategy)
  }

  /**
   * Get the worker choice strategy instance specific to the pool type.
   *
   * A dynamic pool gets the requested strategy wrapped in a
   * `DynamicPoolWorkerChoiceStrategy`; any other pool gets it directly.
   *
   * @param workerChoiceStrategy The worker choice strategy.
   * @returns The worker choice strategy instance for the pool type.
   */
  private getPoolWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ): IWorkerChoiceStrategy<Worker> {
    if (this.pool.dynamic) {
      return new DynamicPoolWorkerChoiceStrategy(
        this.pool,
        workerChoiceStrategy
      )
    }
    return SelectionStrategiesUtils.getWorkerChoiceStrategy(
      this.pool,
      workerChoiceStrategy
    )
  }

  /**
   * Set the worker choice strategy to use in the context.
   *
   * @param workerChoiceStrategy The worker choice strategy to set.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /**
   * Choose a worker with the underlying selection strategy.
   *
   * @returns The chosen one.
   */
  public execute (): Worker {
    return this.workerChoiceStrategy.choose()
  }
}
/**
 * Worker selection strategies helpers.
 */
class SelectionStrategiesUtils {
  /**
   * Find a free worker based on number of tasks the worker has applied.
   *
   * If a worker was found that has `0` tasks, it is detected as free and will be returned.
   *
   * If no free worker was found, `null` will be returned.
   *
   * @param pool The pool instance.
   * @returns A free worker if there was one, otherwise `null`.
   */
  public static findFreeWorkerBasedOnTasks<
    Worker extends IWorker,
    Data,
    Response
  > (pool: IPoolInternal<Worker, Data, Response>): Worker | null {
    for (const [worker, numberOfTasks] of pool.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, use it
        return worker
      }
    }
    return null
  }

  /**
   * Get the worker choice strategy instance.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy.
   * @returns The worker choice strategy instance.
   * @throws Error if `workerChoiceStrategy` is not one of the known strategies.
   */
  public static getWorkerChoiceStrategy<
    Worker extends IWorker,
    Data,
    Response
  > (
    pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ): IWorkerChoiceStrategy<Worker> {
    switch (workerChoiceStrategy) {
      case WorkerChoiceStrategies.ROUND_ROBIN:
        return new RoundRobinWorkerChoiceStrategy(pool)
      case WorkerChoiceStrategies.LESS_RECENTLY_USED:
        return new LessRecentlyUsedWorkerChoiceStrategy(pool)
      default:
        // Reachable only when a caller bypasses the type system
        throw new Error(
          // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
          `Worker choice strategy '${workerChoiceStrategy}' not found`
        )
    }
  }
}