928b24c65832a38ef625b7b07d498ce9f3ad504b
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { EMPTY_FUNCTION } from '../utils'
6 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
7 import type { AbstractPoolWorker } from './abstract-pool-worker'
8 import type { PoolOptions } from './pool'
9 import type { IPoolInternal } from './pool-internal'
10 import { PoolEmitter, PoolType } from './pool-internal'
11 import {
12 WorkerChoiceStrategies,
13 WorkerChoiceStrategy
14 } from './selection-strategies/selection-strategies-types'
15 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
16
17 /**
18 * Base class containing some shared logic for all poolifier pools.
19 *
20 * @template Worker Type of worker which manages this pool.
21 * @template Data Type of data sent to the worker. This can only be serializable data.
22 * @template Response Type of response of execution. This can only be serializable data.
23 */
24 export abstract class AbstractPool<
25 Worker extends AbstractPoolWorker,
26 Data = unknown,
27 Response = unknown
28 > implements IPoolInternal<Worker, Data, Response> {
29 /** @inheritdoc */
30 public readonly workers: Worker[] = []
31
32 /** @inheritdoc */
33 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
34
35 /** @inheritdoc */
36 public readonly emitter?: PoolEmitter
37
38 /** @inheritdoc */
39 public readonly max?: number
40
41 /**
42 * The promise map.
43 *
44 * - `key`: This is the message Id of each submitted task.
45 * - `value`: An object that contains the worker, the resolve function and the reject function.
46 *
47 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
48 */
49 protected promiseMap: Map<
50 number,
51 PromiseWorkerResponseWrapper<Worker, Response>
52 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
53
54 /**
55 * Id of the next message.
56 */
57 protected nextMessageId: number = 0
58
59 /**
60 * Worker choice strategy instance implementing the worker choice algorithm.
61 *
62 * Default to a strategy implementing a round robin algorithm.
63 */
64 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
65 Worker,
66 Data,
67 Response
68 >
69
70 /**
71 * Constructs a new poolifier pool.
72 *
73 * @param numberOfWorkers Number of workers that this pool should manage.
74 * @param filePath Path to the worker-file.
75 * @param opts Options for the pool.
76 */
77 public constructor (
78 public readonly numberOfWorkers: number,
79 public readonly filePath: string,
80 public readonly opts: PoolOptions<Worker>
81 ) {
82 if (!this.isMain()) {
83 throw new Error('Cannot start a pool from a worker!')
84 }
85 this.checkNumberOfWorkers(this.numberOfWorkers)
86 this.checkFilePath(this.filePath)
87 this.checkPoolOptions(this.opts)
88 this.setupHook()
89
90 for (let i = 1; i <= this.numberOfWorkers; i++) {
91 this.createAndSetupWorker()
92 }
93
94 if (this.opts.enableEvents) {
95 this.emitter = new PoolEmitter()
96 }
97 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
98 this,
99 () => {
100 const workerCreated = this.createAndSetupWorker()
101 this.registerWorkerMessageListener(workerCreated, message => {
102 if (
103 isKillBehavior(KillBehaviors.HARD, message.kill) ||
104 this.getWorkerRunningTasks(workerCreated) === 0
105 ) {
106 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
107 this.destroyWorker(workerCreated) as void
108 }
109 })
110 return workerCreated
111 },
112 this.opts.workerChoiceStrategy
113 )
114 }
115
116 private checkFilePath (filePath: string): void {
117 if (!filePath) {
118 throw new Error('Please specify a file with a worker implementation')
119 }
120 }
121
122 private checkNumberOfWorkers (numberOfWorkers: number): void {
123 if (numberOfWorkers == null) {
124 throw new Error(
125 'Cannot instantiate a pool without specifying the number of workers'
126 )
127 } else if (!Number.isSafeInteger(numberOfWorkers)) {
128 throw new Error(
129 'Cannot instantiate a pool with a non integer number of workers'
130 )
131 } else if (numberOfWorkers < 0) {
132 throw new Error(
133 'Cannot instantiate a pool with a negative number of workers'
134 )
135 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
136 throw new Error('Cannot instantiate a fixed pool with no worker')
137 }
138 }
139
140 private checkPoolOptions (opts: PoolOptions<Worker>): void {
141 this.opts.workerChoiceStrategy =
142 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
143 this.opts.enableEvents = opts.enableEvents ?? true
144 }
145
146 /** @inheritdoc */
147 public abstract get type (): PoolType
148
149 /** @inheritdoc */
150 public get numberOfRunningTasks (): number {
151 return this.promiseMap.size
152 }
153
154 /** @inheritdoc */
155 public getWorkerRunningTasks (worker: Worker): number | undefined {
156 return this.tasks.get(worker)
157 }
158
159 /** @inheritdoc */
160 public getWorkerIndex (worker: Worker): number {
161 return this.workers.indexOf(worker)
162 }
163
164 /** @inheritdoc */
165 public setWorkerChoiceStrategy (
166 workerChoiceStrategy: WorkerChoiceStrategy
167 ): void {
168 this.opts.workerChoiceStrategy = workerChoiceStrategy
169 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
170 workerChoiceStrategy
171 )
172 }
173
174 /** @inheritdoc */
175 public abstract get busy (): boolean
176
177 protected internalGetBusyStatus (): boolean {
178 return (
179 this.numberOfRunningTasks >= this.numberOfWorkers &&
180 this.findFreeWorker() === false
181 )
182 }
183
184 /** @inheritdoc */
185 public findFreeWorker (): Worker | false {
186 for (const worker of this.workers) {
187 if (this.getWorkerRunningTasks(worker) === 0) {
188 // A worker is free, return the matching worker
189 return worker
190 }
191 }
192 return false
193 }
194
195 /** @inheritdoc */
196 public execute (data: Data): Promise<Response> {
197 // Configure worker to handle message with the specified task
198 const worker = this.chooseWorker()
199 const messageId = ++this.nextMessageId
200 const res = this.internalExecute(worker, messageId)
201 this.checkAndEmitBusy()
202 data = data ?? ({} as Data)
203 this.sendToWorker(worker, { data, id: messageId })
204 return res
205 }
206
207 /** @inheritdoc */
208 public async destroy (): Promise<void> {
209 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
210 }
211
212 /**
213 * Shut down given worker.
214 *
215 * @param worker A worker within `workers`.
216 */
217 protected abstract destroyWorker (worker: Worker): void | Promise<void>
218
219 /**
220 * Setup hook that can be overridden by a Poolifier pool implementation
221 * to run code before workers are created in the abstract constructor.
222 */
223 protected setupHook (): void {
224 // Can be overridden
225 }
226
227 /**
228 * Should return whether the worker is the main worker or not.
229 */
230 protected abstract isMain (): boolean
231
232 /**
233 * Increase the number of tasks that the given worker has applied.
234 *
235 * @param worker Worker whose tasks are increased.
236 */
237 protected increaseWorkersTask (worker: Worker): void {
238 this.stepWorkerNumberOfTasks(worker, 1)
239 }
240
241 /**
242 * Decrease the number of tasks that the given worker has applied.
243 *
244 * @param worker Worker whose tasks are decreased.
245 */
246 protected decreaseWorkersTasks (worker: Worker): void {
247 this.stepWorkerNumberOfTasks(worker, -1)
248 }
249
250 /**
251 * Step the number of tasks that the given worker has applied.
252 *
253 * @param worker Worker whose tasks are set.
254 * @param step Worker number of tasks step.
255 */
256 private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
257 const numberOfTasksInProgress = this.tasks.get(worker)
258 if (numberOfTasksInProgress !== undefined) {
259 this.tasks.set(worker, numberOfTasksInProgress + step)
260 } else {
261 throw Error('Worker could not be found in tasks map')
262 }
263 }
264
265 /**
266 * Removes the given worker from the pool.
267 *
268 * @param worker Worker that will be removed.
269 */
270 protected removeWorker (worker: Worker): void {
271 // Clean worker from data structure
272 this.workers.splice(this.getWorkerIndex(worker), 1)
273 this.tasks.delete(worker)
274 }
275
276 /**
277 * Choose a worker for the next task.
278 *
279 * The default implementation uses a round robin algorithm to distribute the load.
280 *
281 * @returns Worker.
282 */
283 protected chooseWorker (): Worker {
284 return this.workerChoiceStrategyContext.execute()
285 }
286
287 /**
288 * Send a message to the given worker.
289 *
290 * @param worker The worker which should receive the message.
291 * @param message The message.
292 */
293 protected abstract sendToWorker (
294 worker: Worker,
295 message: MessageValue<Data>
296 ): void
297
298 /**
299 * Register a listener callback on a given worker.
300 *
301 * @param worker A worker.
302 * @param listener A message listener callback.
303 */
304 protected abstract registerWorkerMessageListener<
305 Message extends Data | Response
306 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
307
308 protected internalExecute (
309 worker: Worker,
310 messageId: number
311 ): Promise<Response> {
312 this.increaseWorkersTask(worker)
313 return new Promise<Response>((resolve, reject) => {
314 this.promiseMap.set(messageId, { resolve, reject, worker })
315 })
316 }
317
318 /**
319 * Returns a newly created worker.
320 */
321 protected abstract createWorker (): Worker
322
323 /**
324 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
325 *
326 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
327 *
328 * @param worker The newly created worker.
329 */
330 protected abstract afterWorkerSetup (worker: Worker): void
331
332 /**
333 * Creates a new worker for this pool and sets it up completely.
334 *
335 * @returns New, completely set up worker.
336 */
337 protected createAndSetupWorker (): Worker {
338 const worker = this.createWorker()
339
340 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
341 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
342 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
343 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
344 worker.once('exit', () => this.removeWorker(worker))
345
346 this.workers.push(worker)
347
348 // Init tasks map
349 this.tasks.set(worker, 0)
350
351 this.afterWorkerSetup(worker)
352
353 return worker
354 }
355
356 /**
357 * This function is the listener registered for each worker.
358 *
359 * @returns The listener function to execute when a message is received from a worker.
360 */
361 protected workerListener (): (message: MessageValue<Response>) => void {
362 return message => {
363 if (message.id !== undefined) {
364 const promise = this.promiseMap.get(message.id)
365 if (promise !== undefined) {
366 this.decreaseWorkersTasks(promise.worker)
367 if (message.error) promise.reject(message.error)
368 else promise.resolve(message.data as Response)
369 this.promiseMap.delete(message.id)
370 }
371 }
372 }
373 }
374
375 private checkAndEmitBusy (): void {
376 if (this.opts.enableEvents && this.busy) {
377 this.emitter?.emit('busy')
378 }
379 }
380 }