docs: add scope on pool public API
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Whether the pool is starting or not.
 *
 * Set to `true` right before the initial worker nodes are created in the constructor and back to `false` right after.
 * The worker error handler does not restart a worker while this flag is `true`.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool.
 *
 * Taken from `performance.now()` at the end of the constructor and used to compute the pool utilization.
 */
private readonly startTimestamp
95
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook
 * and then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Arguments validation; checkPoolOptions() also fills in the options default values.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this instance so they keep their `this` when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Must run before the worker nodes are created.
  this.setupHook()

  // The `starting` flag prevents the worker error handler from restarting workers during startup.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point for the pool utilization computation.
  this.startTimestamp = performance.now()
}
144
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isMissingOrBlank =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
157
/**
 * Validates the number of workers the pool is instantiated with.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
175
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
201
/**
 * Validates the pool options and fills in the default value of each unset option.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only validated and normalized when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
228
/**
 * Validates a worker choice strategy.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is not one of the `WorkerChoiceStrategies` values.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
238
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
266
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is not strictly positive.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate: the default concurrency is applied elsewhere.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
290
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  // Re-evaluated on each iteration since every creation adds a worker node.
  const staticWorkerNodesCount = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (staticWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
302
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Sums a per worker node numeric value over all the worker nodes.
  const sumOf = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Counts the worker nodes matching the given predicate.
  const countOf = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => sumOf((workerNode) => (matches(workerNode) ? 1 : 0))
  // Builds the pool wide statistics of one measurement (run time or wait time).
  const measurementStatisticsOf = (
    measurementOf: (
      workerUsage: WorkerUsage
    ) => WorkerUsage['runTime'] | WorkerUsage['waitTime'],
    withMedian: boolean
  ) => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          (workerNode) => measurementOf(workerNode.usage)?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          (workerNode) => measurementOf(workerNode.usage)?.maximum ?? -Infinity
        )
      )
    ),
    // Pool wide aggregate divided by the pool wide number of executed tasks.
    average: round(
      sumOf((workerNode) => measurementOf(workerNode.usage)?.aggregate ?? 0) /
        sumOf((workerNode) => workerNode.usage.tasks?.executed ?? 0)
    ),
    ...(withMedian
      ? {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => measurementOf(workerNode.usage)?.median ?? 0
              )
            )
          )
        }
      : {})
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate && waitTimeRequirements.aggregate
      ? { utilization: round(this.utilization) }
      : {}),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countOf(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countOf(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOf((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sumOf((workerNode) => workerNode.usage.tasks.executing),
    // Queue related counters are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled
      ? {
          queuedTasks: sumOf((workerNode) => workerNode.usage.tasks.queued),
          maxQueuedTasks: sumOf(
            (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
          )
        }
      : {}),
    failedTasks: sumOf((workerNode) => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate
      ? {
          runTime: measurementStatisticsOf(
            (workerUsage) => workerUsage.runTime,
            runTimeRequirements.median
          )
        }
      : {}),
    ...(waitTimeRequirements.aggregate
      ? {
          waitTime: measurementStatisticsOf(
            (workerUsage) => workerUsage.waitTime,
            waitTimeRequirements.median
          )
        }
      : {})
  }
}
443
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
458
/**
 * The approximate pool utilization.
 *
 * Computed as the tasks run time plus wait time of all the worker nodes, divided by
 * the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * The pool size validations and the dynamic worker creation depend on this value.
 */
protected abstract get type (): PoolType
486
/**
 * The worker type.
 *
 * Reported as the `worker` property of the pool information.
 */
protected abstract get worker (): WorkerType
491
/**
 * The pool minimum size.
 *
 * The pool is reported ready once that many non dynamic worker nodes are ready.
 */
protected abstract get minSize (): number
496
/**
 * The pool maximum size.
 *
 * The pool is full once it holds that many worker nodes. It is also used to compute the pool utilization
 * and to validate the number of weights in the worker choice strategy options.
 */
protected abstract get maxSize (): number
501
/**
 * Ensures that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
520
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
532
/**
 * Looks up the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
544
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it the statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
563
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  // Propagate the validated options to the worker choice strategy context.
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when an enabled tasks queue gets disabled.
  const isDisablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (isDisablingEnabledQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
586
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop previously stored options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
597
/**
 * Builds the tasks queue options, with a default concurrency of `1`.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
605
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
614
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 * Abstract: the computation is left to the pool subclasses.
 */
protected abstract get busy (): boolean
621
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency, otherwise it is a single task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
646
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non empty task functions list is used.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
659
/**
 * @inheritDoc
 *
 * The returned promise is rejected, and no task is submitted, if:
 * - `name` is neither nullish nor a string,
 * - `name` is an empty or blank string,
 * - `transferList` is neither nullish nor an array,
 * - `name` is not in the task functions list of the chosen worker.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // reject() does not stop the executor: each failed validation must return
    // explicitly, otherwise the invalid task would still be submitted to a worker.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the promise later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the worker
    // already runs as many tasks as the configured concurrency.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
719
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Terminate all the worker nodes concurrently.
  const workerNodeTerminations = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeTerminations)
  this.emitter?.emit(PoolEvents.destroy)
}
729
/**
 * Sends the kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
745
/**
 * Terminates the worker node given its worker node key.
 *
 * Called for each worker node when the pool is destroyed and when a dynamic worker answers with a kill message.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the termination has completed or failed.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
752
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
762
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if this returns `false`.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
767
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics of the worker usage,
 * and of the task function usage when the worker has more than one task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    usage.tasks.executing += 1
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  recordTaskStart(workerNode.usage)
  if (!this.canUpdateTaskWorkerUsage(workerNodeKey)) {
    return
  }
  recordTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
790
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker usage,
 * and of the task function usage when the worker has more than one task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  recordTaskEnd(workerNode.usage)
  if (!this.canUpdateTaskWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  recordTaskEnd(workerNode.getTaskWorkerUsage(taskName) as WorkerUsage)
}
817
/**
 * Whether the per task function usage of a worker node shall be updated or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker has more than one task function, `false` otherwise.
 */
private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  return Array.isArray(taskFunctions) && taskFunctions.length > 1
}
825
/**
 * Updates the tasks counters of a worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasksCounters = workerUsage.tasks
  tasksCounters.executing -= 1
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    tasksCounters.failed += 1
  } else {
    tasksCounters.executed += 1
  }
}
838
/**
 * Updates the run time statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. A missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
850
/**
 * Updates the wait time statistics of a worker usage.
 *
 * The task wait time is the time elapsed since the task timestamp, `0` if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskSubmissionTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskSubmissionTimestamp,
    workerUsage.tasks.executed
  )
}
864
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. Missing ELU active and idle values count as `0`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The first value is stored as is, the next ones are averaged with the stored value.
  const workerElu = workerUsage.elu
  workerElu.utilization =
    workerElu.utilization != null
      ? (workerElu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
896
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * A dynamic worker node is created first when the conditions for it are met.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  // The new dynamic worker is only chosen right away if the strategy policy allows it.
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
915
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are all executing their tasks quota.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
924
/**
 * Sends a message to worker given its worker node key.
 *
 * Used for every message the pool sends to a worker, e.g. the kill, check active and statistics messages.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
937
/**
 * Creates a new worker.
 *
 * Called each time a worker node is created; the event handlers are attached to the returned worker afterwards.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
944
/**
 * Creates a new, completely set up worker node.
 *
 * The user provided handlers are attached to the new worker along with the pool
 * own error and exit handling, then the worker is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A failed worker is replaced by one of the same kind, except during the pool startup.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestartWorker) {
      if (!failedWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(newWorkerNodeKey)

  return newWorkerNodeKey
}
984
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a listener terminates the worker node when
 * the worker answers with a kill message, and the check active message is sent to the worker.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill requires a worker without executing tasks and,
    // with the tasks queue enabled, without queued tasks.
    const canBeSoftKilled = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return workerUsage.tasks.executing === 0
        case true:
          return (
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0
          )
        default:
          return false
      }
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && canBeSoftKilled())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // The worker is flagged ready right away when the strategy policy uses dynamic workers.
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
1023
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to the worker or the execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1036
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, then sends it the startup and the statistics messages.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1051
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Called once the pool message listener is registered on a newly created worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1058
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the run time and ELU aggregate requirements of the worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(workerNodeKey, {
    statistics,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
1076
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node
 * with an empty queue, or else the other ready worker node with the fewest
 * queued tasks. The task is executed right away when the destination has an
 * empty queue and runs fewer tasks than the tasks queue `concurrency` option;
 * otherwise it is enqueued on the destination.
 *
 * When no other ready worker node exists, the redistribution stops and the
 * remaining tasks stay queued on the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Loop invariant: hoisted out of the redistribution loop.
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Never pick the source worker node nor a worker node that is not ready.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: best candidate, run the task now if concurrency allows it.
        executeTask = workerNode.usage.tasks.executing < concurrency
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible destination: dequeuing and re-enqueuing on the source
      // worker node would never shrink its queue and would loop forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1120
/**
 * Builds the listener registered for each worker message.
 *
 * The returned listener first checks the message worker id, then dispatches:
 * a message with both `ready` and `taskFunctions` is a worker ready response,
 * a message with `taskId` is a task execution response, and a message with
 * only `taskFunctions` updates the task functions stored in the worker info.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctions = taskFunctions
    }
  }
}
1143
/**
 * Handles a worker ready response: records the ready flag and the task
 * functions in the worker info, then emits the pool `ready` event when an
 * emitter exists and the pool reports itself ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1157
/**
 * Handles a task execution response: settles the promise registered for the
 * task id, runs the after task execution hook, forgets the promise, runs the
 * next queued task of the same worker node when tasks queuing is enabled and
 * its concurrency allows it, then updates the worker choice strategy context.
 * Messages whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError, data } = message
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  // A queued task may start only if queuing is on, the queue is not empty
  // and the worker node runs fewer tasks than the configured concurrency.
  const canRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1185
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full` event
 * when the pool is dynamic and full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1196
/**
 * Returns the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1206
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.maxSize
  )
  // While the pool is starting, worker nodes are flagged ready upfront.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1231
/**
 * Drops the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1244
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when tasks queuing is enabled.
  return (
    this.opts.enableTasksQueue === true &&
    this.workerNodes[workerNodeKey].hasBackPressure()
  )
}
1255
/**
 * Runs the before task execution hook, then sends the task to the worker
 * identified by its worker node key along with the task transfer list.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1266
/**
 * Enqueues the task on the given worker node and emits the pool
 * `backPressure` event when that worker node has back pressure afterwards.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The tasks queue size returned by the worker node.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const queueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
  if (!this.hasWorkerNodeBackPressure(workerNodeKey)) {
    return queueSize
  }
  this.emitter?.emit(PoolEvents.backPressure, {
    workerId: this.getWorkerInfo(workerNodeKey).id,
    ...this.info
  })
  return queueSize
}
1277
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1281
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1285
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1295
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1301 }