3 PromiseWorkerResponseWrapper
4 } from
'../utility-types'
5 import { EMPTY_FUNCTION
} from
'../utils'
6 import { isKillBehavior
, KillBehaviors
} from
'../worker/worker-options'
7 import type { IPoolInternal
} from
'./pool-internal'
8 import { PoolEmitter
, PoolType
} from
'./pool-internal'
9 import type { WorkerChoiceStrategy
} from
'./selection-strategies'
11 WorkerChoiceStrategies
,
12 WorkerChoiceStrategyContext
13 } from
'./selection-strategies'
16 * Callback invoked if the worker has received a message.
18 export type MessageHandler
<Worker
> = (this: Worker
, m
: unknown
) => void
21 * Callback invoked if the worker raised an error.
23 export type ErrorHandler
<Worker
> = (this: Worker
, e
: Error) => void
26 * Callback invoked when the worker has started successfully.
28 export type OnlineHandler
<Worker
> = (this: Worker
) => void
31 * Callback invoked when the worker exits successfully.
33 export type ExitHandler
<Worker
> = (this: Worker
, code
: number) => void
36 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
38 export interface IWorker
{
40 * Register a listener to the message event.
42 * @param event `'message'`.
43 * @param handler The message handler.
45 on(event
: 'message', handler
: MessageHandler
<this>): void
47 * Register a listener to the error event.
49 * @param event `'error'`.
50 * @param handler The error handler.
52 on(event
: 'error', handler
: ErrorHandler
<this>): void
54 * Register a listener to the online event.
56 * @param event `'online'`.
57 * @param handler The online handler.
59 on(event
: 'online', handler
: OnlineHandler
<this>): void
61 * Register a listener to the exit event.
63 * @param event `'exit'`.
64 * @param handler The exit handler.
66 on(event
: 'exit', handler
: ExitHandler
<this>): void
68 * Register a listener to the exit event that will only performed once.
70 * @param event `'exit'`.
71 * @param handler The exit handler.
73 once(event
: 'exit', handler
: ExitHandler
<this>): void
77 * Options for a poolifier pool.
79 export interface PoolOptions
<Worker
> {
81 * A function that will listen for message event on each worker.
83 messageHandler
?: MessageHandler
<Worker
>
85 * A function that will listen for error event on each worker.
87 errorHandler
?: ErrorHandler
<Worker
>
89 * A function that will listen for online event on each worker.
91 onlineHandler
?: OnlineHandler
<Worker
>
93 * A function that will listen for exit event on each worker.
95 exitHandler
?: ExitHandler
<Worker
>
97 * The work choice strategy to use in this pool.
99 workerChoiceStrategy
?: WorkerChoiceStrategy
101 * Pool events emission.
105 enableEvents
?: boolean
109 * Base class containing some shared logic for all poolifier pools.
111 * @template Worker Type of worker which manages this pool.
112 * @template Data Type of data sent to the worker. This can only be serializable data.
113 * @template Response Type of response of execution. This can only be serializable data.
115 export abstract class AbstractPool
<
116 Worker
extends IWorker
,
119 > implements IPoolInternal
<Worker
, Data
, Response
> {
121 public readonly workers
: Worker
[] = []
124 public readonly tasks
: Map
<Worker
, number> = new Map
<Worker
, number>()
127 public readonly emitter
?: PoolEmitter
130 public readonly max
?: number
135 * - `key`: This is the message Id of each submitted task.
136 * - `value`: An object that contains the worker, the resolve function and the reject function.
138 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
140 protected promiseMap
: Map
<
142 PromiseWorkerResponseWrapper
<Worker
, Response
>
143 > = new Map
<number, PromiseWorkerResponseWrapper
<Worker
, Response
>>()
146 * Id of the next message.
148 protected nextMessageId
: number = 0
151 * Worker choice strategy instance implementing the worker choice algorithm.
153 * Default to a strategy implementing a round robin algorithm.
155 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
162 * Constructs a new poolifier pool.
164 * @param numberOfWorkers Number of workers that this pool should manage.
165 * @param filePath Path to the worker-file.
166 * @param opts Options for the pool.
169 public readonly numberOfWorkers
: number,
170 public readonly filePath
: string,
171 public readonly opts
: PoolOptions
<Worker
>
173 if (!this.isMain()) {
174 throw new Error('Cannot start a pool from a worker!')
176 this.checkNumberOfWorkers(this.numberOfWorkers
)
177 this.checkFilePath(this.filePath
)
178 this.checkPoolOptions(this.opts
)
181 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
182 this.createAndSetupWorker()
185 if (this.opts
.enableEvents
) {
186 this.emitter
= new PoolEmitter()
188 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext(
191 const workerCreated
= this.createAndSetupWorker()
192 this.registerWorkerMessageListener(workerCreated
, message
=> {
194 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
195 this.tasks
.get(workerCreated
) === 0
197 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
198 this.destroyWorker(workerCreated
) as void
203 this.opts
.workerChoiceStrategy
207 private checkFilePath (filePath
: string): void {
209 throw new Error('Please specify a file with a worker implementation')
213 private checkNumberOfWorkers (numberOfWorkers
: number): void {
214 if (numberOfWorkers
== null) {
216 'Cannot instantiate a pool without specifying the number of workers'
218 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
220 'Cannot instantiate a pool with a non integer number of workers'
222 } else if (numberOfWorkers
< 0) {
224 'Cannot instantiate a pool with a negative number of workers'
226 } else if (this.type === PoolType
.FIXED
&& numberOfWorkers
=== 0) {
227 throw new Error('Cannot instantiate a fixed pool with no worker')
231 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
232 this.opts
.workerChoiceStrategy
=
233 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
234 this.opts
.enableEvents
= opts
.enableEvents
?? true
238 public abstract get
type (): PoolType
241 public get
numberOfRunningTasks (): number {
242 return this.promiseMap
.size
246 public setWorkerChoiceStrategy (
247 workerChoiceStrategy
: WorkerChoiceStrategy
249 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
250 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
256 public abstract get
busy (): boolean
258 protected internalGetBusyStatus (): boolean {
260 this.numberOfRunningTasks
>= this.numberOfWorkers
&&
261 this.findFreeTasksMapEntry() === false
266 public findFreeTasksMapEntry (): [Worker
, number] | false {
267 for (const [worker
, numberOfTasks
] of this.tasks
) {
268 if (numberOfTasks
=== 0) {
269 // A worker is free, return the matching tasks map entry
270 return [worker
, numberOfTasks
]
277 public execute (data
: Data
): Promise
<Response
> {
278 // Configure worker to handle message with the specified task
279 const worker
= this.chooseWorker()
280 const messageId
= ++this.nextMessageId
281 const res
= this.internalExecute(worker
, messageId
)
282 this.checkAndEmitBusy()
283 this.sendToWorker(worker
, { data
: data
?? ({} as Data
), id
: messageId
})
288 public async destroy (): Promise
<void> {
289 await Promise
.all(this.workers
.map(worker
=> this.destroyWorker(worker
)))
293 * Shut down given worker.
295 * @param worker A worker within `workers`.
297 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
300 * Setup hook that can be overridden by a Poolifier pool implementation
301 * to run code before workers are created in the abstract constructor.
303 protected setupHook (): void {
308 * Should return whether the worker is the main worker or not.
310 protected abstract isMain (): boolean
313 * Increase the number of tasks that the given worker has applied.
315 * @param worker Worker whose tasks are increased.
317 protected increaseWorkersTask (worker
: Worker
): void {
318 this.stepWorkerNumberOfTasks(worker
, 1)
322 * Decrease the number of tasks that the given worker has applied.
324 * @param worker Worker whose tasks are decreased.
326 protected decreaseWorkersTasks (worker
: Worker
): void {
327 this.stepWorkerNumberOfTasks(worker
, -1)
331 * Step the number of tasks that the given worker has applied.
333 * @param worker Worker whose tasks are set.
334 * @param step Worker number of tasks step.
336 private stepWorkerNumberOfTasks (worker
: Worker
, step
: number): void {
337 const numberOfTasksInProgress
= this.tasks
.get(worker
)
338 if (numberOfTasksInProgress
!== undefined) {
339 this.tasks
.set(worker
, numberOfTasksInProgress
+ step
)
341 throw Error('Worker could not be found in tasks map')
346 * Removes the given worker from the pool.
348 * @param worker Worker that will be removed.
350 protected removeWorker (worker
: Worker
): void {
351 // Clean worker from data structure
352 const workerIndex
= this.workers
.indexOf(worker
)
353 this.workers
.splice(workerIndex
, 1)
354 this.tasks
.delete(worker
)
358 * Choose a worker for the next task.
360 * The default implementation uses a round robin algorithm to distribute the load.
364 protected chooseWorker (): Worker
{
365 return this.workerChoiceStrategyContext
.execute()
369 * Send a message to the given worker.
371 * @param worker The worker which should receive the message.
372 * @param message The message.
374 protected abstract sendToWorker (
376 message
: MessageValue
<Data
>
380 * Register a listener callback on a given worker.
382 * @param worker A worker.
383 * @param listener A message listener callback.
385 protected abstract registerWorkerMessageListener
<
386 Message
extends Data
| Response
387 > (worker
: Worker
, listener
: (message
: MessageValue
<Message
>) => void): void
389 protected internalExecute (
392 ): Promise
<Response
> {
393 this.increaseWorkersTask(worker
)
394 return new Promise
<Response
>((resolve
, reject
) => {
395 this.promiseMap
.set(messageId
, { resolve
, reject
, worker
})
400 * Returns a newly created worker.
402 protected abstract createWorker (): Worker
405 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
407 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
409 * @param worker The newly created worker.
411 protected abstract afterWorkerSetup (worker
: Worker
): void
414 * Creates a new worker for this pool and sets it up completely.
416 * @returns New, completely set up worker.
418 protected createAndSetupWorker (): Worker
{
419 const worker
= this.createWorker()
421 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
422 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
423 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
424 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
425 worker
.once('exit', () => this.removeWorker(worker
))
427 this.workers
.push(worker
)
430 this.tasks
.set(worker
, 0)
432 this.afterWorkerSetup(worker
)
438 * This function is the listener registered for each worker.
440 * @returns The listener function to execute when a message is received from a worker.
442 protected workerListener (): (message
: MessageValue
<Response
>) => void {
444 if (message
.id
!== undefined) {
445 const promise
= this.promiseMap
.get(message
.id
)
446 if (promise
!== undefined) {
447 this.decreaseWorkersTasks(promise
.worker
)
448 if (message
.error
) promise
.reject(message
.error
)
449 else promise
.resolve(message
.data
as Response
)
450 this.promiseMap
.delete(message
.id
)
456 private checkAndEmitBusy (): void {
457 if (this.opts
.enableEvents
&& this.busy
) {
458 this.emitter
?.emit('busy')