3 PromiseWorkerResponseWrapper
4 } from
'../utility-types'
5 import { EMPTY_FUNCTION
} from
'../utils'
6 import { KillBehaviors
, isKillBehavior
} from
'../worker/worker-options'
7 import type { PoolOptions
} from
'./pool'
8 import { PoolEmitter
} from
'./pool'
9 import type { IPoolInternal
, TasksUsage
, WorkerType
} from
'./pool-internal'
10 import { PoolType
} from
'./pool-internal'
11 import type { IPoolWorker
} from
'./pool-worker'
13 WorkerChoiceStrategies
,
14 type WorkerChoiceStrategy
15 } from
'./selection-strategies/selection-strategies-types'
16 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
19 * Base class that implements some shared logic for all poolifier pools.
21 * @typeParam Worker - Type of worker which this pool manages.
22 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
23 * @typeParam Response - Type of response of execution. This can only be serializable data.
25 export abstract class AbstractPool
<
26 Worker
extends IPoolWorker
,
29 > implements IPoolInternal
<Worker
, Data
, Response
> {
31 public readonly workers
: Map
<number, WorkerType
<Worker
>> = new Map
<
37 public readonly emitter
?: PoolEmitter
40 * Id of the next worker.
42 protected nextWorkerId
: number = 0
47 * - `key`: This is the message id of each submitted task.
48 * - `value`: An object that contains the worker, the resolve function and the reject function.
50 * When we receive a message from the worker, we get the matching map entry and resolve/reject the promise based on the message.
52 protected promiseMap
: Map
<
54 PromiseWorkerResponseWrapper
<Worker
, Response
>
55 > = new Map
<number, PromiseWorkerResponseWrapper
<Worker
, Response
>>()
58 * Id of the next message.
60 protected nextMessageId
: number = 0
63 * Worker choice strategy instance implementing the worker choice algorithm.
65 * Defaults to a strategy implementing a round robin algorithm.
67 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
74 * Constructs a new poolifier pool.
76 * @param numberOfWorkers - Number of workers that this pool should manage.
77 * @param filePath - Path to the worker-file.
78 * @param opts - Options for the pool.
81 public readonly numberOfWorkers
: number,
82 public readonly filePath
: string,
83 public readonly opts
: PoolOptions
<Worker
>
86 throw new Error('Cannot start a pool from a worker!')
88 this.checkNumberOfWorkers(this.numberOfWorkers
)
89 this.checkFilePath(this.filePath
)
90 this.checkPoolOptions(this.opts
)
93 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
94 this.createAndSetupWorker()
97 if (this.opts
.enableEvents
=== true) {
98 this.emitter
= new PoolEmitter()
100 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext(
103 const workerCreated
= this.createAndSetupWorker()
104 this.registerWorkerMessageListener(workerCreated
, message
=> {
106 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
107 this.getWorkerRunningTasks(workerCreated
) === 0
109 // Kill received from the worker: no new tasks have been submitted to that worker for a while ( > maxInactiveTime)
110 void this.destroyWorker(workerCreated
)
115 this.opts
.workerChoiceStrategy
119 private checkFilePath (filePath
: string): void {
122 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
124 throw new Error('Please specify a file with a worker implementation')
128 private checkNumberOfWorkers (numberOfWorkers
: number): void {
129 if (numberOfWorkers
== null) {
131 'Cannot instantiate a pool without specifying the number of workers'
133 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
135 'Cannot instantiate a pool with a non integer number of workers'
137 } else if (numberOfWorkers
< 0) {
138 throw new RangeError(
139 'Cannot instantiate a pool with a negative number of workers'
141 } else if (this.type === PoolType
.FIXED
&& numberOfWorkers
=== 0) {
142 throw new Error('Cannot instantiate a fixed pool with no worker')
146 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
147 this.opts
.workerChoiceStrategy
=
148 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
149 this.opts
.enableEvents
= opts
.enableEvents
?? true
153 public abstract get
type (): PoolType
156 public get
numberOfRunningTasks (): number {
157 return this.promiseMap
.size
163 * @param worker - The worker.
164 * @returns The worker key.
166 private getWorkerKey (worker
: Worker
): number | undefined {
167 return [...this.workers
].find(([, value
]) => value
.worker
=== worker
)?.[0]
171 public getWorkerRunningTasks (worker
: Worker
): number | undefined {
172 return this.workers
.get(this.getWorkerKey(worker
) as number)?.tasksUsage
177 public getWorkerAverageTasksRunTime (worker
: Worker
): number | undefined {
178 return this.workers
.get(this.getWorkerKey(worker
) as number)?.tasksUsage
183 public setWorkerChoiceStrategy (
184 workerChoiceStrategy
: WorkerChoiceStrategy
186 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
187 for (const [key
, value
] of this.workers
) {
188 this.setWorker(key
, value
.worker
, {
195 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
201 public abstract get
busy (): boolean
203 protected internalGetBusyStatus (): boolean {
205 this.numberOfRunningTasks
>= this.numberOfWorkers
&&
206 this.findFreeWorker() === false
211 public findFreeWorker (): Worker
| false {
212 for (const value
of this.workers
.values()) {
213 if (value
.tasksUsage
.running
=== 0) {
214 // A worker is free, return the matching worker
222 public async execute (data
: Data
): Promise
<Response
> {
223 // Configure worker to handle message with the specified task
224 const worker
= this.chooseWorker()
225 const res
= this.internalExecute(worker
, this.nextMessageId
)
226 this.checkAndEmitBusy()
227 this.sendToWorker(worker
, {
228 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
229 data
: data
?? ({} as Data
),
230 id
: this.nextMessageId
233 // eslint-disable-next-line @typescript-eslint/return-await
238 public async destroy (): Promise
<void> {
240 [...this.workers
].map(async ([, value
]) => {
241 await this.destroyWorker(value
.worker
)
247 * Shuts down the given worker.
249 * @param worker - A worker within `workers`.
251 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
254 * Setup hook that can be overridden by a Poolifier pool implementation
255 * to run code before workers are created in the abstract constructor.
257 protected setupHook (): void {
262 * Should return whether the worker is the main worker or not.
264 protected abstract isMain (): boolean
267 * Hook executed before the worker task promise resolution.
270 * @param worker - The worker.
272 protected beforePromiseWorkerResponseHook (worker
: Worker
): void {
273 this.increaseWorkerRunningTasks(worker
)
277 * Hook executed after the worker task promise resolution.
280 * @param message - The received message.
281 * @param promise - The Promise response.
283 protected afterPromiseWorkerResponseHook (
284 message
: MessageValue
<Response
>,
285 promise
: PromiseWorkerResponseWrapper
<Worker
, Response
>
287 this.decreaseWorkerRunningTasks(promise
.worker
)
288 this.stepWorkerRunTasks(promise
.worker
, 1)
289 this.updateWorkerTasksRunTime(promise
.worker
, message
.taskRunTime
)
293 * Removes the given worker from the pool.
295 * @param worker - The worker that will be removed.
297 protected removeWorker (worker
: Worker
): void {
298 this.workers
.delete(this.getWorkerKey(worker
) as number)
303 * Chooses a worker for the next task.
305 * The default implementation uses a round robin algorithm to distribute the load.
309 protected chooseWorker (): Worker
{
310 return this.workerChoiceStrategyContext
.execute()
314 * Sends a message to the given worker.
316 * @param worker - The worker which should receive the message.
317 * @param message - The message.
319 protected abstract sendToWorker (
321 message
: MessageValue
<Data
>
325 * Registers a listener callback on a given worker.
327 * @param worker - The worker which should register a listener.
328 * @param listener - The message listener callback.
330 protected abstract registerWorkerMessageListener
<
331 Message
extends Data
| Response
332 >(worker
: Worker
, listener
: (message
: MessageValue
<Message
>) => void): void
335 * Returns a newly created worker.
337 protected abstract createWorker (): Worker
340 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
342 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
344 * @param worker - The newly created worker.
346 protected abstract afterWorkerSetup (worker
: Worker
): void
349 * Creates a new worker for this pool and sets it up completely.
351 * @returns New, completely set up worker.
353 protected createAndSetupWorker (): Worker
{
354 const worker
= this.createWorker()
356 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
357 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
358 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
359 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
360 worker
.once('exit', () => {
361 this.removeWorker(worker
)
364 this.setWorker(this.nextWorkerId
, worker
, {
372 this.afterWorkerSetup(worker
)
378 * This function is the listener registered for each worker.
380 * @returns The listener function to execute when a message is received from a worker.
382 protected workerListener (): (message
: MessageValue
<Response
>) => void {
384 if (message
.id
!== undefined) {
385 const promise
= this.promiseMap
.get(message
.id
)
386 if (promise
!== undefined) {
387 if (message
.error
!= null) {
388 promise
.reject(message
.error
)
390 promise
.resolve(message
.data
as Response
)
392 this.afterPromiseWorkerResponseHook(message
, promise
)
393 this.promiseMap
.delete(message
.id
)
399 private async internalExecute (
402 ): Promise
<Response
> {
403 this.beforePromiseWorkerResponseHook(worker
)
404 return await new Promise
<Response
>((resolve
, reject
) => {
405 this.promiseMap
.set(messageId
, { resolve
, reject
, worker
})
409 private checkAndEmitBusy (): void {
410 if (this.opts
.enableEvents
=== true && this.busy
) {
411 this.emitter
?.emit('busy')
416 * Increases by one the number of tasks currently running on the given worker.
418 * @param worker - Worker whose number of running tasks is increased.
420 private increaseWorkerRunningTasks (worker
: Worker
): void {
421 this.stepWorkerRunningTasks(worker
, 1)
425 * Decreases by one the number of tasks currently running on the given worker.
427 * @param worker - Worker whose number of running tasks is decreased.
429 private decreaseWorkerRunningTasks (worker
: Worker
): void {
430 this.stepWorkerRunningTasks(worker
, -1)
434 * Gets the tasks usage of the given worker.
436 * @param worker - Worker whose tasks usage is returned.
438 private getWorkerTasksUsage (worker
: Worker
): TasksUsage
| undefined {
439 if (this.checkWorker(worker
)) {
440 const workerKey
= this.getWorkerKey(worker
) as number
441 const workerEntry
= this.workers
.get(workerKey
) as WorkerType
<Worker
>
442 return workerEntry
.tasksUsage
447 * Steps the number of tasks currently running on the given worker.
449 * @param worker - Worker whose number of running tasks is stepped.
450 * @param step - Number of running tasks step.
452 private stepWorkerRunningTasks (worker
: Worker
, step
: number): void {
454 (this.getWorkerTasksUsage(worker
) as TasksUsage
).running
+= step
458 * Steps the number of tasks that the given worker has run.
460 * @param worker - Worker which has run tasks.
461 * @param step - Number of run tasks step.
463 private stepWorkerRunTasks (worker
: Worker
, step
: number): void {
465 (this.getWorkerTasksUsage(worker
) as TasksUsage
).run
+= step
469 * Updates tasks runtime for the given worker.
471 * @param worker - Worker which ran the task.
472 * @param taskRunTime - Worker task runtime.
474 private updateWorkerTasksRunTime (
476 taskRunTime
: number | undefined
479 this.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
480 .requiredStatistics
.runTime
482 const workerTasksUsage
= this.getWorkerTasksUsage(worker
) as TasksUsage
483 workerTasksUsage
.runTime
+= taskRunTime
?? 0
484 if (workerTasksUsage
.run
!== 0) {
485 workerTasksUsage
.avgRunTime
=
486 workerTasksUsage
.runTime
/ workerTasksUsage
.run
492 * Sets the given worker.
494 * @param workerKey - The worker key.
495 * @param worker - The worker.
496 * @param tasksUsage - The worker tasks usage.
501 tasksUsage
: TasksUsage
503 this.workers
.set(workerKey
, {
510 * Checks if the given worker is registered in the pool.
512 * @param worker - Worker to check.
513 * @returns `true` if the worker is registered in the pool.
515 private checkWorker (worker
: Worker
): boolean {
516 if (this.getWorkerKey(worker
) == null) {
517 throw new Error('Worker could not be found in the pool')