Generate documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
bdaf31cd
JB
7import type { AbstractPoolWorker } from './abstract-pool-worker'
8import type { PoolOptions } from './pool'
a35560ba 9import type { IPoolInternal } from './pool-internal'
7c0ba920 10import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
11import {
12 WorkerChoiceStrategies,
bdaf31cd
JB
13 WorkerChoiceStrategy
14} from './selection-strategies/selection-strategies-types'
15import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 16
/**
 * Base class containing the logic shared by all poolifier pools.
 *
 * It owns the worker registry, the per-worker running tasks counters and the
 * map of pending task promises. Concrete pools supply the worker specific
 * primitives (creation, destruction, message passing) through the abstract
 * members declared below.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends AbstractPoolWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workers: Worker[] = []

  /** @inheritDoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /** @inheritDoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When a message is received from a worker, the matching entry is looked up
   * by message id and its promise is resolved or rejected based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the last submitted message. It is incremented before each use, so
   * the first task gets the id `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy context used to pick the worker of each task.
   *
   * The strategy defaults to round robin when none is given in the pool options.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * The arguments are validated, `setupHook()` is run, `numberOfWorkers`
   * workers are created and set up, the event emitter is created when events
   * are enabled and finally the worker choice strategy context is built.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Missing `workerChoiceStrategy` and `enableEvents` are filled in with their defaults on this object.
   * @throws Error if called from a worker, if `numberOfWorkers` is invalid or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Worker factory handed to the strategy context.
      // NOTE(review): presumably invoked when the context needs an additional (dynamic) worker - confirm against WorkerChoiceStrategyContext.
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          // This listener receives every message of the worker, task responses
          // included. Only a kill message may lead to the worker destruction:
          // a hard kill always does, any other kill only once the worker has
          // no running task left.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            (message.kill != null &&
              this.getWorkerRunningTasks(workerCreated) === 0)
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            // The destruction is not awaited on purpose.
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Ensures that a worker file has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy (e.g. empty or undefined).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Ensures that the number of workers is a safe, non negative integer and
   * that a fixed pool has at least one worker.
   *
   * @param numberOfWorkers Number of workers requested for the pool.
   * @throws Error if the number is missing, not a safe integer, negative, or zero for a fixed pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the defaults of the pool options: round robin worker choice
   * strategy and events enabled.
   *
   * The defaults are written to `this.opts`, which the constructor passes as
   * `opts`, i.e. the object given by the caller is updated in place.
   *
   * @param opts Options for the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /** @inheritDoc */
  public get numberOfRunningTasks (): number {
    // One promise map entry exists per task that has not been answered yet.
    return this.promiseMap.size
  }

  /** @inheritDoc */
  public getWorkerRunningTasks (worker: Worker): number | undefined {
    // `undefined` means that the worker is not registered in the tasks map.
    return this.tasks.get(worker)
  }

  /** @inheritDoc */
  public getWorkerIndex (worker: Worker): number {
    // `-1` when the worker is not part of the pool.
    return this.workers.indexOf(worker)
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    // Keep the pool options and the strategy context in sync.
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritDoc */
  public abstract get busy (): boolean

  /**
   * Computes the busy status from the pool internals.
   *
   * @returns `true` if there are at least as many running tasks as `numberOfWorkers` and no worker is free, `false` otherwise.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeWorker() === false
    )
  }

  /** @inheritDoc */
  public findFreeWorker (): Worker | false {
    for (const worker of this.workers) {
      if (this.getWorkerRunningTasks(worker) === 0) {
        // A worker is free, return the matching worker
        return worker
      }
    }
    return false
  }

  /** @inheritDoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const messageId = ++this.nextMessageId
    // Register the pending promise before the message is sent to the worker.
    const res = this.internalExecute(worker, messageId)
    this.checkAndEmitBusy()
    // Nullish data is replaced by an empty object before being sent.
    data = data ?? ({} as Data)
    this.sendToWorker(worker, { data, id: messageId })
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increases by one the number of running tasks of the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decreases by one the number of running tasks of the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Adds a step to the number of running tasks of the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not (or no longer) part of the pool is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.getWorkerIndex(worker)
    // An unknown worker has the index -1 and `splice(-1, 1)` would remove the
    // last registered worker instead of nothing.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Chooses a worker for the next task.
   *
   * The choice is delegated to the worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Accounts for a new task on the given worker and creates its pending promise.
   *
   * The promise callbacks are stored in the promise map under `messageId`;
   * the promise itself is settled by the listener built by `workerListener()`.
   *
   * @param worker Worker the task is assigned to.
   * @param messageId Id of the message carrying the task.
   * @returns Promise settled with the response of the task.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.increaseWorkersTask(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if it is not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The handlers given in the pool options are attached (a no-op function is
   * used for the missing ones), the worker is registered with zero running
   * task and `afterWorkerSetup()` is called.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Unregister the worker from the pool once it has exited.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * Builds the listener that settles the task promises from the worker messages.
   *
   * For a message whose id matches a pending task, the running tasks counter
   * of the worker is decreased, the promise is rejected with `message.error`
   * if it is set or resolved with `message.data` otherwise, and the entry is
   * removed from the promise map. Messages without id or with an unknown id
   * are ignored.
   *
   * NOTE(review): this class never registers the listener itself; presumably
   * pool implementations do it in `afterWorkerSetup()` - confirm.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id !== undefined) {
        const promise = this.promiseMap.get(message.id)
        if (promise !== undefined) {
          this.decreaseWorkersTasks(promise.worker)
          if (message.error) promise.reject(message.error)
          else promise.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `busy` event if events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}