1 import EventEmitter from
'events'
2 import type { MessageValue
} from
'../utility-types'
3 import type { IPool
} from
'./pool'
/**
 * An intentional empty function.
 *
 * Used as the default listener when no handler is supplied in the pool options.
 */
function emptyFunction (): void {
  // intentionally left blank
}
/**
 * Callback invoked if the worker raised an error.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback invoked when the worker has started successfully.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback invoked when the worker exits successfully.
 *
 * @template Worker Type of the worker the callback is bound to as `this`.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options for a poolifier pool.
 *
 * @template Worker Type of the worker the handlers are bound to.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * The pool constructor defaults this to `1000`.
   *
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
/**
 * Internal poolifier pool emitter.
 *
 * A plain `EventEmitter` subclass; it adds no behavior of its own.
 */
class PoolEmitter extends EventEmitter {}
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
100 export abstract class AbstractPool
<
101 Worker
extends IWorker
,
104 > implements IPool
<Data
, Response
> {
/**
 * List of currently available workers.
 */
public readonly workers: Worker[] = []

/**
 * Index for the next worker.
 */
public nextWorkerIndex: number = 0

/**
 * Per-worker task counters.
 *
 * - `key`: The `Worker`
 * - `value`: Number of tasks currently in progress on the worker.
 */
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

/**
 * Emitter on which events can be listened to.
 *
 * NOTE(review): the list of events emitted is not visible in this part of the file — confirm against the pool implementations.
 */
public readonly emitter: PoolEmitter

/**
 * ID of the next message.
 *
 * Pre-incremented by `execute` before each message is sent, so the first ID used is `1`.
 */
protected nextMessageId: number = 0
/**
 * Constructs a new poolifier pool.
 *
 * @param numberOfWorkers Number of workers that this pool should manage.
 * @param filePath Path to the worker-file.
 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
 * @throws {Error} If not called from the main worker, or if `filePath` is rejected by `checkFilePath`.
 */
public constructor (
  public readonly numberOfWorkers: number,
  public readonly filePath: string,
  public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkFilePath(this.filePath)
  // Give the concrete pool a chance to run code before any worker exists.
  this.setupHook()

  for (let i = 1; i <= this.numberOfWorkers; i++) {
    this.createAndSetupWorker()
  }

  this.emitter = new PoolEmitter()
}
/**
 * Validates the worker file path given to the constructor.
 *
 * @param filePath Path to the worker-file.
 * @throws {Error} If no file path was specified.
 */
private checkFilePath (filePath: string): void {
  if (!filePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
/**
 * Perform the task specified in the constructor with the data parameter.
 *
 * Only a `null`/`undefined` input is replaced by an empty object; falsy but
 * valid payloads such as `0`, `''` or `false` are sent to the worker unchanged.
 *
 * @param data The input for the specified task.
 * @returns Promise that will be resolved when the task is successfully completed.
 */
public execute (data: Data): Promise<Response> {
  // Configure worker to handle message with the specified task
  const worker = this.chooseWorker()
  this.increaseWorkersTask(worker)
  const messageId = ++this.nextMessageId
  // Register the response listener before sending so the answer cannot be missed.
  const res = this.internalExecute(worker, messageId)
  this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
  return res
}
/**
 * Shut down every current worker in this pool.
 *
 * @returns Promise that resolves once `destroyWorker` has completed for every worker.
 */
public async destroy (): Promise<void> {
  // Iterate over a snapshot: workers are spliced out of `this.workers` when they exit.
  const currentWorkers = [...this.workers]
  await Promise.all(currentWorkers.map(worker => this.destroyWorker(worker)))
}
/**
 * Shut down given worker.
 *
 * @param worker A worker within `workers`.
 * @returns Nothing, or a promise that `destroy` awaits.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
 * Setup hook that can be overridden by a Poolifier pool implementation
 * to run code before workers are created in the abstract constructor.
 */
protected setupHook (): void {
  // Intentionally empty: the default implementation does nothing.
}
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to build a pool when this returns `false`.
 *
 * @returns `true` if called from the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
/**
 * Increase by one the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are increased.
 * @throws {Error} If the worker is not registered in the `tasks` map.
 */
protected increaseWorkersTask (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, 1)
}
/**
 * Decrease by one the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are decreased.
 * @throws {Error} If the worker is not registered in the `tasks` map.
 */
protected decreaseWorkersTasks (worker: Worker): void {
  this.stepWorkerNumberOfTasks(worker, -1)
}
/**
 * Step the number of tasks in progress on the given worker.
 *
 * @param worker Worker whose tasks are set.
 * @param step Worker number of tasks step.
 * @throws {Error} If the worker has no entry in the `tasks` map.
 */
private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
  const numberOfTasksInProgress = this.tasks.get(worker)
  // Compare against `undefined` explicitly: a count of `0` is a valid entry.
  if (numberOfTasksInProgress !== undefined) {
    this.tasks.set(worker, numberOfTasksInProgress + step)
  } else {
    throw new Error('Worker could not be found in tasks map')
  }
}
/**
 * Removes the given worker from the pool.
 *
 * A worker that is not (or no longer) in `workers` is ignored for the list
 * removal; its `tasks` entry, if any, is still deleted.
 *
 * @param worker Worker that will be removed.
 */
protected removeWorker (worker: Worker): void {
  // Clean worker from data structure
  const workerIndex = this.workers.indexOf(worker)
  // `splice(-1, 1)` would drop the last worker, so only splice a real hit.
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
/**
 * Choose a worker for the next task.
 *
 * The default implementation uses a round robin algorithm to distribute the load.
 *
 * @returns The worker at `nextWorkerIndex`; `undefined` at runtime if the pool has no workers.
 */
protected chooseWorker (): Worker {
  // Workers are removed from the list when they exit, so the stored index can
  // point past the end; wrap it before use instead of reading an empty slot.
  if (this.nextWorkerIndex >= this.workers.length) {
    this.nextWorkerIndex = 0
  }
  const chosenWorker = this.workers[this.nextWorkerIndex]
  const followingIndex = this.nextWorkerIndex + 1
  this.nextWorkerIndex =
    followingIndex >= this.workers.length ? 0 : followingIndex
  return chosenWorker
}
/**
 * Send a message to the given worker.
 *
 * @param worker The worker which should receive the message.
 * @param message The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
/**
 * Register a listener for messages coming from the given worker.
 *
 * @template Message Type of the payload carried by the message.
 * @param worker The worker to listen to.
 * @param listener Callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
 * Remove a message listener previously registered on the given worker.
 *
 * @template Message Type of the payload carried by the message.
 * @param worker The worker the listener was registered on.
 * @param listener The exact callback that was registered.
 */
protected abstract unregisterWorkerMessageListener<
  Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
 * Wait for the worker's answer to the message with the given ID.
 *
 * A listener is registered on the worker; when a message with a matching `id`
 * arrives, the listener unregisters itself, the worker's task counter is
 * decreased, and the promise is settled.
 *
 * @param worker The worker expected to answer.
 * @param messageId ID of the message whose answer is awaited.
 * @returns Promise resolved with `message.data`, or rejected with `message.error` when it is set.
 */
protected internalExecute (
  worker: Worker,
  messageId: number
): Promise<Response> {
  return new Promise<Response>((resolve, reject) => {
    const listener: (message: MessageValue<Response>) => void = message => {
      // Messages for other tasks on the same worker are ignored.
      if (message.id === messageId) {
        this.unregisterWorkerMessageListener(worker, listener)
        this.decreaseWorkersTasks(worker)
        if (message.error) reject(message.error)
        else resolve(message.data as Response)
      }
    }
    this.registerWorkerMessageListener(worker, listener)
  })
}
/**
 * Returns a newly created worker.
 *
 * @returns The new worker, not yet registered in the pool.
 */
protected abstract createWorker (): Worker
/**
 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if it is not bound by default.
 *
 * @param worker The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
/**
 * Creates a new worker for this pool and sets it up completely.
 *
 * The handlers from the pool options (or a no-op when absent) are attached,
 * the worker is added to `workers` and `tasks`, and `afterWorkerSetup` is run.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker: Worker = this.createWorker()

  worker.on('error', this.opts.errorHandler ?? emptyFunction)
  worker.on('online', this.opts.onlineHandler ?? emptyFunction)
  worker.on('exit', this.opts.exitHandler ?? emptyFunction)
  // Drop the worker from the pool's bookkeeping the first time it exits.
  worker.once('exit', () => this.removeWorker(worker))

  this.workers.push(worker)

  // Start with no task in progress on the new worker.
  this.tasks.set(worker, 0)

  this.afterWorkerSetup(worker)

  return worker
}