6b29669ff624a2ccdc58faebc2faa909125f97a0
1 import EventEmitter from
'events'
2 import type { MessageValue
} from
'../utility-types'
3 import type { IPool
} from
'./pool'
/**
 * Signature of the callback invoked if the worker raised an error.
 *
 * @template Worker Type the callback receives as its `this` value.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Signature of the callback invoked when the worker has started successfully.
 *
 * @template Worker Type the callback receives as its `this` value.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Signature of the callback invoked when the worker exits successfully.
 *
 * @template Worker Type the callback receives as its `this` value.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Minimal listener contract a pool-worker has to implement so that the pool
 * can subscribe to its lifecycle events.
 */
export interface IWorker {
  /** Subscribes `handler` to the worker's `error` event. */
  on(event: 'error', handler: ErrorHandler<this>): void

  /** Subscribes `handler` to the worker's `online` event. */
  on(event: 'online', handler: OnlineHandler<this>): void

  /** Subscribes `handler` to the worker's `exit` event. */
  on(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options accepted by a poolifier pool.
 *
 * @template Worker Type of worker the handlers are declared for.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener for the `error` event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>

  /**
   * Listener for the `online` event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>

  /**
   * Listener for the `exit` event of each worker.
   */
  exitHandler?: ExitHandler<Worker>

  /**
   * Exists only to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
/**
 * Event emitter type used internally by poolifier pools.
 *
 * Adds nothing on top of `EventEmitter`; it only gives the pool's emitter a
 * dedicated class.
 */
class PoolEmitter extends EventEmitter {}
62 * Base class containing some shared logic for all poolifier pools.
64 * @template Worker Type of worker which manages this pool.
65 * @template Data Type of data sent to the worker.
66 * @template Response Type of response of execution.
68 export abstract class AbstractPool
<
69 Worker
extends IWorker
,
72 > implements IPool
<Data
, Response
> {
/**
 * Workers currently held by this pool.
 */
public readonly workers: Worker[] = []

/**
 * Cursor into `workers` that `chooseWorker` advances to pick the next worker.
 */
public nextWorker = 0

/**
 * Per-worker task counter.
 *
 * - `key`: The `Worker`
 * - `value`: Counter that starts at 0 when the worker is created and is
 *   incremented by `addWorker`
 */
public readonly tasks = new Map<Worker, number>()

/**
 * Emitter on which events of this pool can be listened to.
 */
public readonly emitter: PoolEmitter

/**
 * Counter backing the IDs given to messages.
 */
protected id = 0
/**
 * Constructs a new poolifier pool.
 *
 * Runs the setup hook, then creates `numWorkers` workers, then creates the
 * pool emitter.
 *
 * @param numWorkers Number of workers that this pool should manage.
 * @param filePath Path to the worker-file.
 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
 * @throws {Error} If `isMain()` reports that this is not the main worker, or
 * if `filePath` is falsy.
 */
public constructor (
  public readonly numWorkers: number,
  public readonly filePath: string,
  public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
  if (!this.filePath) {
    throw new Error('Please specify a file with a worker implementation')
  }

  // Gives the concrete pool a chance to prepare itself before any worker exists.
  this.setupHook()

  for (let workerNumber = 1; workerNumber <= this.numWorkers; workerNumber++) {
    this.internalNewWorker()
  }

  this.emitter = new PoolEmitter()
}
/**
 * Setup hook that a poolifier pool implementation can override to run code
 * before the abstract constructor creates the workers.
 */
protected setupHook (): void {
  // Intentionally empty: overriding it is optional.
}
/**
 * Should tell whether the current worker is the main worker.
 *
 * The constructor refuses to start a pool when this returns `false`.
 *
 * @returns `true` for the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
/**
 * Shuts down every worker of this pool, one after the other.
 *
 * @returns Promise that resolves once `destroyWorker` has completed for each
 * worker.
 */
public async destroy (): Promise<void> {
  // Iterate over a snapshot: `removeWorker` splices `this.workers`, and
  // mutating the array while it is being iterated would skip workers.
  for (const worker of [...this.workers]) {
    await this.destroyWorker(worker)
  }
}
/**
 * Shuts down the given worker.
 *
 * May complete synchronously or return a promise; `destroy` awaits the result
 * either way.
 *
 * @param worker A worker within `workers`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
 * Sends a message to the given worker.
 *
 * @param worker The worker which should receive the message.
 * @param message The message.
 */
protected abstract sendToWorker (worker: Worker, message: MessageValue<Data>): void
/**
 * Increments the task counter kept for the given worker in `tasks`.
 *
 * @param worker Worker whose counter is incremented; it must already have an
 * entry in `tasks`.
 * @throws {Error} If the worker has no entry in `tasks`.
 */
protected addWorker (worker: Worker): void {
  const taskCount = this.tasks.get(worker)
  if (taskCount === undefined) {
    throw Error('Worker could not be found in tasks map')
  }
  this.tasks.set(worker, taskCount + 1)
}
/**
 * Removes the given worker from the pool's bookkeeping: its slot in `workers`
 * and its entry in `tasks`.
 *
 * Does nothing to `workers` when the worker is not part of it.
 *
 * @param worker Worker that will be removed.
 */
protected removeWorker (worker: Worker): void {
  const workerIndex = this.workers.indexOf(worker)
  // `indexOf` yields -1 for an unknown worker, and `splice(-1, 1)` would then
  // drop the last worker of the list instead, so only splice on a hit.
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
/**
 * Runs a task on a worker selected by `chooseWorker`.
 *
 * @param data Payload sent to the worker. Only `null` and `undefined` are
 * replaced by an empty object; other falsy values such as `0`, `''` or
 * `false` are sent unchanged.
 * @returns Promise settled by `internalExecute` with the worker's answer to
 * this task.
 */
public execute (data: Data): Promise<Response> {
  // Configure worker to handle message with the specified task
  const worker = this.chooseWorker()
  this.addWorker(worker)
  const id = ++this.id
  // The response listener is registered before the message is sent.
  const res = this.internalExecute(worker, id)
  // `??` instead of `||`: a falsy payload is still a valid payload.
  this.sendToWorker(worker, { data: data ?? ({} as Data), id })
  return res
}
/**
 * Registers a listener for the messages coming from the given worker.
 *
 * @param worker Worker to listen to.
 * @param listener Callback receiving each message.
 */
protected abstract registerWorkerMessageListener (
  worker: Worker,
  listener: (message: MessageValue<Response>) => void
): void
/**
 * Unregisters a message listener from the given worker.
 *
 * @param worker Worker the listener is removed from.
 * @param listener Callback to remove.
 */
protected abstract unregisterWorkerMessageListener (
  worker: Worker,
  listener: (message: MessageValue<Response>) => void
): void
/**
 * Waits for the worker's answer to the message with the given ID.
 *
 * A listener is registered on the worker; messages carrying another ID are
 * ignored. On the matching message the listener is unregistered and the
 * promise is settled.
 *
 * @param worker Worker expected to answer.
 * @param id ID of the message whose answer is awaited.
 * @returns Promise rejected with `message.error` when it is truthy, resolved
 * with `message.data` otherwise.
 */
protected internalExecute (worker: Worker, id: number): Promise<Response> {
  return new Promise<Response>((resolve, reject) => {
    const onMessage = (message: MessageValue<Response>): void => {
      if (message.id !== id) return

      this.unregisterWorkerMessageListener(worker, onMessage)
      // NOTE(review): this increments the worker's counter a second time
      // (`execute` already did so when assigning the task) — confirm that
      // counting the completion as well is intended.
      this.addWorker(worker)
      if (message.error) {
        reject(message.error)
      } else {
        resolve(message.data as Response)
      }
    }

    this.registerWorkerMessageListener(worker, onMessage)
  })
}
/**
 * Chooses a worker for the next task.
 *
 * The default implementation uses a round robin algorithm to distribute the
 * load: the `nextWorker` cursor is advanced first (wrapping back to 0 after
 * the last index) and the worker it then points at is returned.
 *
 * @returns The worker at the advanced cursor position.
 */
protected chooseWorker (): Worker {
  // `>=` rather than `===`: once `removeWorker` has shrunk the list the
  // cursor can sit past the last index, and an equality test would then
  // never wrap around again.
  this.nextWorker =
    this.nextWorker >= this.workers.length - 1 ? 0 : this.nextWorker + 1
  return this.workers[this.nextWorker]
}
/**
 * Creates a worker.
 *
 * @returns The newly created worker.
 */
protected abstract newWorker (): Worker
/**
 * Hook called once a newly created worker has been moved to the workers
 * registry.
 *
 * Can be used to update the `maxListeners` or to bind the
 * `main-worker`<->`worker` connection if it is not bound by default.
 *
 * @param worker The newly created worker.
 */
protected abstract afterNewWorkerPushed (worker: Worker): void
/**
 * Creates a new worker for this pool and sets it up completely: the handlers
 * from `opts` are attached (a no-op stands in for each missing one), the
 * worker is appended to `workers`, `afterNewWorkerPushed` is called and the
 * worker's counter in `tasks` is initialised to 0.
 *
 * @returns New, completely set up worker.
 */
protected internalNewWorker (): Worker {
  const worker = this.newWorker()
  const { opts } = this

  worker.on('error', opts.errorHandler ?? (() => {}))
  worker.on('online', opts.onlineHandler ?? (() => {}))
  // TODO handle properly when a worker exit
  worker.on('exit', opts.exitHandler ?? (() => {}))

  this.workers.push(worker)
  this.afterNewWorkerPushed(worker)

  // The counter entry is created only after the hook above has run.
  this.tasks.set(worker, 0)
  return worker
}