1 import EventEmitter from
'events'
2 import type { MessageValue
} from
'../utility-types'
3 import type { IPool
} from
'./pool'
6 * Callback invoked if the worker raised an error.
8 export type ErrorHandler
<Worker
> = (this: Worker
, e
: Error) => void
11 * Callback invoked when the worker has started successfully.
13 export type OnlineHandler
<Worker
> = (this: Worker
) => void
16 * Callback invoked when the worker exits successfully.
18 export type ExitHandler
<Worker
> = (this: Worker
, code
: number) => void
21 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
23 export interface IWorker
{
25 * Register a listener to the error event.
27 * @param event `'error'`.
28 * @param handler The error handler.
30 on(event
: 'error', handler
: ErrorHandler
<this>): void
32 * Register a listener to the online event.
34 * @param event `'online'`.
35 * @param handler The online handler.
37 on(event
: 'online', handler
: OnlineHandler
<this>): void
39 * Register a listener to the exit event.
41 * @param event `'exit'`.
42 * @param handler The exit handler.
44 on(event
: 'exit', handler
: ExitHandler
<this>): void
46 * Register a listener to the exit event that will only performed once.
48 * @param event `'exit'`.
49 * @param handler The exit handler.
51 once(event
: 'exit', handler
: ExitHandler
<this>): void
55 * Options for a poolifier pool.
57 export interface PoolOptions
<Worker
> {
59 * A function that will listen for error event on each worker.
61 errorHandler
?: ErrorHandler
<Worker
>
63 * A function that will listen for online event on each worker.
65 onlineHandler
?: OnlineHandler
<Worker
>
67 * A function that will listen for exit event on each worker.
69 exitHandler
?: ExitHandler
<Worker
>
71 * This is just to avoid non-useful warning messages.
73 * Will be used to set `maxListeners` on event emitters (workers are event emitters).
76 * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
82 * Internal poolifier pool emitter.
84 class PoolEmitter
extends EventEmitter
{}
87 * Base class containing some shared logic for all poolifier pools.
89 * @template Worker Type of worker which manages this pool.
90 * @template Data Type of data sent to the worker.
91 * @template Response Type of response of execution.
93 export abstract class AbstractPool
<
94 Worker
extends IWorker
,
97 > implements IPool
<Data
, Response
> {
99 * List of currently available workers.
101 public readonly workers
: Worker
[] = []
104 * Index for the next worker.
106 public nextWorkerIndex
: number = 0
111 * - `key`: The `Worker`
112 * - `value`: Number of tasks currently in progress on the worker.
114 public readonly tasks
: Map
<Worker
, number> = new Map
<Worker
, number>()
117 * Emitter on which events can be listened to.
119 * Events that can currently be listened to:
123 public readonly emitter
: PoolEmitter
126 * ID of the next message.
128 protected nextMessageId
: number = 0
131 * Constructs a new poolifier pool.
133 * @param numberOfWorkers Number of workers that this pool should manage.
134 * @param filePath Path to the worker-file.
135 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
138 public readonly numberOfWorkers
: number,
139 public readonly filePath
: string,
140 public readonly opts
: PoolOptions
<Worker
> = { maxTasks
: 1000 }
142 if (!this.isMain()) {
143 throw new Error('Cannot start a pool from a worker!')
145 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
146 if (!this.filePath
) {
147 throw new Error('Please specify a file with a worker implementation')
151 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
152 this.createAndSetupWorker()
155 this.emitter
= new PoolEmitter()
159 * Perform the task specified in the constructor with the data parameter.
161 * @param data The input for the specified task.
162 * @returns Promise that will be resolved when the task is successfully completed.
164 public execute (data
: Data
): Promise
<Response
> {
165 // Configure worker to handle message with the specified task
166 const worker
= this.chooseWorker()
167 this.increaseWorkersTask(worker
)
168 const messageId
= ++this.nextMessageId
169 const res
= this.internalExecute(worker
, messageId
)
170 this.sendToWorker(worker
, { data
: data
|| ({} as Data
), id
: messageId
})
175 * Shut down every current worker in this pool.
177 public async destroy (): Promise
<void> {
178 await Promise
.all(this.workers
.map(worker
=> this.destroyWorker(worker
)))
182 * Shut down given worker.
184 * @param worker A worker within `workers`.
186 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
189 * Setup hook that can be overridden by a Poolifier pool implementation
190 * to run code before workers are created in the abstract constructor.
192 protected setupHook (): void {
197 * Should return whether the worker is the main worker or not.
199 protected abstract isMain (): boolean
202 * Increase the number of tasks that the given workers has done.
204 * @param worker Worker whose tasks are increased.
206 protected increaseWorkersTask (worker
: Worker
): void {
207 this.stepWorkerNumberOfTasks(worker
, 1)
211 * Decrease the number of tasks that the given workers has done.
213 * @param worker Worker whose tasks are decreased.
215 protected decreaseWorkersTasks (worker
: Worker
): void {
216 this.stepWorkerNumberOfTasks(worker
, -1)
220 * Step the number of tasks that the given workers has done.
222 * @param worker Worker whose tasks are set.
223 * @param step Worker number of tasks step.
225 private stepWorkerNumberOfTasks (worker
: Worker
, step
: number) {
226 const numberOfTasksInProgress
= this.tasks
.get(worker
)
227 if (numberOfTasksInProgress
!== undefined) {
228 this.tasks
.set(worker
, numberOfTasksInProgress
+ step
)
230 throw Error('Worker could not be found in tasks map')
235 * Removes the given worker from the pool.
237 * @param worker Worker that will be removed.
239 protected removeWorker (worker
: Worker
): void {
240 // Clean worker from data structure
241 const workerIndex
= this.workers
.indexOf(worker
)
242 this.workers
.splice(workerIndex
, 1)
243 this.tasks
.delete(worker
)
247 * Choose a worker for the next task.
249 * The default implementation uses a round robin algorithm to distribute the load.
253 protected chooseWorker (): Worker
{
254 const chosenWorker
= this.workers
[this.nextWorkerIndex
]
255 this.nextWorkerIndex
=
256 this.workers
.length
- 1 === this.nextWorkerIndex
258 : this.nextWorkerIndex
+ 1
263 * Send a message to the given worker.
265 * @param worker The worker which should receive the message.
266 * @param message The message.
268 protected abstract sendToWorker (
270 message
: MessageValue
<Data
>
273 protected abstract registerWorkerMessageListener
<
274 Message
extends Data
| Response
275 > (worker
: Worker
, listener
: (message
: MessageValue
<Message
>) => void): void
277 protected abstract unregisterWorkerMessageListener
<
278 Message
extends Data
| Response
279 > (worker
: Worker
, listener
: (message
: MessageValue
<Message
>) => void): void
281 protected internalExecute (
284 ): Promise
<Response
> {
285 return new Promise((resolve
, reject
) => {
286 const listener
: (message
: MessageValue
<Response
>) => void = message
=> {
287 if (message
.id
=== messageId
) {
288 this.unregisterWorkerMessageListener(worker
, listener
)
289 this.decreaseWorkersTasks(worker
)
290 if (message
.error
) reject(message
.error
)
291 else resolve(message
.data
as Response
)
294 this.registerWorkerMessageListener(worker
, listener
)
299 * Returns a newly created worker.
301 protected abstract createWorker (): Worker
304 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
306 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
308 * @param worker The newly created worker.
310 protected abstract afterWorkerSetup (worker
: Worker
): void
313 * Creates a new worker for this pool and sets it up completely.
315 * @returns New, completely set up worker.
317 protected createAndSetupWorker (): Worker
{
318 const worker
: Worker
= this.createWorker()
320 worker
.on('error', this.opts
.errorHandler
?? (() => {}))
321 worker
.on('online', this.opts
.onlineHandler
?? (() => {}))
322 worker
.on('exit', this.opts
.exitHandler
?? (() => {}))
323 worker
.once('exit', () => this.removeWorker(worker
))
325 this.workers
.push(worker
)
328 this.tasks
.set(worker
, 0)
330 this.afterWorkerSetup(worker
)