3 PromiseWorkerResponseWrapper
4 } from
'../utility-types'
5 import type { IPoolInternal
} from
'./pool-internal'
6 import { PoolEmitter
} from
'./pool-internal'
7 import type { WorkerChoiceStrategy
} from
'./selection-strategies'
9 WorkerChoiceStrategies
,
10 WorkerChoiceStrategyContext
11 } from
'./selection-strategies'
/**
 * An intentional empty function.
 *
 * Used as the fallback listener when no handler is supplied in the pool options.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
/**
 * Callback invoked if the worker raised an error.
 *
 * @template Worker Type the handler is bound to as `this`.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback invoked when the worker has started successfully.
 *
 * @template Worker Type the handler is bound to as `this`.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback invoked when the worker exits successfully.
 *
 * @template Worker Type the handler is bound to as `this`.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options for a poolifier pool.
 *
 * @template Worker Type of worker the handlers are bound to.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   * The pool constructor's default options use `1000`.
   *
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
101 * Base class containing some shared logic for all poolifier pools.
103 * @template Worker Type of worker which manages this pool.
104 * @template Data Type of data sent to the worker. This can only be serializable data.
105 * @template Response Type of response of execution. This can only be serializable data.
107 export abstract class AbstractPool
<
108 Worker
extends IWorker
,
111 > implements IPoolInternal
<Worker
, Data
, Response
> {
/**
 * The promise map.
 *
 * - `key`: This is the message ID of each submitted task.
 * - `value`: An object that contains the worker, the resolve function and the reject function.
 *
 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
 */
protected promiseMap: Map<
  number,
  PromiseWorkerResponseWrapper<Worker, Response>
> = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

/**
 * Workers registered in this pool.
 *
 * `createAndSetupWorker` appends to it and `removeWorker` removes from it.
 */
public readonly workers: Worker[] = []

/**
 * Task counter per worker.
 *
 * Initialised to `0` when a worker is set up, then stepped by `+1` / `-1`
 * through `increaseWorkersTask` / `decreaseWorkersTasks`.
 */
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

/**
 * Event emitter of this pool; assigned in the constructor.
 */
public readonly emitter: PoolEmitter

/**
 * ID of the next message.
 *
 * `execute` pre-increments it, so the first message sent carries ID `1`.
 */
protected nextMessageId: number = 0
140 * Worker choice strategy instance implementing the worker choice algorithm.
142 * Default to a strategy implementing a round robin algorithm.
144 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
151 * Constructs a new poolifier pool.
153 * @param numberOfWorkers Number of workers that this pool should manage.
154 * @param filePath Path to the worker-file.
155 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
158 public readonly numberOfWorkers
: number,
159 public readonly filePath
: string,
160 public readonly opts
: PoolOptions
<Worker
> = { maxTasks
: 1000 }
162 if (!this.isMain()) {
163 throw new Error('Cannot start a pool from a worker!')
165 this.checkFilePath(this.filePath
)
168 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
169 this.createAndSetupWorker()
172 this.emitter
= new PoolEmitter()
173 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext(
175 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
179 private checkFilePath (filePath
: string): void {
181 throw new Error('Please specify a file with a worker implementation')
186 public isDynamic (): boolean {
191 public setWorkerChoiceStrategy (
192 workerChoiceStrategy
: WorkerChoiceStrategy
194 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
/**
 * Submits a task to the pool.
 *
 * A worker is chosen, its task counter is increased, a pending promise is
 * registered under a fresh message ID and the payload is sent to the worker.
 *
 * @param data Payload of the task. Only `null` / `undefined` is replaced by an empty object.
 * @returns The promise created by `internalExecute` for this message ID.
 */
public execute (data: Data): Promise<Response> {
  // Configure worker to handle message with the specified task
  const worker = this.chooseWorker()
  this.increaseWorkersTask(worker)
  const messageId = ++this.nextMessageId
  // Register the pending promise before the message is sent.
  const res = this.internalExecute(worker, messageId)
  // `??` rather than `||`: falsy payloads such as `0`, `''` or `false` are
  // valid task data and must reach the worker unchanged.
  this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
  return res
}
/**
 * Destroys every worker currently registered in this pool.
 *
 * @returns Promise resolved once every `destroyWorker` call has completed.
 */
public async destroy (): Promise<void> {
  await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
}
/**
 * Destroys the given worker. Invoked by `destroy` for each registered worker.
 *
 * @param worker The worker to destroy.
 * @returns Nothing, or a promise that `destroy` awaits.
 */
public abstract destroyWorker (worker: Worker): void | Promise<void>
219 * Setup hook that can be overridden by a Poolifier pool implementation
220 * to run code before workers are created in the abstract constructor.
222 protected setupHook (): void {
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to start a pool (throws) when this returns `false`.
 *
 * @returns `true` when running as the main worker.
 */
protected abstract isMain (): boolean
/**
 * Increases by one the task counter of the given worker.
 *
 * @param worker Worker whose tasks are increased.
 * @throws Error if the worker has no entry in the tasks map.
 */
protected increaseWorkersTask (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, 1)
}
/**
 * Decreases by one the task counter of the given worker.
 *
 * @param worker Worker whose tasks are decreased.
 * @throws Error if the worker has no entry in the tasks map.
 */
protected decreaseWorkersTasks (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, -1)
}
/**
 * Steps the task counter of the given worker.
 *
 * @param worker Worker whose tasks are set.
 * @param step Worker number of tasks step.
 * @throws Error if the worker has no entry in the tasks map.
 */
private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
  const numberOfTasksInProgress = this.tasks.get(worker)
  // A counter of `0` is a valid entry, so compare against `undefined` explicitly.
  if (numberOfTasksInProgress === undefined) {
    throw new Error('Worker could not be found in tasks map')
  }
  this.tasks.set(worker, numberOfTasksInProgress + step)
}
/**
 * Removes the given worker from the pool.
 *
 * Removing a worker that is not (or no longer) registered leaves the
 * `workers` array untouched.
 *
 * @param worker Worker that will be removed.
 */
protected removeWorker (worker: Worker): void {
  // Clean worker from data structure
  const workerIndex = this.workers.indexOf(worker)
  // `indexOf` yields -1 for an unknown worker, and `splice(-1, 1)` would
  // drop the last registered worker instead of doing nothing.
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
/**
 * Choose a worker for the next task.
 *
 * Delegates to the worker choice strategy context. The constructor falls back
 * to the round robin strategy when the pool options do not specify one.
 *
 * @returns The worker selected by the strategy context.
 */
protected chooseWorker (): Worker {
  return this.workerChoiceStrategyContext.execute()
}
288 * Send a message to the given worker.
290 * @param worker The worker which should receive the message.
291 * @param message The message.
293 protected abstract sendToWorker (
295 message
: MessageValue
<Data
>
/**
 * Registers a message listener on the given worker.
 *
 * @template Message Type of the message payload, constrained to `Data | Response`.
 * @param worker The worker the listener is registered on.
 * @param listener Callback receiving each message.
 */
public abstract registerWorkerMessageListener<
  Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
 * Creates the promise for a submitted task and stores its settle callbacks.
 *
 * @param worker Worker the task is assigned to.
 * @param messageId ID of the message carrying the task.
 * @returns Promise whose `resolve` / `reject` are stored in `promiseMap` under `messageId`.
 */
protected internalExecute (
  worker: Worker,
  messageId: number
): Promise<Response> {
  return new Promise<Response>((resolve, reject) => {
    this.promiseMap.set(messageId, { resolve, reject, worker })
  })
}
/**
 * Returns a newly created worker.
 *
 * @returns The new worker; `createAndSetupWorker` attaches the listeners and registers it.
 */
protected abstract createWorker (): Worker
/**
 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if not bound by default.
 *
 * @param worker The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
/**
 * Creates a worker, attaches the configured event handlers to it and
 * registers it in the pool.
 *
 * @returns The newly created worker.
 */
public createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // Fall back to a no-op so a listener is registered for every event.
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker from the pool's bookkeeping the first time it exits.
  worker.once('exit', () => this.removeWorker(worker))

  this.workers.push(worker)

  // Init tasks map
  this.tasks.set(worker, 0)

  this.afterWorkerSetup(worker)

  return worker
}
346 * This function is the listener registered for each worker.
348 * @returns The listener function to execute when a message is sent from a worker.
350 protected workerListener (): (message
: MessageValue
<Response
>) => void {
351 const listener
: (message
: MessageValue
<Response
>) => void = message
=> {
353 const value
= this.promiseMap
.get(message
.id
)
355 this.decreaseWorkersTasks(value
.worker
)
356 if (message
.error
) value
.reject(message
.error
)
357 else value
.resolve(message
.data
as Response
)
358 this.promiseMap
.delete(message
.id
)