Merge branch 'master' of github.com:poolifier/poolifier into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map.
73 *
74 * - `key`: The message id of each submitted task.
75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 *
77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 */
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
81
82 /**
83 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
86 Worker,
87 Data,
88 Response
89 >
90
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
96 /**
97 * Whether the pool is starting or not.
98 */
99 private readonly starting: boolean
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
109 /**
110 * Constructs a new poolifier pool.
111 *
112 * @param numberOfWorkers - Number of workers that this pool should manage.
113 * @param filePath - Path to the worker file.
114 * @param opts - Options for the pool.
115 */
116 public constructor (
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
120 ) {
121 if (!this.isMain()) {
122 throw new Error(
123 'Cannot start a pool from a worker with the same type as the pool'
124 )
125 }
126 this.checkNumberOfWorkers(this.numberOfWorkers)
127 this.checkFilePath(this.filePath)
128 this.checkPoolOptions(this.opts)
129
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
133
134 if (this.opts.enableEvents === true) {
135 this.emitter = new PoolEmitter()
136 }
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
146
147 this.setupHook()
148
149 this.starting = true
150 this.startPool()
151 this.starting = false
152 this.started = true
153
154 this.startTimestamp = performance.now()
155 }
156
157 private checkFilePath (filePath: string): void {
158 if (
159 filePath == null ||
160 typeof filePath !== 'string' ||
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
163 throw new Error('Please specify a file with a worker implementation')
164 }
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
168 }
169
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
176 throw new TypeError(
177 'Cannot instantiate a pool with a non safe integer number of workers'
178 )
179 } else if (numberOfWorkers < 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
182 )
183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
189 if (this.type === PoolTypes.dynamic) {
190 if (max == null) {
191 throw new TypeError(
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
202 } else if (max === 0) {
203 throw new RangeError(
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
211 }
212 }
213
214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
239 }
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
246 throw new Error(
247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
248 )
249 }
250 }
251
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
260 if (
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
263 ) {
264 throw new TypeError(
265 'Invalid worker choice strategy options: retries must be an integer'
266 )
267 }
268 if (
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
271 ) {
272 throw new RangeError(
273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
274 )
275 }
276 if (
277 workerChoiceStrategyOptions.weights != null &&
278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
294 }
295
296 private checkValidTasksQueueOptions (
297 tasksQueueOptions: TasksQueueOptions
298 ): void {
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
302 if (
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
305 ) {
306 throw new TypeError(
307 'Invalid worker node tasks concurrency: must be an integer'
308 )
309 }
310 if (
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
313 ) {
314 throw new RangeError(
315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
316 )
317 }
318 if (tasksQueueOptions?.queueMaxSize != null) {
319 throw new Error(
320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
321 )
322 }
323 if (
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
326 ) {
327 throw new TypeError(
328 'Invalid worker node tasks queue size: must be an integer'
329 )
330 }
331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
332 throw new RangeError(
333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
334 )
335 }
336 }
337
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
346 this.createAndSetupWorkerNode()
347 }
348 }
349
350 /** @inheritDoc */
351 public get info (): PoolInfo {
352 return {
353 version,
354 type: this.type,
355 worker: this.worker,
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
358 minSize: this.minSize,
359 maxSize: this.maxSize,
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
367 workerNode.usage.tasks.executing === 0
368 ? accumulator + 1
369 : accumulator,
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
375 0
376 ),
377 executedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + workerNode.usage.tasks.executed,
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.executing,
385 0
386 ),
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
413 accumulator + workerNode.usage.tasks.failed,
414 0
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
419 minimum: round(
420 min(
421 ...this.workerNodes.map(
422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
423 )
424 )
425 ),
426 maximum: round(
427 max(
428 ...this.workerNodes.map(
429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
430 )
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
442 )
443 )
444 }),
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
453 )
454 )
455 )
456 })
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
462 minimum: round(
463 min(
464 ...this.workerNodes.map(
465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
466 )
467 )
468 ),
469 maximum: round(
470 max(
471 ...this.workerNodes.map(
472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
473 )
474 )
475 ),
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
485 )
486 )
487 }),
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
496 )
497 )
498 )
499 })
500 }
501 })
502 }
503 }
504
505 /**
506 * The pool readiness boolean status.
507 */
508 private get ready (): boolean {
509 return (
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
517 )
518 }
519
520 /**
521 * The approximate pool utilization.
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
526 const poolTimeCapacity =
527 (performance.now() - this.startTimestamp) * this.maxSize
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
536 0
537 )
538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
539 }
540
541 /**
542 * The pool type.
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
548 /**
549 * The worker type.
550 */
551 protected abstract get worker (): WorkerType
552
553 /**
554 * The pool minimum size.
555 */
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
559
560 /**
561 * The pool maximum size.
562 */
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
566
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
573 private checkMessageWorkerId (message: MessageValue<Response>): void {
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
577 message.workerId != null &&
578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
586 /**
587 * Gets the given worker its worker node key.
588 *
589 * @param worker - The worker.
590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
591 */
592 private getWorkerNodeKeyByWorker (worker: Worker): number {
593 return this.workerNodes.findIndex(
594 workerNode => workerNode.worker === worker
595 )
596 }
597
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
603 */
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
606 workerNode => workerNode.info.id === workerId
607 )
608 }
609
610 /** @inheritDoc */
611 public setWorkerChoiceStrategy (
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
614 ): void {
615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
616 this.opts.workerChoiceStrategy = workerChoiceStrategy
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
624 workerNode.resetUsage()
625 this.sendStatisticsMessageToWorker(workerNodeKey)
626 }
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
640 )
641 }
642
643 /** @inheritDoc */
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
648 if (this.opts.enableTasksQueue === true && !enable) {
649 this.flushTasksQueues()
650 }
651 this.opts.enableTasksQueue = enable
652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
653 }
654
655 /** @inheritDoc */
656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
657 if (this.opts.enableTasksQueue === true) {
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
662 } else if (this.opts.tasksQueueOptions != null) {
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
667 private setTasksQueueSize (size: number): void {
668 for (const workerNode of this.workerNodes) {
669 workerNode.tasksQueueBackPressureSize = size
670 }
671 }
672
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
677 ...{
678 size: Math.pow(this.maxSize, 2),
679 concurrency: 1
680 },
681 ...tasksQueueOptions
682 }
683 }
684
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
693
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
700
701 /**
702 * Whether worker nodes are executing concurrently their tasks quota or not.
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
706 protected internalBusy (): boolean {
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
710 workerNode =>
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
719 workerNode =>
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
724 }
725
726 private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
727 let messagesCount = 0
728 for (const [workerNodeKey] of this.workerNodes.entries()) {
729 this.sendToWorker(workerNodeKey, {
730 ...message,
731 workerId: this.getWorkerInfo(workerNodeKey).id as number
732 })
733 ++messagesCount
734 }
735 return messagesCount
736 }
737
738 /** @inheritDoc */
739 public hasTaskFunction (name: string): boolean {
740 this.sendToWorkers({
741 taskFunctionOperation: 'has',
742 taskFunctionName: name
743 })
744 return true
745 }
746
747 /** @inheritDoc */
748 public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
749 this.sendToWorkers({
750 taskFunctionOperation: 'add',
751 taskFunctionName: name,
752 taskFunction: taskFunction.toString()
753 })
754 return true
755 }
756
757 /** @inheritDoc */
758 public removeTaskFunction (name: string): boolean {
759 this.sendToWorkers({
760 taskFunctionOperation: 'remove',
761 taskFunctionName: name
762 })
763 return true
764 }
765
766 /** @inheritDoc */
767 public listTaskFunctionNames (): string[] {
768 for (const workerNode of this.workerNodes) {
769 if (
770 Array.isArray(workerNode.info.taskFunctionNames) &&
771 workerNode.info.taskFunctionNames.length > 0
772 ) {
773 return workerNode.info.taskFunctionNames
774 }
775 }
776 return []
777 }
778
779 /** @inheritDoc */
780 public setDefaultTaskFunction (name: string): boolean {
781 this.sendToWorkers({
782 taskFunctionOperation: 'default',
783 taskFunctionName: name
784 })
785 return true
786 }
787
788 private shallExecuteTask (workerNodeKey: number): boolean {
789 return (
790 this.tasksQueueSize(workerNodeKey) === 0 &&
791 this.workerNodes[workerNodeKey].usage.tasks.executing <
792 (this.opts.tasksQueueOptions?.concurrency as number)
793 )
794 }
795
796 /** @inheritDoc */
797 public async execute (
798 data?: Data,
799 name?: string,
800 transferList?: TransferListItem[]
801 ): Promise<Response> {
802 return await new Promise<Response>((resolve, reject) => {
803 if (!this.started) {
804 reject(new Error('Cannot execute a task on destroyed pool'))
805 return
806 }
807 if (name != null && typeof name !== 'string') {
808 reject(new TypeError('name argument must be a string'))
809 return
810 }
811 if (
812 name != null &&
813 typeof name === 'string' &&
814 name.trim().length === 0
815 ) {
816 reject(new TypeError('name argument must not be an empty string'))
817 return
818 }
819 if (transferList != null && !Array.isArray(transferList)) {
820 reject(new TypeError('transferList argument must be an array'))
821 return
822 }
823 const timestamp = performance.now()
824 const workerNodeKey = this.chooseWorkerNode()
825 const task: Task<Data> = {
826 name: name ?? DEFAULT_TASK_NAME,
827 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
828 data: data ?? ({} as Data),
829 transferList,
830 timestamp,
831 workerId: this.getWorkerInfo(workerNodeKey).id as number,
832 taskId: randomUUID()
833 }
834 this.promiseResponseMap.set(task.taskId as string, {
835 resolve,
836 reject,
837 workerNodeKey
838 })
839 if (
840 this.opts.enableTasksQueue === false ||
841 (this.opts.enableTasksQueue === true &&
842 this.shallExecuteTask(workerNodeKey))
843 ) {
844 this.executeTask(workerNodeKey, task)
845 } else {
846 this.enqueueTask(workerNodeKey, task)
847 }
848 })
849 }
850
851 /** @inheritDoc */
852 public async destroy (): Promise<void> {
853 await Promise.all(
854 this.workerNodes.map(async (_, workerNodeKey) => {
855 await this.destroyWorkerNode(workerNodeKey)
856 })
857 )
858 this.emitter?.emit(PoolEvents.destroy, this.info)
859 this.started = false
860 }
861
862 protected async sendKillMessageToWorker (
863 workerNodeKey: number,
864 workerId: number
865 ): Promise<void> {
866 await new Promise<void>((resolve, reject) => {
867 this.registerWorkerMessageListener(workerNodeKey, message => {
868 if (message.kill === 'success') {
869 resolve()
870 } else if (message.kill === 'failure') {
871 reject(new Error(`Worker ${workerId} kill message handling failed`))
872 }
873 })
874 this.sendToWorker(workerNodeKey, { kill: true, workerId })
875 })
876 }
877
878 /**
879 * Terminates the worker node given its worker node key.
880 *
881 * @param workerNodeKey - The worker node key.
882 */
883 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
884
885 /**
886 * Setup hook to execute code before worker nodes are created in the abstract constructor.
887 * Can be overridden.
888 *
889 * @virtual
890 */
891 protected setupHook (): void {
892 /* Intentionally empty */
893 }
894
895 /**
896 * Should return whether the worker is the main worker or not.
897 */
898 protected abstract isMain (): boolean
899
900 /**
901 * Hook executed before the worker task execution.
902 * Can be overridden.
903 *
904 * @param workerNodeKey - The worker node key.
905 * @param task - The task to execute.
906 */
907 protected beforeTaskExecutionHook (
908 workerNodeKey: number,
909 task: Task<Data>
910 ): void {
911 if (this.workerNodes[workerNodeKey]?.usage != null) {
912 const workerUsage = this.workerNodes[workerNodeKey].usage
913 ++workerUsage.tasks.executing
914 this.updateWaitTimeWorkerUsage(workerUsage, task)
915 }
916 if (
917 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
918 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
919 task.name as string
920 ) != null
921 ) {
922 const taskFunctionWorkerUsage = this.workerNodes[
923 workerNodeKey
924 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
925 ++taskFunctionWorkerUsage.tasks.executing
926 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
927 }
928 }
929
930 /**
931 * Hook executed after the worker task execution.
932 * Can be overridden.
933 *
934 * @param workerNodeKey - The worker node key.
935 * @param message - The received message.
936 */
937 protected afterTaskExecutionHook (
938 workerNodeKey: number,
939 message: MessageValue<Response>
940 ): void {
941 if (this.workerNodes[workerNodeKey]?.usage != null) {
942 const workerUsage = this.workerNodes[workerNodeKey].usage
943 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
944 this.updateRunTimeWorkerUsage(workerUsage, message)
945 this.updateEluWorkerUsage(workerUsage, message)
946 }
947 if (
948 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
949 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
950 message.taskPerformance?.name as string
951 ) != null
952 ) {
953 const taskFunctionWorkerUsage = this.workerNodes[
954 workerNodeKey
955 ].getTaskFunctionWorkerUsage(
956 message.taskPerformance?.name as string
957 ) as WorkerUsage
958 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
959 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
960 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
961 }
962 }
963
964 /**
965 * Whether the worker node shall update its task function worker usage or not.
966 *
967 * @param workerNodeKey - The worker node key.
968 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
969 */
970 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
971 const workerInfo = this.getWorkerInfo(workerNodeKey)
972 return (
973 workerInfo != null &&
974 Array.isArray(workerInfo.taskFunctionNames) &&
975 workerInfo.taskFunctionNames.length > 2
976 )
977 }
978
979 private updateTaskStatisticsWorkerUsage (
980 workerUsage: WorkerUsage,
981 message: MessageValue<Response>
982 ): void {
983 const workerTaskStatistics = workerUsage.tasks
984 if (
985 workerTaskStatistics.executing != null &&
986 workerTaskStatistics.executing > 0
987 ) {
988 --workerTaskStatistics.executing
989 }
990 if (message.workerError == null) {
991 ++workerTaskStatistics.executed
992 } else {
993 ++workerTaskStatistics.failed
994 }
995 }
996
997 private updateRunTimeWorkerUsage (
998 workerUsage: WorkerUsage,
999 message: MessageValue<Response>
1000 ): void {
1001 if (message.workerError != null) {
1002 return
1003 }
1004 updateMeasurementStatistics(
1005 workerUsage.runTime,
1006 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1007 message.taskPerformance?.runTime ?? 0
1008 )
1009 }
1010
1011 private updateWaitTimeWorkerUsage (
1012 workerUsage: WorkerUsage,
1013 task: Task<Data>
1014 ): void {
1015 const timestamp = performance.now()
1016 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1017 updateMeasurementStatistics(
1018 workerUsage.waitTime,
1019 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1020 taskWaitTime
1021 )
1022 }
1023
1024 private updateEluWorkerUsage (
1025 workerUsage: WorkerUsage,
1026 message: MessageValue<Response>
1027 ): void {
1028 if (message.workerError != null) {
1029 return
1030 }
1031 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1032 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1033 updateMeasurementStatistics(
1034 workerUsage.elu.active,
1035 eluTaskStatisticsRequirements,
1036 message.taskPerformance?.elu?.active ?? 0
1037 )
1038 updateMeasurementStatistics(
1039 workerUsage.elu.idle,
1040 eluTaskStatisticsRequirements,
1041 message.taskPerformance?.elu?.idle ?? 0
1042 )
1043 if (eluTaskStatisticsRequirements.aggregate) {
1044 if (message.taskPerformance?.elu != null) {
1045 if (workerUsage.elu.utilization != null) {
1046 workerUsage.elu.utilization =
1047 (workerUsage.elu.utilization +
1048 message.taskPerformance.elu.utilization) /
1049 2
1050 } else {
1051 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1052 }
1053 }
1054 }
1055 }
1056
1057 /**
1058 * Chooses a worker node for the next task.
1059 *
1060 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1061 *
1062 * @returns The chosen worker node key
1063 */
1064 private chooseWorkerNode (): number {
1065 if (this.shallCreateDynamicWorker()) {
1066 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1067 if (
1068 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1069 ) {
1070 return workerNodeKey
1071 }
1072 }
1073 return this.workerChoiceStrategyContext.execute()
1074 }
1075
1076 /**
1077 * Conditions for dynamic worker creation.
1078 *
1079 * @returns Whether to create a dynamic worker or not.
1080 */
1081 private shallCreateDynamicWorker (): boolean {
1082 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1083 }
1084
1085 /**
1086 * Sends a message to worker given its worker node key.
1087 *
1088 * @param workerNodeKey - The worker node key.
1089 * @param message - The message.
1090 * @param transferList - The optional array of transferable objects.
1091 */
1092 protected abstract sendToWorker (
1093 workerNodeKey: number,
1094 message: MessageValue<Data>,
1095 transferList?: TransferListItem[]
1096 ): void
1097
1098 /**
1099 * Creates a new worker.
1100 *
1101 * @returns Newly created worker.
1102 */
1103 protected abstract createWorker (): Worker
1104
1105 /**
1106 * Creates a new, completely set up worker node.
1107 *
1108 * @returns New, completely set up worker node key.
1109 */
1110 protected createAndSetupWorkerNode (): number {
1111 const worker = this.createWorker()
1112
1113 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1114 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1115 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1116 worker.on('error', error => {
1117 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1118 const workerInfo = this.getWorkerInfo(workerNodeKey)
1119 workerInfo.ready = false
1120 this.workerNodes[workerNodeKey].closeChannel()
1121 this.emitter?.emit(PoolEvents.error, error)
1122 if (
1123 this.opts.restartWorkerOnError === true &&
1124 this.started &&
1125 !this.starting
1126 ) {
1127 if (workerInfo.dynamic) {
1128 this.createAndSetupDynamicWorkerNode()
1129 } else {
1130 this.createAndSetupWorkerNode()
1131 }
1132 }
1133 if (this.opts.enableTasksQueue === true) {
1134 this.redistributeQueuedTasks(workerNodeKey)
1135 }
1136 })
1137 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1138 worker.once('exit', () => {
1139 this.removeWorkerNode(worker)
1140 })
1141
1142 const workerNodeKey = this.addWorkerNode(worker)
1143
1144 this.afterWorkerNodeSetup(workerNodeKey)
1145
1146 return workerNodeKey
1147 }
1148
1149 /**
1150 * Creates a new, completely set up dynamic worker node.
1151 *
1152 * @returns New, completely set up dynamic worker node key.
1153 */
1154 protected createAndSetupDynamicWorkerNode (): number {
1155 const workerNodeKey = this.createAndSetupWorkerNode()
1156 this.registerWorkerMessageListener(workerNodeKey, message => {
1157 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1158 message.workerId
1159 )
1160 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1161 // Kill message received from worker
1162 if (
1163 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1164 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1165 ((this.opts.enableTasksQueue === false &&
1166 workerUsage.tasks.executing === 0) ||
1167 (this.opts.enableTasksQueue === true &&
1168 workerUsage.tasks.executing === 0 &&
1169 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1170 ) {
1171 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1172 this.emitter?.emit(PoolEvents.error, error)
1173 })
1174 }
1175 })
1176 const workerInfo = this.getWorkerInfo(workerNodeKey)
1177 this.sendToWorker(workerNodeKey, {
1178 checkActive: true,
1179 workerId: workerInfo.id as number
1180 })
1181 workerInfo.dynamic = true
1182 if (
1183 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1184 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1185 ) {
1186 workerInfo.ready = true
1187 }
1188 this.checkAndEmitDynamicWorkerCreationEvents()
1189 return workerNodeKey
1190 }
1191
1192 /**
1193 * Registers a listener callback on the worker given its worker node key.
1194 *
1195 * @param workerNodeKey - The worker node key.
1196 * @param listener - The message listener callback.
1197 */
1198 protected abstract registerWorkerMessageListener<
1199 Message extends Data | Response
1200 >(
1201 workerNodeKey: number,
1202 listener: (message: MessageValue<Message>) => void
1203 ): void
1204
1205 /**
1206 * Method hooked up after a worker node has been newly created.
1207 * Can be overridden.
1208 *
1209 * @param workerNodeKey - The newly created worker node key.
1210 */
1211 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1212 // Listen to worker messages.
1213 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1214 // Send the startup message to worker.
1215 this.sendStartupMessageToWorker(workerNodeKey)
1216 // Send the statistics message to worker.
1217 this.sendStatisticsMessageToWorker(workerNodeKey)
1218 if (this.opts.enableTasksQueue === true) {
1219 this.workerNodes[workerNodeKey].onEmptyQueue =
1220 this.taskStealingOnEmptyQueue.bind(this)
1221 this.workerNodes[workerNodeKey].onBackPressure =
1222 this.tasksStealingOnBackPressure.bind(this)
1223 }
1224 }
1225
1226 /**
1227 * Sends the startup message to worker given its worker node key.
1228 *
1229 * @param workerNodeKey - The worker node key.
1230 */
1231 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1232
1233 /**
1234 * Sends the statistics message to worker given its worker node key.
1235 *
1236 * @param workerNodeKey - The worker node key.
1237 */
1238 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1239 this.sendToWorker(workerNodeKey, {
1240 statistics: {
1241 runTime:
1242 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1243 .runTime.aggregate,
1244 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1245 .elu.aggregate
1246 },
1247 workerId: this.getWorkerInfo(workerNodeKey).id as number
1248 })
1249 }
1250
1251 private redistributeQueuedTasks (workerNodeKey: number): void {
1252 while (this.tasksQueueSize(workerNodeKey) > 0) {
1253 const destinationWorkerNodeKey = this.workerNodes.reduce(
1254 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1255 return workerNode.info.ready &&
1256 workerNode.usage.tasks.queued <
1257 workerNodes[minWorkerNodeKey].usage.tasks.queued
1258 ? workerNodeKey
1259 : minWorkerNodeKey
1260 },
1261 0
1262 )
1263 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1264 const task = {
1265 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1266 workerId: destinationWorkerNode.info.id as number
1267 }
1268 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1269 this.executeTask(destinationWorkerNodeKey, task)
1270 } else {
1271 this.enqueueTask(destinationWorkerNodeKey, task)
1272 }
1273 }
1274 }
1275
1276 private updateTaskStolenStatisticsWorkerUsage (
1277 workerNodeKey: number,
1278 taskName: string
1279 ): void {
1280 const workerNode = this.workerNodes[workerNodeKey]
1281 if (workerNode?.usage != null) {
1282 ++workerNode.usage.tasks.stolen
1283 }
1284 if (
1285 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1286 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1287 ) {
1288 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1289 taskName
1290 ) as WorkerUsage
1291 ++taskFunctionWorkerUsage.tasks.stolen
1292 }
1293 }
1294
1295 private taskStealingOnEmptyQueue (workerId: number): void {
1296 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1297 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1298 const workerNodes = this.workerNodes
1299 .slice()
1300 .sort(
1301 (workerNodeA, workerNodeB) =>
1302 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1303 )
1304 const sourceWorkerNode = workerNodes.find(
1305 workerNode =>
1306 workerNode.info.ready &&
1307 workerNode.info.id !== workerId &&
1308 workerNode.usage.tasks.queued > 0
1309 )
1310 if (sourceWorkerNode != null) {
1311 const task = {
1312 ...(sourceWorkerNode.popTask() as Task<Data>),
1313 workerId: destinationWorkerNode.info.id as number
1314 }
1315 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1316 this.executeTask(destinationWorkerNodeKey, task)
1317 } else {
1318 this.enqueueTask(destinationWorkerNodeKey, task)
1319 }
1320 this.updateTaskStolenStatisticsWorkerUsage(
1321 destinationWorkerNodeKey,
1322 task.name as string
1323 )
1324 }
1325 }
1326
1327 private tasksStealingOnBackPressure (workerId: number): void {
1328 const sizeOffset = 1
1329 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1330 return
1331 }
1332 const sourceWorkerNode =
1333 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1334 const workerNodes = this.workerNodes
1335 .slice()
1336 .sort(
1337 (workerNodeA, workerNodeB) =>
1338 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1339 )
1340 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1341 if (
1342 sourceWorkerNode.usage.tasks.queued > 0 &&
1343 workerNode.info.ready &&
1344 workerNode.info.id !== workerId &&
1345 workerNode.usage.tasks.queued <
1346 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1347 ) {
1348 const task = {
1349 ...(sourceWorkerNode.popTask() as Task<Data>),
1350 workerId: workerNode.info.id as number
1351 }
1352 if (this.shallExecuteTask(workerNodeKey)) {
1353 this.executeTask(workerNodeKey, task)
1354 } else {
1355 this.enqueueTask(workerNodeKey, task)
1356 }
1357 this.updateTaskStolenStatisticsWorkerUsage(
1358 workerNodeKey,
1359 task.name as string
1360 )
1361 }
1362 }
1363 }
1364
1365 /**
1366 * This method is the listener registered for each worker message.
1367 *
1368 * @returns The listener function to execute when a message is received from a worker.
1369 */
1370 protected workerListener (): (message: MessageValue<Response>) => void {
1371 return message => {
1372 this.checkMessageWorkerId(message)
1373 if (message.ready != null && message.taskFunctionNames != null) {
1374 // Worker ready response received from worker
1375 this.handleWorkerReadyResponse(message)
1376 } else if (message.taskId != null) {
1377 // Task execution response received from worker
1378 this.handleTaskExecutionResponse(message)
1379 } else if (message.taskFunctionNames != null) {
1380 // Task function names message received from worker
1381 this.getWorkerInfo(
1382 this.getWorkerNodeKeyByWorkerId(message.workerId)
1383 ).taskFunctionNames = message.taskFunctionNames
1384 } else if (message.taskFunctionOperation != null) {
1385 // Task function operation response received from worker
1386 }
1387 }
1388 }
1389
1390 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1391 if (message.ready === false) {
1392 throw new Error(`Worker ${message.workerId} failed to initialize`)
1393 }
1394 const workerInfo = this.getWorkerInfo(
1395 this.getWorkerNodeKeyByWorkerId(message.workerId)
1396 )
1397 workerInfo.ready = message.ready as boolean
1398 workerInfo.taskFunctionNames = message.taskFunctionNames
1399 if (this.emitter != null && this.ready) {
1400 this.emitter.emit(PoolEvents.ready, this.info)
1401 }
1402 }
1403
1404 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1405 const { taskId, workerError, data } = message
1406 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1407 if (promiseResponse != null) {
1408 if (workerError != null) {
1409 this.emitter?.emit(PoolEvents.taskError, workerError)
1410 promiseResponse.reject(workerError.message)
1411 } else {
1412 promiseResponse.resolve(data as Response)
1413 }
1414 const workerNodeKey = promiseResponse.workerNodeKey
1415 this.afterTaskExecutionHook(workerNodeKey, message)
1416 this.workerChoiceStrategyContext.update(workerNodeKey)
1417 this.promiseResponseMap.delete(taskId as string)
1418 if (
1419 this.opts.enableTasksQueue === true &&
1420 this.tasksQueueSize(workerNodeKey) > 0 &&
1421 this.workerNodes[workerNodeKey].usage.tasks.executing <
1422 (this.opts.tasksQueueOptions?.concurrency as number)
1423 ) {
1424 this.executeTask(
1425 workerNodeKey,
1426 this.dequeueTask(workerNodeKey) as Task<Data>
1427 )
1428 }
1429 }
1430 }
1431
1432 private checkAndEmitTaskExecutionEvents (): void {
1433 if (this.busy) {
1434 this.emitter?.emit(PoolEvents.busy, this.info)
1435 }
1436 }
1437
1438 private checkAndEmitTaskQueuingEvents (): void {
1439 if (this.hasBackPressure()) {
1440 this.emitter?.emit(PoolEvents.backPressure, this.info)
1441 }
1442 }
1443
1444 private checkAndEmitDynamicWorkerCreationEvents (): void {
1445 if (this.type === PoolTypes.dynamic) {
1446 if (this.full) {
1447 this.emitter?.emit(PoolEvents.full, this.info)
1448 }
1449 }
1450 }
1451
1452 /**
1453 * Gets the worker information given its worker node key.
1454 *
1455 * @param workerNodeKey - The worker node key.
1456 * @returns The worker information.
1457 */
1458 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1459 return this.workerNodes[workerNodeKey].info
1460 }
1461
1462 /**
1463 * Adds the given worker in the pool worker nodes.
1464 *
1465 * @param worker - The worker.
1466 * @returns The added worker node key.
1467 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1468 */
1469 private addWorkerNode (worker: Worker): number {
1470 const workerNode = new WorkerNode<Worker, Data>(
1471 worker,
1472 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1473 )
1474 // Flag the worker node as ready at pool startup.
1475 if (this.starting) {
1476 workerNode.info.ready = true
1477 }
1478 this.workerNodes.push(workerNode)
1479 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1480 if (workerNodeKey === -1) {
1481 throw new Error('Worker added not found in worker nodes')
1482 }
1483 return workerNodeKey
1484 }
1485
1486 /**
1487 * Removes the given worker from the pool worker nodes.
1488 *
1489 * @param worker - The worker.
1490 */
1491 private removeWorkerNode (worker: Worker): void {
1492 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1493 if (workerNodeKey !== -1) {
1494 this.workerNodes.splice(workerNodeKey, 1)
1495 this.workerChoiceStrategyContext.remove(workerNodeKey)
1496 }
1497 }
1498
1499 /** @inheritDoc */
1500 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1501 return (
1502 this.opts.enableTasksQueue === true &&
1503 this.workerNodes[workerNodeKey].hasBackPressure()
1504 )
1505 }
1506
1507 private hasBackPressure (): boolean {
1508 return (
1509 this.opts.enableTasksQueue === true &&
1510 this.workerNodes.findIndex(
1511 workerNode => !workerNode.hasBackPressure()
1512 ) === -1
1513 )
1514 }
1515
1516 /**
1517 * Executes the given task on the worker given its worker node key.
1518 *
1519 * @param workerNodeKey - The worker node key.
1520 * @param task - The task to execute.
1521 */
1522 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1523 this.beforeTaskExecutionHook(workerNodeKey, task)
1524 this.sendToWorker(workerNodeKey, task, task.transferList)
1525 this.checkAndEmitTaskExecutionEvents()
1526 }
1527
1528 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1529 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1530 this.checkAndEmitTaskQueuingEvents()
1531 return tasksQueueSize
1532 }
1533
1534 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1535 return this.workerNodes[workerNodeKey].dequeueTask()
1536 }
1537
1538 private tasksQueueSize (workerNodeKey: number): number {
1539 return this.workerNodes[workerNodeKey].tasksQueueSize()
1540 }
1541
1542 protected flushTasksQueue (workerNodeKey: number): void {
1543 while (this.tasksQueueSize(workerNodeKey) > 0) {
1544 this.executeTask(
1545 workerNodeKey,
1546 this.dequeueTask(workerNodeKey) as Task<Data>
1547 )
1548 }
1549 this.workerNodes[workerNodeKey].clearTasksQueue()
1550 }
1551
1552 private flushTasksQueues (): void {
1553 for (const [workerNodeKey] of this.workerNodes.entries()) {
1554 this.flushTasksQueue(workerNodeKey)
1555 }
1556 }
1557 }