1 import type { MessageValue
} from
'../utility-types'
2 import type { IPoolInternal
} from
'./pool-internal'
3 import { PoolEmitter
} from
'./pool-internal'
4 import type { WorkerChoiceStrategy
} from
'./selection-strategies'
6 WorkerChoiceStrategies
,
7 WorkerChoiceStrategyContext
8 } from
'./selection-strategies'
/**
 * Shared no-op callback, used as the fallback when an optional handler is not supplied.
 */
const EMPTY_FUNCTION: () => void = (): void => {
  // Intentionally empty
}
/**
 * Signature of the callback invoked when a worker raises an error.
 *
 * The handler is typed to run with the worker as `this` and receives the raised error.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Signature of the callback invoked when a worker has started successfully.
 *
 * The handler is typed to run with the worker as `this` and takes no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Signature of the callback invoked when a worker exits.
 *
 * The handler is typed to run with the worker as `this` and receives the exit code.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Minimal event-listener contract that every pool worker has to implement.
 */
export interface IWorker {
  /**
   * Registers a listener for the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void

  /**
   * Registers a listener for the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void

  /**
   * Registers a listener for the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void

  /**
   * Registers a listener for the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options accepted by a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener for the error event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>

  /**
   * Listener for the online event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>

  /**
   * Listener for the exit event of each worker.
   */
  exitHandler?: ExitHandler<Worker>

  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number

  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
98 * Base class containing some shared logic for all poolifier pools.
100 * @template Worker Type of worker which manages this pool.
101 * @template Data Type of data sent to the worker. This can only be serializable data.
102 * @template Response Type of response of execution. This can only be serializable data.
104 export abstract class AbstractPool
<
105 Worker
extends IWorker
,
108 > implements IPoolInternal
<Worker
, Data
, Response
> {
110 public readonly workers
: Worker
[] = []
113 public readonly tasks
: Map
<Worker
, number> = new Map
<Worker
, number>()
116 public readonly emitter
: PoolEmitter
119 * ID of the next message.
121 protected nextMessageId
: number = 0
124 * Worker choice strategy instance implementing the worker choice algorithm.
126 * Default to a strategy implementing a round robin algorithm.
128 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
135 * Constructs a new poolifier pool.
137 * @param numberOfWorkers Number of workers that this pool should manage.
138 * @param filePath Path to the worker-file.
139 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
142 public readonly numberOfWorkers
: number,
143 public readonly filePath
: string,
144 public readonly opts
: PoolOptions
<Worker
> = { maxTasks
: 1000 }
146 if (!this.isMain()) {
147 throw new Error('Cannot start a pool from a worker!')
149 this.checkFilePath(this.filePath
)
152 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
153 this.createAndSetupWorker()
156 this.emitter
= new PoolEmitter()
157 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext(
159 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
163 private checkFilePath (filePath
: string): void {
165 throw new Error('Please specify a file with a worker implementation')
170 public isDynamic (): boolean {
/**
 * Switches the pool to another worker choice strategy by forwarding it to the
 * worker choice strategy context.
 *
 * @param workerChoiceStrategy The strategy the context should use from now on.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const strategyContext = this.workerChoiceStrategyContext
  strategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
}
/**
 * Submits a task to a worker of the pool and returns a promise for its response.
 *
 * A worker is obtained from `chooseWorker()`, its task counter is increased, and
 * the response listener is registered (via `internalExecute`) before the message
 * is sent to the worker.
 *
 * @param data Payload of the task. Only `null` / `undefined` are replaced by an
 *   empty object; other falsy payloads such as `0`, `''` or `false` are sent as-is.
 * @returns Promise settled by the worker message carrying this task's message id.
 */
public execute (data: Data): Promise<Response> {
  // Configure worker to handle message with the specified task
  const worker = this.chooseWorker()
  this.increaseWorkersTask(worker)
  const messageId = ++this.nextMessageId
  const res = this.internalExecute(worker, messageId)
  // `??` instead of `||`: a legitimate falsy payload must not be swapped for `{}`.
  this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
  return res
}
/**
 * Destroys every worker currently held by the pool.
 *
 * `destroyWorker` is called for each worker and the returned promise resolves
 * once all of those calls have completed; it rejects if any of them rejects.
 */
public async destroy (): Promise<void> {
  const pendingDestructions = this.workers.map(worker =>
    this.destroyWorker(worker)
  )
  await Promise.all(pendingDestructions)
}
/**
 * Shuts down the given worker.
 *
 * Implementations may finish synchronously or return a promise; `destroy()`
 * waits for the result of every call.
 *
 * @param worker The worker to destroy.
 */
public abstract destroyWorker (worker: Worker): void | Promise<void>
/**
 * Setup hook that a Poolifier pool implementation can override
 * to run code before workers are created in the abstract constructor.
 *
 * The base implementation does nothing.
 */
protected setupHook (): void {
  // Intentionally empty: subclasses override this when they need it.
}
/**
 * Tells whether the current context is the main worker.
 *
 * The constructor refuses to start a pool when this returns `false`.
 *
 * @returns `true` when running as the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
/**
 * Adds one to the task counter kept for the given worker.
 *
 * @param worker Worker whose task counter is increased.
 */
protected increaseWorkersTask (worker: Worker): void {
  const increment = 1
  this.stepWorkerNumberOfTasks(worker, increment)
}
/**
 * Subtracts one from the task counter kept for the given worker.
 *
 * @param worker Worker whose task counter is decreased.
 */
protected decreaseWorkersTasks (worker: Worker): void {
  const decrement = -1
  this.stepWorkerNumberOfTasks(worker, decrement)
}
/**
 * Moves the task counter of the given worker by `step`.
 *
 * @param worker Worker whose task counter is updated.
 * @param step Amount added to the counter (may be negative).
 * @throws Error when the worker has no entry in the tasks map.
 */
private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
  const currentCount = this.tasks.get(worker)
  if (currentCount === undefined) {
    throw new Error('Worker could not be found in tasks map')
  }
  this.tasks.set(worker, currentCount + step)
}
/**
 * Removes the given worker from the pool's `workers` list and `tasks` map.
 *
 * A worker that is not (or no longer) part of the pool is ignored instead of
 * causing another worker to be removed.
 *
 * @param worker Worker that will be removed.
 */
protected removeWorker (worker: Worker): void {
  // Clean worker from data structure
  const workerIndex = this.workers.indexOf(worker)
  // indexOf yields -1 for an unknown worker; splice(-1, 1) would then drop the
  // LAST worker of the list, so only splice when the worker was actually found.
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
/**
 * Picks the worker that will receive the next task.
 *
 * The choice is delegated to the worker choice strategy context; unless another
 * strategy was configured, the pool is constructed with a round robin strategy.
 *
 * @returns The chosen worker.
 */
protected chooseWorker (): Worker {
  const strategyContext = this.workerChoiceStrategyContext
  return strategyContext.execute()
}
/**
 * Sends a message to the given worker.
 *
 * @param worker The worker which should receive the message.
 * @param message The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
/**
 * Attaches a message listener to the given worker.
 *
 * `internalExecute` uses this to receive the response message of a task.
 *
 * @param worker The worker the listener is attached to.
 * @param listener Callback receiving each message.
 */
public abstract registerWorkerMessageListener<Message extends Data | Response> (
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Detaches a message listener from the given worker.
 *
 * `internalExecute` calls this once the response of its task has arrived.
 *
 * @param worker The worker the listener is detached from.
 * @param listener The callback to detach.
 */
protected abstract unregisterWorkerMessageListener<Message extends Data | Response> (
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Builds the promise that tracks the response of one task.
 *
 * A message listener is registered on the worker. When a message whose `id`
 * equals `messageId` arrives, the listener is unregistered, the worker's task
 * counter is decreased, and the promise is rejected with `message.error` if it
 * is set or resolved with `message.data` otherwise. Messages with another id
 * are ignored.
 *
 * @param worker The worker expected to answer.
 * @param messageId Id of the message whose response is awaited.
 * @returns Promise settled by the matching response message.
 */
protected internalExecute (
  worker: Worker,
  messageId: number
): Promise<Response> {
  return new Promise((resolve, reject) => {
    const onMessage = (message: MessageValue<Response>): void => {
      if (message.id !== messageId) {
        // Response to a different task: keep waiting.
        return
      }
      this.unregisterWorkerMessageListener(worker, onMessage)
      this.decreaseWorkersTasks(worker)
      if (message.error) {
        reject(message.error)
      } else {
        resolve(message.data as Response)
      }
    }
    this.registerWorkerMessageListener(worker, onMessage)
  })
}
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
/**
 * Hook invoked once a newly created worker has been added to the workers registry.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker`
 * connection if it is not bound by default.
 *
 * @param worker The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
323 public createAndSetupWorker (): Worker
{
324 const worker
: Worker
= this.createWorker()
326 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
327 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
328 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
329 worker
.once('exit', () => this.removeWorker(worker
))
331 this.workers
.push(worker
)
334 this.tasks
.set(worker
, 0)
336 this.afterWorkerSetup(worker
)