3 PromiseWorkerResponseWrapper
4 } from
'../utility-types'
5 import { isKillBehavior
, KillBehaviors
} from
'../worker/worker-options'
6 import type { IPoolInternal
} from
'./pool-internal'
7 import { PoolEmitter
} from
'./pool-internal'
8 import type { WorkerChoiceStrategy
} from
'./selection-strategies'
10 WorkerChoiceStrategies
,
11 WorkerChoiceStrategyContext
12 } from
'./selection-strategies'
/**
 * An intentional empty function.
 *
 * Used as the fallback listener when a pool option handler is not provided.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
/**
 * Callback invoked if the worker raised an error.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback invoked when the worker has started successfully.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback invoked when the worker exits successfully.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options for a poolifier pool.
 *
 * @template Worker Type of the worker the handlers are bound to as `this`.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
93 * Base class containing some shared logic for all poolifier pools.
95 * @template Worker Type of worker which manages this pool.
96 * @template Data Type of data sent to the worker. This can only be serializable data.
97 * @template Response Type of response of execution. This can only be serializable data.
99 export abstract class AbstractPool
<
100 Worker
extends IWorker
,
103 > implements IPoolInternal
<Worker
, Data
, Response
> {
/**
 * Workers currently registered in this pool.
 *
 * Workers are appended when they are created and set up, and removed once they exit.
 */
public readonly workers: Worker[] = []
/**
 * Number of tasks currently in progress, keyed by worker.
 *
 * An entry is initialised to `0` when a worker is set up and deleted when the worker is removed.
 */
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
/**
 * Pool event emitter, instantiated by the constructor.
 */
public readonly emitter: PoolEmitter
/**
 * The promise map.
 *
 * - `key`: This is the message ID of each submitted task.
 * - `value`: An object that contains the worker, the resolve function and the reject function.
 *
 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
 */
protected promiseMap: Map<
  number,
  PromiseWorkerResponseWrapper<Worker, Response>
> = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
/**
 * ID of the next message.
 *
 * Pre-incremented for every executed task, so the first message sent has ID `1`.
 */
protected nextMessageId: number = 0
/**
 * Worker choice strategy instance implementing the worker choice algorithm.
 *
 * Default to a strategy implementing a round robin algorithm.
 */
// NOTE(review): the type arguments were not visible in the reviewed text; `<Worker, Data, Response>` is
// assumed from the pool's own type parameters. Confirm against './selection-strategies'.
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, creates the initial workers, then instantiates the emitter and the
 * worker choice strategy context.
 *
 * @param numberOfWorkers Number of workers that this pool should manage.
 * @param filePath Path to the worker-file.
 * @param opts Options for the pool.
 * @throws Error if called from a worker, or if `numberOfWorkers` or `filePath` is invalid.
 */
public constructor (
  public readonly numberOfWorkers: number,
  public readonly filePath: string,
  public readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // NOTE(review): this call was not visible in the reviewed text; it is assumed from the setup hook
  // documentation ("run code before workers are created in the abstract constructor"). Confirm.
  this.setupHook()

  for (let i = 1; i <= this.numberOfWorkers; i++) {
    this.createAndSetupWorker()
  }

  this.emitter = new PoolEmitter()
  // NOTE(review): the first argument (`this`) and the factory callback wrapper were not visible in the
  // reviewed text. Confirm against the WorkerChoiceStrategyContext constructor signature.
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
    this,
    () => {
      const workerCreated = this.createAndSetupWorker()
      this.registerWorkerMessageListener(workerCreated, message => {
        const tasksInProgress = this.tasks.get(workerCreated)
        if (
          isKillBehavior(KillBehaviors.HARD, message.kill) ||
          tasksInProgress === 0
        ) {
          // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
          void this.destroyWorker(workerCreated)
        }
      })
      return workerCreated
    },
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  )
}
/**
 * Checks that a worker file path has been provided.
 *
 * @param filePath Path to the worker-file.
 * @throws Error if no file path is specified.
 */
private checkFilePath (filePath: string): void {
  // NOTE(review): the guard condition was not visible in the reviewed text; a falsy check is assumed.
  if (!filePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
/**
 * Checks that the requested number of workers is usable.
 *
 * @param numberOfWorkers Number of workers that this pool should manage.
 * @throws Error if the number is missing, not a safe integer, negative, or zero for a non-dynamic pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  // `== null` deliberately matches both `null` and `undefined`
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new Error(
      'Cannot instantiate a pool with a non integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new Error(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a dynamic pool may start with zero workers
  if (!this.dynamic && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
/**
 * Whether this pool is dynamic.
 *
 * A non-dynamic (fixed) pool cannot be instantiated with zero workers.
 */
public get dynamic (): boolean {
  // NOTE(review): the getter body was not visible in the reviewed text; `false` is assumed as the
  // base-class default, with dynamic pool implementations overriding it. Confirm.
  return false
}
/**
 * Changes the worker choice strategy used by this pool.
 *
 * The new strategy is recorded in the pool options and forwarded to the strategy context.
 *
 * @param workerChoiceStrategy The worker choice strategy to use from now on.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
}
/**
 * Submits a task to a worker chosen by the worker choice strategy.
 *
 * @param data Data sent to the worker. When `null` or `undefined`, an empty object is sent instead.
 * @returns Promise settled with the worker response for this task.
 */
public execute (data: Data): Promise<Response> {
  // Configure worker to handle message with the specified task
  const worker = this.chooseWorker()
  this.increaseWorkersTask(worker)
  const messageId = ++this.nextMessageId
  // Register the pending promise before sending, so a response always finds its entry
  const res = this.internalExecute(worker, messageId)
  // `??` rather than `||`: valid falsy payloads (0, '', false) must reach the worker unchanged
  this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
  return res
}
/**
 * Shuts down every worker in this pool.
 *
 * @returns Promise resolved once all `destroyWorker` calls have completed.
 */
public async destroy (): Promise<void> {
  await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
}
/**
 * Shut down given worker.
 *
 * @param worker A worker within `workers`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
 * Setup hook that can be overridden by a Poolifier pool implementation
 * to run code before workers are created in the abstract constructor.
 */
protected setupHook (): void {
  // Can be overridden
}
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
/**
 * Increase by one the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are increased.
 */
protected increaseWorkersTask (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, 1)
}
/**
 * Decrease by one the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are decreased.
 */
protected decreaseWorkersTasks (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, -1)
}
/**
 * Step the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are set.
 * @param step Worker number of tasks step.
 * @throws Error if the worker has no entry in the tasks map.
 */
private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
  const numberOfTasksInProgress = this.tasks.get(worker)
  if (numberOfTasksInProgress === undefined) {
    throw new Error('Worker could not be found in tasks map')
  }
  this.tasks.set(worker, numberOfTasksInProgress + step)
}
/**
 * Removes the given worker from the pool.
 *
 * @param worker Worker that will be removed.
 */
protected removeWorker (worker: Worker): void {
  // Clean worker from data structure
  const workerIndex = this.workers.indexOf(worker)
  // `indexOf` yields -1 for an unknown worker; `splice(-1, 1)` would then drop the last, unrelated worker
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
/**
 * Choose a worker for the next task.
 *
 * The default implementation uses a round robin algorithm to distribute the load.
 *
 * @returns The worker selected by the worker choice strategy context.
 */
protected chooseWorker (): Worker {
  return this.workerChoiceStrategyContext.execute()
}
/**
 * Send a message to the given worker.
 *
 * @param worker The worker which should receive the message.
 * @param message The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
/**
 * Register a listener callback on a given worker.
 *
 * @template Message Type of the message payload the listener receives.
 * @param worker A worker.
 * @param listener A message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
 * Creates the promise for a submitted task and records its settle functions in the promise map.
 *
 * @param worker The worker the task is sent to.
 * @param messageId ID of the message carrying the task.
 * @returns Promise that stays pending until its promise map entry is resolved or rejected.
 */
protected internalExecute (
  worker: Worker,
  messageId: number
): Promise<Response> {
  return new Promise<Response>((resolve, reject) => {
    this.promiseMap.set(messageId, { resolve, reject, worker })
  })
}
/**
 * Returns a newly created worker.
 */
protected abstract createWorker (): Worker
/**
 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if not bound by default.
 *
 * @param worker The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
/**
 * Creates a new worker for this pool and sets it up completely.
 *
 * Attaches the configured error, online and exit handlers (or a no-op when one is not provided),
 * registers the worker in `workers` and `tasks`, then runs `afterWorkerSetup`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker: Worker = this.createWorker()

  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker from the pool registries as soon as it exits
  worker.once('exit', () => this.removeWorker(worker))

  this.workers.push(worker)

  // Init tasks map
  this.tasks.set(worker, 0)

  this.afterWorkerSetup(worker)

  return worker
}
/**
 * This function is the listener registered for each worker.
 *
 * The returned listener looks up the promise map entry matching the message ID, decreases the
 * worker's task count, settles the promise and deletes the entry.
 *
 * @returns The listener function to execute when a message is sent from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // NOTE(review): both guards were not visible in the reviewed text; they are reconstructed so that
    // messages without an ID, or without a pending promise, are ignored. Confirm.
    if (message.id === undefined) {
      return
    }
    const value = this.promiseMap.get(message.id)
    if (value === undefined) {
      return
    }
    this.decreaseWorkersTasks(value.worker)
    if (message.error) value.reject(message.error)
    else value.resolve(message.data as Response)
    this.promiseMap.delete(message.id)
  }
}