1 import { isKillBehavior
, KillBehaviors
} from
'../worker/worker-options'
2 import type { IWorker
} from
'./abstract-pool'
3 import type { IPoolInternal
} from
'./pool-internal'
/**
 * Enumeration of worker choice strategies.
 *
 * Every key maps to a string identical to the key itself, and the object is
 * frozen so the set of strategies cannot be altered at runtime.
 */
export const WorkerChoiceStrategies = Object.freeze({
  /**
   * Round robin worker selection strategy.
   */
  ROUND_ROBIN: 'ROUND_ROBIN',
  /**
   * Less recently used worker selection strategy.
   */
  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
  // `as const` keeps the member types as string literals, so a member can be
  // used wherever a `WorkerChoiceStrategy` is expected.
} as const)

/**
 * Worker choice strategy.
 *
 * Union of the key names of `WorkerChoiceStrategies`.
 */
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
/**
 * Worker choice strategy interface.
 *
 * @template Worker Type of worker which manages the strategy.
 */
interface IWorkerChoiceStrategy<Worker extends IWorker> {
  /**
   * Choose a worker in the pool.
   *
   * @returns The chosen worker.
   */
  choose(): Worker
}
/**
 * Selects the next worker in a round robin fashion.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
  implements IWorkerChoiceStrategy<Worker> {
  /**
   * Index for the next worker.
   */
  private nextWorkerIndex: number = 0

  /**
   * Constructs a worker choice strategy that selects in a round robin fashion.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Returns the worker at the current index of `pool.workers` and advances the
   * index, wrapping back to the first worker after the last one.
   *
   * @returns The chosen worker.
   */
  public choose (): Worker {
    const workers = this.pool.workers
    // The workers list may have shrunk since the previous call. An exact
    // `length - 1 === index` wrap test would never match again in that case,
    // so the index is brought back in range before it is used.
    if (this.nextWorkerIndex >= workers.length) {
      this.nextWorkerIndex = 0
    }
    const chosenWorker = workers[this.nextWorkerIndex]
    const followingIndex = this.nextWorkerIndex + 1
    this.nextWorkerIndex = followingIndex >= workers.length ? 0 : followingIndex
    return chosenWorker
  }
}
/**
 * Selects the less recently used worker.
 *
 * The choice is driven by the per-worker task counts held in `pool.tasks`:
 * the first worker with no task wins, otherwise the worker with the smallest
 * count.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class LessRecentlyUsedWorkerChoiceStrategy<
  Worker extends IWorker,
  Data,
  Response
> implements IWorkerChoiceStrategy<Worker> {
  /**
   * Constructs a worker choice strategy that selects based on less recently used.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Picks the worker with the fewest tasks.
   *
   * @returns The first worker found with `0` tasks, otherwise the first worker
   * holding the smallest task count.
   */
  public choose (): Worker {
    let minNumberOfTasks = Infinity
    // A worker is always found because it picks the one with fewer tasks.
    // NOTE(review): this relies on `pool.tasks` holding at least one entry;
    // with no entry the variable below is never assigned.
    let lessRecentlyUsedWorker!: Worker
    for (const [worker, numberOfTasks] of this.pool.tasks) {
      if (numberOfTasks === 0) {
        // Nothing can beat an idle worker, stop scanning
        return worker
      } else if (numberOfTasks < minNumberOfTasks) {
        // Strict comparison: on a tie the earlier entry is kept
        minNumberOfTasks = numberOfTasks
        lessRecentlyUsedWorker = worker
      }
    }
    return lessRecentlyUsedWorker
  }
}
/**
 * Get the worker choice strategy instance.
 *
 * @param pool The pool instance.
 * @param workerChoiceStrategy The worker choice strategy. Defaults to round robin.
 * @returns The worker choice strategy instance.
 * @throws Error if `workerChoiceStrategy` matches none of the known strategies.
 */
function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
  pool: IPoolInternal<Worker, Data, Response>,
  workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
): IWorkerChoiceStrategy<Worker> {
  switch (workerChoiceStrategy) {
    case WorkerChoiceStrategies.ROUND_ROBIN:
      return new RoundRobinWorkerChoiceStrategy(pool)
    case WorkerChoiceStrategies.LESS_RECENTLY_USED:
      return new LessRecentlyUsedWorkerChoiceStrategy(pool)
    default:
      // Not reachable through the declared type; guards callers that bypass
      // type checking and pass an arbitrary string.
      throw new Error(
        `Worker choice strategy '${workerChoiceStrategy}' not found`
      )
  }
}
/**
 * Dynamically choose a worker.
 *
 * A free worker is preferred, then a newly created one while the pool has not
 * reached `pool.max`, and finally the configured fallback strategy.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
  implements IWorkerChoiceStrategy<Worker> {
  /**
   * Strategy delegated to when the pool is full.
   */
  private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>

  /**
   * Constructs a worker choice strategy for dynamic pools.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy when the pool is full.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.workerChoiceStrategy = getWorkerChoiceStrategy(
      this.pool,
      workerChoiceStrategy
    )
  }

  /**
   * Find a free worker based on number of tasks the worker has applied.
   *
   * If a worker was found that has `0` tasks, it is detected as free and will be returned.
   *
   * If no free worker was found, `null` will be returned.
   *
   * @returns A free worker if there was one, otherwise `null`.
   */
  private findFreeWorkerBasedOnTasks (): Worker | null {
    for (const [worker, numberOfTasks] of this.pool.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, use it
        return worker
      }
    }
    return null
  }

  /**
   * Chooses a worker for the next task.
   *
   * @returns A free worker when there is one; otherwise the fallback
   * strategy's choice when the pool is full (after emitting `'FullPool'`);
   * otherwise a newly created worker.
   */
  public choose (): Worker {
    const freeWorker = this.findFreeWorkerBasedOnTasks()
    if (freeWorker) {
      return freeWorker
    }

    if (this.pool.workers.length === this.pool.max) {
      this.pool.emitter.emit('FullPool')
      return this.workerChoiceStrategy.choose()
    }

    // All workers are busy, create a new worker
    const workerCreated = this.pool.createAndSetupWorker()
    this.pool.registerWorkerMessageListener(workerCreated, message => {
      const tasksInProgress = this.pool.tasks.get(workerCreated)
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        tasksInProgress === 0
      ) {
        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        // `void` marks the destroy call as deliberately not awaited
        void this.pool.destroyWorker(workerCreated)
      }
    })
    return workerCreated
  }
}
/**
 * The worker choice strategy context.
 *
 * Holds the strategy currently in use for a pool and lets it be swapped.
 *
 * @template Worker Type of worker.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WorkerChoiceStrategyContext<
  Worker extends IWorker,
  Data,
  Response
> {
  // Will be set by setter in constructor
  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>

  /**
   * Worker choice strategy context constructor.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.setWorkerChoiceStrategy(workerChoiceStrategy)
  }

  /**
   * Get the worker choice strategy instance specific to the pool type.
   *
   * When `pool.isDynamic()` is true the strategy is wrapped in a
   * `DynamicPoolWorkerChoiceStrategy`; otherwise it is used directly.
   *
   * @param workerChoiceStrategy The worker choice strategy.
   * @returns The worker choice strategy instance for the pool type.
   */
  private getPoolWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ): IWorkerChoiceStrategy<Worker> {
    if (this.pool.isDynamic()) {
      return new DynamicPoolWorkerChoiceStrategy(
        this.pool,
        workerChoiceStrategy
      )
    }
    return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
  }

  /**
   * Set the worker choice strategy to use in the context.
   *
   * @param workerChoiceStrategy The worker choice strategy to set.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /**
   * Choose a worker with the underlying selection strategy.
   *
   * @returns The chosen one.
   */
  public execute (): Worker {
    return this.workerChoiceStrategy.choose()
  }
}