Fixes to worker selection strategies
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { EMPTY_FUNCTION } from '../utils'
6 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
7 import type { PoolOptions } from './pool'
8 import type { IPoolInternal, TasksUsage } from './pool-internal'
9 import { PoolEmitter, PoolType } from './pool-internal'
10 import type { IPoolWorker } from './pool-worker'
11 import {
12 WorkerChoiceStrategies,
13 WorkerChoiceStrategy
14 } from './selection-strategies/selection-strategies-types'
15 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
16
/**
 * Error message used when a worker has no entry in the pool tasks usage map.
 */
const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
  'Worker could not be found in worker tasks usage map'
19
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * It owns the workers registry, the per-worker tasks usage statistics and the
 * map of pending task promises, and delegates the choice of the worker for
 * each task to a `WorkerChoiceStrategyContext`.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IPoolWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workers: Worker[] = []

  /** @inheritDoc */
  public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
  Worker,
  TasksUsage
  >()

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /** @inheritDoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message Id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
  number,
  PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the last submitted message; it is incremented before each new task is sent.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Defaults to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, or if `numberOfWorkers` or `filePath` are invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Worker factory handed to the strategy context.
      // NOTE(review): presumably invoked when an additional (dynamic) worker is needed - confirm against WorkerChoiceStrategyContext.
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          // Only react to kill messages: a hard kill destroys the worker
          // unconditionally, any other kill message destroys it only once it
          // has no running task. Regular task responses must never destroy
          // the worker, even if they leave it with zero running tasks.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            (message.kill != null &&
              this.getWorkerRunningTasks(workerCreated) === 0)
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if the file path is empty or missing.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is a non negative safe integer,
   * and strictly positive for a fixed pool.
   *
   * @param numberOfWorkers Number of workers requested for the pool.
   * @throws Error if the number of workers is invalid.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Applies the default values to the pool options: round robin worker
   * choice strategy and events enabled.
   *
   * The defaults are written to `this.opts`.
   *
   * @param opts Options given to the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /** @inheritDoc */
  public get numberOfRunningTasks (): number {
    return this.promiseMap.size
  }

  /** @inheritDoc */
  public getWorkerIndex (worker: Worker): number {
    return this.workers.indexOf(worker)
  }

  /** @inheritDoc */
  public getWorkerRunningTasks (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.running
  }

  /** @inheritDoc */
  public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.avgRunTime
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    for (const worker of this.workers) {
      this.resetWorkerTasksUsage(worker)
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritDoc */
  public abstract get busy (): boolean

  /**
   * Computes the busy status of the pool.
   *
   * @returns `true` if there are at least as many running tasks as `numberOfWorkers` and no worker is free.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeWorker() === false
    )
  }

  /** @inheritDoc */
  public findFreeWorker (): Worker | false {
    for (const worker of this.workers) {
      if (this.getWorkerRunningTasks(worker) === 0) {
        // A worker is free, return the matching worker
        return worker
      }
    }
    return false
  }

  /** @inheritDoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    this.checkAndEmitBusy()
    // A nullish payload is replaced by an empty object before being sent
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: `removeWorker` mutates `this.workers` when a
    // worker exits, which must not make the iteration skip workers.
    await Promise.all(
      [...this.workers].map(worker => this.destroyWorker(worker))
    )
  }

  /**
   * Shuts down the given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the task promise is created in `internalExecute`.
   * Increases the number of running tasks of the worker.
   * Can be overridden.
   *
   * @param worker The worker.
   */
  protected beforePromiseWorkerResponseHook (worker: Worker): void {
    this.increaseWorkerRunningTasks(worker)
  }

  /**
   * Hook executed when a task response is received, before the task promise
   * is settled. Updates the tasks usage statistics of the worker.
   * Can be overridden.
   *
   * @param message The received message.
   * @param promise The Promise response.
   */
  protected afterPromiseWorkerResponseHook (
    message: MessageValue<Response>,
    promise: PromiseWorkerResponseWrapper<Worker, Response>
  ): void {
    this.decreaseWorkerRunningTasks(promise.worker)
    this.stepWorkerRunTasks(promise.worker, 1)
    this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
  }

  /**
   * Removes the given worker from the pool.
   *
   * Does not touch the workers registry if the worker is not part of it.
   *
   * @param worker The worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.getWorkerIndex(worker)
    // `splice(-1, 1)` would remove the last worker of the registry
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.removeWorkerTasksUsage(worker)
  }

  /**
   * Chooses a worker for the next task.
   *
   * Delegates to the worker choice strategy context; the strategy is round
   * robin unless another one was given in the options or set afterwards.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on a given worker.
   *
   * @param worker The worker which should register a listener.
   * @param listener The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Registers a pending task for the given worker.
   *
   * Runs the before-response hook, then stores the resolve/reject functions
   * in the promise map under the message id.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId The id of the task message.
   * @returns A promise settled by the worker listener when the matching response is received.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.beforePromiseWorkerResponseHook(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * Attaches the handlers given in the options, registers the worker, initializes
   * its tasks usage statistics and removes it from the pool when it exits.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init worker tasks usage map
    this.initWorkerTasksUsage(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an id, or with an id unknown to the promise map, are ignored.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id !== undefined) {
        const promise = this.promiseMap.get(message.id)
        if (promise !== undefined) {
          this.afterPromiseWorkerResponseHook(message, promise)
          // Any non nullish error rejects the task, including falsy ones
          if (message.error != null) {
            promise.reject(message.error)
          } else {
            promise.resolve(message.data as Response)
          }
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `busy` event if events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }

  /**
   * Increases by one the number of tasks running on the given worker.
   *
   * @param worker Worker which running tasks is increased.
   */
  private increaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, 1)
  }

  /**
   * Decreases by one the number of tasks running on the given worker.
   *
   * @param worker Worker which running tasks is decreased.
   */
  private decreaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, -1)
  }

  /**
   * Steps the number of tasks running on the given worker.
   *
   * @param worker Worker which running tasks are stepped.
   * @param step Number of running tasks step.
   * @throws Error if the worker has no tasks usage entry.
   */
  private stepWorkerRunningTasks (worker: Worker, step: number): void {
    const tasksUsage = this.workersTasksUsage.get(worker)
    if (tasksUsage !== undefined) {
      tasksUsage.running = tasksUsage.running + step
      this.workersTasksUsage.set(worker, tasksUsage)
    } else {
      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
    }
  }

  /**
   * Steps the number of tasks that the given worker has run.
   *
   * @param worker Worker which has run tasks.
   * @param step Number of run tasks step.
   * @throws Error if the worker has no tasks usage entry.
   */
  private stepWorkerRunTasks (worker: Worker, step: number): void {
    const tasksUsage = this.workersTasksUsage.get(worker)
    if (tasksUsage !== undefined) {
      tasksUsage.run = tasksUsage.run + step
      this.workersTasksUsage.set(worker, tasksUsage)
    } else {
      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
    }
  }

  /**
   * Updates tasks runtime for the given worker.
   *
   * Does nothing unless the current worker choice strategy requires the
   * run time statistics.
   *
   * @param worker Worker which run the task.
   * @param taskRunTime Worker task runtime; counted as 0 when undefined.
   * @throws Error if the statistics are required and the worker has no tasks usage entry.
   */
  private updateWorkerTasksRunTime (
    worker: Worker,
    taskRunTime: number | undefined
  ): void {
    if (
      this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
        .requiredStatistics.runTime === true
    ) {
      const tasksUsage = this.workersTasksUsage.get(worker)
      if (tasksUsage !== undefined) {
        tasksUsage.runTime += taskRunTime ?? 0
        // Avoid a division by zero while no task has been run yet
        if (tasksUsage.run !== 0) {
          tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
        }
        this.workersTasksUsage.set(worker, tasksUsage)
      } else {
        throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
      }
    }
  }

  /**
   * Initializes tasks usage statistics: every counter is set to zero.
   *
   * @param worker The worker.
   */
  public initWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.set(worker, {
      run: 0,
      running: 0,
      runTime: 0,
      avgRunTime: 0
    })
  }

  /**
   * Removes worker tasks usage statistics.
   *
   * @param worker The worker.
   */
  private removeWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.delete(worker)
  }

  /**
   * Resets worker tasks usage statistics.
   *
   * The number of running tasks is preserved: tasks still in flight will
   * decrease it when their response is received, so zeroing it here would
   * make it negative.
   *
   * @param worker The worker.
   */
  private resetWorkerTasksUsage (worker: Worker): void {
    const running = this.workersTasksUsage.get(worker)?.running ?? 0
    this.workersTasksUsage.set(worker, {
      run: 0,
      running,
      runTime: 0,
      avgRunTime: 0
    })
  }
}