c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map.
73 *
74 * - `key`: The message id of each submitted task.
75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 *
77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 */
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
81
82 /**
83 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
86 Worker,
87 Data,
88 Response
89 >
90
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
96 /**
97 * Whether the pool is starting or not.
98 */
99 private readonly starting: boolean
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
109 /**
110 * Constructs a new poolifier pool.
111 *
112 * @param numberOfWorkers - Number of workers that this pool should manage.
113 * @param filePath - Path to the worker file.
114 * @param opts - Options for the pool.
115 */
116 public constructor (
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
120 ) {
121 if (!this.isMain()) {
122 throw new Error(
123 'Cannot start a pool from a worker with the same type as the pool'
124 )
125 }
126 this.checkNumberOfWorkers(this.numberOfWorkers)
127 this.checkFilePath(this.filePath)
128 this.checkPoolOptions(this.opts)
129
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
133
134 if (this.opts.enableEvents === true) {
135 this.emitter = new PoolEmitter()
136 }
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
146
147 this.setupHook()
148
149 this.starting = true
150 this.startPool()
151 this.starting = false
152 this.started = true
153
154 this.startTimestamp = performance.now()
155 }
156
157 private checkFilePath (filePath: string): void {
158 if (
159 filePath == null ||
160 typeof filePath !== 'string' ||
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
163 throw new Error('Please specify a file with a worker implementation')
164 }
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
168 }
169
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
176 throw new TypeError(
177 'Cannot instantiate a pool with a non safe integer number of workers'
178 )
179 } else if (numberOfWorkers < 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
182 )
183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
189 if (this.type === PoolTypes.dynamic) {
190 if (max == null) {
191 throw new TypeError(
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
202 } else if (max === 0) {
203 throw new RangeError(
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
211 }
212 }
213
214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
239 }
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
246 throw new Error(
247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
248 )
249 }
250 }
251
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
260 if (
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
263 ) {
264 throw new TypeError(
265 'Invalid worker choice strategy options: retries must be an integer'
266 )
267 }
268 if (
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
271 ) {
272 throw new RangeError(
273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
274 )
275 }
276 if (
277 workerChoiceStrategyOptions.weights != null &&
278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
294 }
295
296 private checkValidTasksQueueOptions (
297 tasksQueueOptions: TasksQueueOptions
298 ): void {
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
302 if (
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
305 ) {
306 throw new TypeError(
307 'Invalid worker node tasks concurrency: must be an integer'
308 )
309 }
310 if (
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
313 ) {
314 throw new RangeError(
315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
316 )
317 }
318 if (tasksQueueOptions?.queueMaxSize != null) {
319 throw new Error(
320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
321 )
322 }
323 if (
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
326 ) {
327 throw new TypeError(
328 'Invalid worker node tasks queue size: must be an integer'
329 )
330 }
331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
332 throw new RangeError(
333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
334 )
335 }
336 }
337
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
346 this.createAndSetupWorkerNode()
347 }
348 }
349
350 /** @inheritDoc */
351 public get info (): PoolInfo {
352 return {
353 version,
354 type: this.type,
355 worker: this.worker,
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
358 minSize: this.minSize,
359 maxSize: this.maxSize,
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
367 workerNode.usage.tasks.executing === 0
368 ? accumulator + 1
369 : accumulator,
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
375 0
376 ),
377 executedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + workerNode.usage.tasks.executed,
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.executing,
385 0
386 ),
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
413 accumulator + workerNode.usage.tasks.failed,
414 0
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
419 minimum: round(
420 min(
421 ...this.workerNodes.map(
422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
423 )
424 )
425 ),
426 maximum: round(
427 max(
428 ...this.workerNodes.map(
429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
430 )
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
442 )
443 )
444 }),
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
453 )
454 )
455 )
456 })
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
462 minimum: round(
463 min(
464 ...this.workerNodes.map(
465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
466 )
467 )
468 ),
469 maximum: round(
470 max(
471 ...this.workerNodes.map(
472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
473 )
474 )
475 ),
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
485 )
486 )
487 }),
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
496 )
497 )
498 )
499 })
500 }
501 })
502 }
503 }
504
505 /**
506 * The pool readiness boolean status.
507 */
508 private get ready (): boolean {
509 return (
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
517 )
518 }
519
520 /**
521 * The approximate pool utilization.
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
526 const poolTimeCapacity =
527 (performance.now() - this.startTimestamp) * this.maxSize
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
536 0
537 )
538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
539 }
540
541 /**
542 * The pool type.
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
548 /**
549 * The worker type.
550 */
551 protected abstract get worker (): WorkerType
552
553 /**
554 * The pool minimum size.
555 */
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
559
560 /**
561 * The pool maximum size.
562 */
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
566
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
573 private checkMessageWorkerId (message: MessageValue<Response>): void {
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
577 message.workerId != null &&
578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
586 /**
587 * Gets the given worker its worker node key.
588 *
589 * @param worker - The worker.
590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
591 */
592 private getWorkerNodeKeyByWorker (worker: Worker): number {
593 return this.workerNodes.findIndex(
594 workerNode => workerNode.worker === worker
595 )
596 }
597
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
603 */
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
606 workerNode => workerNode.info.id === workerId
607 )
608 }
609
610 /** @inheritDoc */
611 public setWorkerChoiceStrategy (
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
614 ): void {
615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
616 this.opts.workerChoiceStrategy = workerChoiceStrategy
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
624 workerNode.resetUsage()
625 this.sendStatisticsMessageToWorker(workerNodeKey)
626 }
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
640 )
641 }
642
643 /** @inheritDoc */
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
648 if (this.opts.enableTasksQueue === true && !enable) {
649 this.flushTasksQueues()
650 }
651 this.opts.enableTasksQueue = enable
652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
653 }
654
655 /** @inheritDoc */
656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
657 if (this.opts.enableTasksQueue === true) {
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
662 } else if (this.opts.tasksQueueOptions != null) {
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
667 private setTasksQueueSize (size: number): void {
668 for (const workerNode of this.workerNodes) {
669 workerNode.tasksQueueBackPressureSize = size
670 }
671 }
672
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
677 ...{
678 size: Math.pow(this.maxSize, 2),
679 concurrency: 1
680 },
681 ...tasksQueueOptions
682 }
683 }
684
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
693
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
700
701 /**
702 * Whether worker nodes are executing concurrently their tasks quota or not.
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
706 protected internalBusy (): boolean {
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
710 workerNode =>
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
719 workerNode =>
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
724 }
725
726 private async sendTaskFunctionOperationToWorker (
727 message: Omit<MessageValue<Data>, 'workerId'>
728 ): Promise<boolean> {
729 return await new Promise<boolean>((resolve, reject) => {
730 const responsesReceived = new Array<MessageValue<Data | Response>>()
731 for (const [workerNodeKey] of this.workerNodes.entries()) {
732 this.registerWorkerMessageListener(workerNodeKey, message => {
733 if (message.taskFunctionOperationStatus != null) {
734 responsesReceived.push(message)
735 if (
736 responsesReceived.length === this.workerNodes.length &&
737 responsesReceived.every(
738 message => message.taskFunctionOperationStatus === true
739 )
740 ) {
741 resolve(true)
742 } else if (
743 responsesReceived.length === this.workerNodes.length &&
744 responsesReceived.some(
745 message => message.taskFunctionOperationStatus === false
746 )
747 ) {
748 reject(
749 new Error(
750 `Task function operation ${
751 message.taskFunctionOperation as string
752 } failed on worker ${message.workerId}`
753 )
754 )
755 }
756 }
757 })
758 this.sendToWorker(workerNodeKey, {
759 ...message,
760 workerId: this.getWorkerInfo(workerNodeKey).id as number
761 })
762 }
763 })
764 }
765
766 /** @inheritDoc */
767 public hasTaskFunction (name: string): boolean {
768 for (const workerNode of this.workerNodes) {
769 if (
770 Array.isArray(workerNode.info.taskFunctionNames) &&
771 workerNode.info.taskFunctionNames.includes(name)
772 ) {
773 return true
774 }
775 }
776 return false
777 }
778
779 /** @inheritDoc */
780 public async addTaskFunction (
781 name: string,
782 taskFunction: TaskFunction
783 ): Promise<boolean> {
784 return await this.sendTaskFunctionOperationToWorker({
785 taskFunctionOperation: 'add',
786 taskFunctionName: name,
787 taskFunction: taskFunction.toString()
788 })
789 }
790
791 /** @inheritDoc */
792 public async removeTaskFunction (name: string): Promise<boolean> {
793 return await this.sendTaskFunctionOperationToWorker({
794 taskFunctionOperation: 'remove',
795 taskFunctionName: name
796 })
797 }
798
799 /** @inheritDoc */
800 public listTaskFunctionNames (): string[] {
801 for (const workerNode of this.workerNodes) {
802 if (
803 Array.isArray(workerNode.info.taskFunctionNames) &&
804 workerNode.info.taskFunctionNames.length > 0
805 ) {
806 return workerNode.info.taskFunctionNames
807 }
808 }
809 return []
810 }
811
812 /** @inheritDoc */
813 public async setDefaultTaskFunction (name: string): Promise<boolean> {
814 return await this.sendTaskFunctionOperationToWorker({
815 taskFunctionOperation: 'default',
816 taskFunctionName: name
817 })
818 }
819
820 private shallExecuteTask (workerNodeKey: number): boolean {
821 return (
822 this.tasksQueueSize(workerNodeKey) === 0 &&
823 this.workerNodes[workerNodeKey].usage.tasks.executing <
824 (this.opts.tasksQueueOptions?.concurrency as number)
825 )
826 }
827
828 /** @inheritDoc */
829 public async execute (
830 data?: Data,
831 name?: string,
832 transferList?: TransferListItem[]
833 ): Promise<Response> {
834 return await new Promise<Response>((resolve, reject) => {
835 if (!this.started) {
836 reject(new Error('Cannot execute a task on destroyed pool'))
837 return
838 }
839 if (name != null && typeof name !== 'string') {
840 reject(new TypeError('name argument must be a string'))
841 return
842 }
843 if (
844 name != null &&
845 typeof name === 'string' &&
846 name.trim().length === 0
847 ) {
848 reject(new TypeError('name argument must not be an empty string'))
849 return
850 }
851 if (transferList != null && !Array.isArray(transferList)) {
852 reject(new TypeError('transferList argument must be an array'))
853 return
854 }
855 const timestamp = performance.now()
856 const workerNodeKey = this.chooseWorkerNode()
857 const task: Task<Data> = {
858 name: name ?? DEFAULT_TASK_NAME,
859 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
860 data: data ?? ({} as Data),
861 transferList,
862 timestamp,
863 workerId: this.getWorkerInfo(workerNodeKey).id as number,
864 taskId: randomUUID()
865 }
866 this.promiseResponseMap.set(task.taskId as string, {
867 resolve,
868 reject,
869 workerNodeKey
870 })
871 if (
872 this.opts.enableTasksQueue === false ||
873 (this.opts.enableTasksQueue === true &&
874 this.shallExecuteTask(workerNodeKey))
875 ) {
876 this.executeTask(workerNodeKey, task)
877 } else {
878 this.enqueueTask(workerNodeKey, task)
879 }
880 })
881 }
882
883 /** @inheritDoc */
884 public async destroy (): Promise<void> {
885 await Promise.all(
886 this.workerNodes.map(async (_, workerNodeKey) => {
887 await this.destroyWorkerNode(workerNodeKey)
888 })
889 )
890 this.emitter?.emit(PoolEvents.destroy, this.info)
891 this.started = false
892 }
893
894 protected async sendKillMessageToWorker (
895 workerNodeKey: number,
896 workerId: number
897 ): Promise<void> {
898 await new Promise<void>((resolve, reject) => {
899 this.registerWorkerMessageListener(workerNodeKey, message => {
900 if (message.kill === 'success') {
901 resolve()
902 } else if (message.kill === 'failure') {
903 reject(new Error(`Worker ${workerId} kill message handling failed`))
904 }
905 })
906 this.sendToWorker(workerNodeKey, { kill: true, workerId })
907 })
908 }
909
910 /**
911 * Terminates the worker node given its worker node key.
912 *
913 * @param workerNodeKey - The worker node key.
914 */
915 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
916
917 /**
918 * Setup hook to execute code before worker nodes are created in the abstract constructor.
919 * Can be overridden.
920 *
921 * @virtual
922 */
923 protected setupHook (): void {
924 /* Intentionally empty */
925 }
926
927 /**
928 * Should return whether the worker is the main worker or not.
929 */
930 protected abstract isMain (): boolean
931
932 /**
933 * Hook executed before the worker task execution.
934 * Can be overridden.
935 *
936 * @param workerNodeKey - The worker node key.
937 * @param task - The task to execute.
938 */
939 protected beforeTaskExecutionHook (
940 workerNodeKey: number,
941 task: Task<Data>
942 ): void {
943 if (this.workerNodes[workerNodeKey]?.usage != null) {
944 const workerUsage = this.workerNodes[workerNodeKey].usage
945 ++workerUsage.tasks.executing
946 this.updateWaitTimeWorkerUsage(workerUsage, task)
947 }
948 if (
949 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
950 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
951 task.name as string
952 ) != null
953 ) {
954 const taskFunctionWorkerUsage = this.workerNodes[
955 workerNodeKey
956 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
957 ++taskFunctionWorkerUsage.tasks.executing
958 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
959 }
960 }
961
962 /**
963 * Hook executed after the worker task execution.
964 * Can be overridden.
965 *
966 * @param workerNodeKey - The worker node key.
967 * @param message - The received message.
968 */
969 protected afterTaskExecutionHook (
970 workerNodeKey: number,
971 message: MessageValue<Response>
972 ): void {
973 if (this.workerNodes[workerNodeKey]?.usage != null) {
974 const workerUsage = this.workerNodes[workerNodeKey].usage
975 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
976 this.updateRunTimeWorkerUsage(workerUsage, message)
977 this.updateEluWorkerUsage(workerUsage, message)
978 }
979 if (
980 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
981 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
982 message.taskPerformance?.name as string
983 ) != null
984 ) {
985 const taskFunctionWorkerUsage = this.workerNodes[
986 workerNodeKey
987 ].getTaskFunctionWorkerUsage(
988 message.taskPerformance?.name as string
989 ) as WorkerUsage
990 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
991 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
992 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
993 }
994 }
995
996 /**
997 * Whether the worker node shall update its task function worker usage or not.
998 *
999 * @param workerNodeKey - The worker node key.
1000 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1001 */
1002 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1003 const workerInfo = this.getWorkerInfo(workerNodeKey)
1004 return (
1005 workerInfo != null &&
1006 Array.isArray(workerInfo.taskFunctionNames) &&
1007 workerInfo.taskFunctionNames.length > 2
1008 )
1009 }
1010
1011 private updateTaskStatisticsWorkerUsage (
1012 workerUsage: WorkerUsage,
1013 message: MessageValue<Response>
1014 ): void {
1015 const workerTaskStatistics = workerUsage.tasks
1016 if (
1017 workerTaskStatistics.executing != null &&
1018 workerTaskStatistics.executing > 0
1019 ) {
1020 --workerTaskStatistics.executing
1021 }
1022 if (message.workerError == null) {
1023 ++workerTaskStatistics.executed
1024 } else {
1025 ++workerTaskStatistics.failed
1026 }
1027 }
1028
1029 private updateRunTimeWorkerUsage (
1030 workerUsage: WorkerUsage,
1031 message: MessageValue<Response>
1032 ): void {
1033 if (message.workerError != null) {
1034 return
1035 }
1036 updateMeasurementStatistics(
1037 workerUsage.runTime,
1038 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1039 message.taskPerformance?.runTime ?? 0
1040 )
1041 }
1042
1043 private updateWaitTimeWorkerUsage (
1044 workerUsage: WorkerUsage,
1045 task: Task<Data>
1046 ): void {
1047 const timestamp = performance.now()
1048 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1049 updateMeasurementStatistics(
1050 workerUsage.waitTime,
1051 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1052 taskWaitTime
1053 )
1054 }
1055
1056 private updateEluWorkerUsage (
1057 workerUsage: WorkerUsage,
1058 message: MessageValue<Response>
1059 ): void {
1060 if (message.workerError != null) {
1061 return
1062 }
1063 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1064 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1065 updateMeasurementStatistics(
1066 workerUsage.elu.active,
1067 eluTaskStatisticsRequirements,
1068 message.taskPerformance?.elu?.active ?? 0
1069 )
1070 updateMeasurementStatistics(
1071 workerUsage.elu.idle,
1072 eluTaskStatisticsRequirements,
1073 message.taskPerformance?.elu?.idle ?? 0
1074 )
1075 if (eluTaskStatisticsRequirements.aggregate) {
1076 if (message.taskPerformance?.elu != null) {
1077 if (workerUsage.elu.utilization != null) {
1078 workerUsage.elu.utilization =
1079 (workerUsage.elu.utilization +
1080 message.taskPerformance.elu.utilization) /
1081 2
1082 } else {
1083 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1084 }
1085 }
1086 }
1087 }
1088
1089 /**
1090 * Chooses a worker node for the next task.
1091 *
1092 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1093 *
1094 * @returns The chosen worker node key
1095 */
1096 private chooseWorkerNode (): number {
1097 if (this.shallCreateDynamicWorker()) {
1098 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1099 if (
1100 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1101 ) {
1102 return workerNodeKey
1103 }
1104 }
1105 return this.workerChoiceStrategyContext.execute()
1106 }
1107
1108 /**
1109 * Conditions for dynamic worker creation.
1110 *
1111 * @returns Whether to create a dynamic worker or not.
1112 */
1113 private shallCreateDynamicWorker (): boolean {
1114 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1115 }
1116
1117 /**
1118 * Sends a message to worker given its worker node key.
1119 *
1120 * @param workerNodeKey - The worker node key.
1121 * @param message - The message.
1122 * @param transferList - The optional array of transferable objects.
1123 */
1124 protected abstract sendToWorker (
1125 workerNodeKey: number,
1126 message: MessageValue<Data>,
1127 transferList?: TransferListItem[]
1128 ): void
1129
1130 /**
1131 * Creates a new worker.
1132 *
1133 * @returns Newly created worker.
1134 */
1135 protected abstract createWorker (): Worker
1136
1137 /**
1138 * Creates a new, completely set up worker node.
1139 *
1140 * @returns New, completely set up worker node key.
1141 */
1142 protected createAndSetupWorkerNode (): number {
1143 const worker = this.createWorker()
1144
1145 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1146 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1147 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1148 worker.on('error', error => {
1149 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1150 const workerInfo = this.getWorkerInfo(workerNodeKey)
1151 workerInfo.ready = false
1152 this.workerNodes[workerNodeKey].closeChannel()
1153 this.emitter?.emit(PoolEvents.error, error)
1154 if (
1155 this.opts.restartWorkerOnError === true &&
1156 this.started &&
1157 !this.starting
1158 ) {
1159 if (workerInfo.dynamic) {
1160 this.createAndSetupDynamicWorkerNode()
1161 } else {
1162 this.createAndSetupWorkerNode()
1163 }
1164 }
1165 if (this.opts.enableTasksQueue === true) {
1166 this.redistributeQueuedTasks(workerNodeKey)
1167 }
1168 })
1169 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1170 worker.once('exit', () => {
1171 this.removeWorkerNode(worker)
1172 })
1173
1174 const workerNodeKey = this.addWorkerNode(worker)
1175
1176 this.afterWorkerNodeSetup(workerNodeKey)
1177
1178 return workerNodeKey
1179 }
1180
1181 /**
1182 * Creates a new, completely set up dynamic worker node.
1183 *
1184 * @returns New, completely set up dynamic worker node key.
1185 */
1186 protected createAndSetupDynamicWorkerNode (): number {
1187 const workerNodeKey = this.createAndSetupWorkerNode()
1188 this.registerWorkerMessageListener(workerNodeKey, message => {
1189 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1190 message.workerId
1191 )
1192 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1193 // Kill message received from worker
1194 if (
1195 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1196 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1197 ((this.opts.enableTasksQueue === false &&
1198 workerUsage.tasks.executing === 0) ||
1199 (this.opts.enableTasksQueue === true &&
1200 workerUsage.tasks.executing === 0 &&
1201 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1202 ) {
1203 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1204 this.emitter?.emit(PoolEvents.error, error)
1205 })
1206 }
1207 })
1208 const workerInfo = this.getWorkerInfo(workerNodeKey)
1209 this.sendToWorker(workerNodeKey, {
1210 checkActive: true,
1211 workerId: workerInfo.id as number
1212 })
1213 workerInfo.dynamic = true
1214 if (
1215 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1216 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1217 ) {
1218 workerInfo.ready = true
1219 }
1220 this.checkAndEmitDynamicWorkerCreationEvents()
1221 return workerNodeKey
1222 }
1223
1224 /**
1225 * Registers a listener callback on the worker given its worker node key.
1226 *
1227 * @param workerNodeKey - The worker node key.
1228 * @param listener - The message listener callback.
1229 */
1230 protected abstract registerWorkerMessageListener<
1231 Message extends Data | Response
1232 >(
1233 workerNodeKey: number,
1234 listener: (message: MessageValue<Message>) => void
1235 ): void
1236
1237 /**
1238 * Method hooked up after a worker node has been newly created.
1239 * Can be overridden.
1240 *
1241 * @param workerNodeKey - The newly created worker node key.
1242 */
1243 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1244 // Listen to worker messages.
1245 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1246 // Send the startup message to worker.
1247 this.sendStartupMessageToWorker(workerNodeKey)
1248 // Send the statistics message to worker.
1249 this.sendStatisticsMessageToWorker(workerNodeKey)
1250 if (this.opts.enableTasksQueue === true) {
1251 this.workerNodes[workerNodeKey].onEmptyQueue =
1252 this.taskStealingOnEmptyQueue.bind(this)
1253 this.workerNodes[workerNodeKey].onBackPressure =
1254 this.tasksStealingOnBackPressure.bind(this)
1255 }
1256 }
1257
1258 /**
1259 * Sends the startup message to worker given its worker node key.
1260 *
1261 * @param workerNodeKey - The worker node key.
1262 */
1263 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1264
1265 /**
1266 * Sends the statistics message to worker given its worker node key.
1267 *
1268 * @param workerNodeKey - The worker node key.
1269 */
1270 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1271 this.sendToWorker(workerNodeKey, {
1272 statistics: {
1273 runTime:
1274 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1275 .runTime.aggregate,
1276 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1277 .elu.aggregate
1278 },
1279 workerId: this.getWorkerInfo(workerNodeKey).id as number
1280 })
1281 }
1282
1283 private redistributeQueuedTasks (workerNodeKey: number): void {
1284 while (this.tasksQueueSize(workerNodeKey) > 0) {
1285 const destinationWorkerNodeKey = this.workerNodes.reduce(
1286 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1287 return workerNode.info.ready &&
1288 workerNode.usage.tasks.queued <
1289 workerNodes[minWorkerNodeKey].usage.tasks.queued
1290 ? workerNodeKey
1291 : minWorkerNodeKey
1292 },
1293 0
1294 )
1295 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1296 const task = {
1297 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1298 workerId: destinationWorkerNode.info.id as number
1299 }
1300 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1301 this.executeTask(destinationWorkerNodeKey, task)
1302 } else {
1303 this.enqueueTask(destinationWorkerNodeKey, task)
1304 }
1305 }
1306 }
1307
1308 private updateTaskStolenStatisticsWorkerUsage (
1309 workerNodeKey: number,
1310 taskName: string
1311 ): void {
1312 const workerNode = this.workerNodes[workerNodeKey]
1313 if (workerNode?.usage != null) {
1314 ++workerNode.usage.tasks.stolen
1315 }
1316 if (
1317 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1318 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1319 ) {
1320 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1321 taskName
1322 ) as WorkerUsage
1323 ++taskFunctionWorkerUsage.tasks.stolen
1324 }
1325 }
1326
1327 private taskStealingOnEmptyQueue (workerId: number): void {
1328 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1329 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1330 const workerNodes = this.workerNodes
1331 .slice()
1332 .sort(
1333 (workerNodeA, workerNodeB) =>
1334 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1335 )
1336 const sourceWorkerNode = workerNodes.find(
1337 workerNode =>
1338 workerNode.info.ready &&
1339 workerNode.info.id !== workerId &&
1340 workerNode.usage.tasks.queued > 0
1341 )
1342 if (sourceWorkerNode != null) {
1343 const task = {
1344 ...(sourceWorkerNode.popTask() as Task<Data>),
1345 workerId: destinationWorkerNode.info.id as number
1346 }
1347 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1348 this.executeTask(destinationWorkerNodeKey, task)
1349 } else {
1350 this.enqueueTask(destinationWorkerNodeKey, task)
1351 }
1352 this.updateTaskStolenStatisticsWorkerUsage(
1353 destinationWorkerNodeKey,
1354 task.name as string
1355 )
1356 }
1357 }
1358
1359 private tasksStealingOnBackPressure (workerId: number): void {
1360 const sizeOffset = 1
1361 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1362 return
1363 }
1364 const sourceWorkerNode =
1365 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1366 const workerNodes = this.workerNodes
1367 .slice()
1368 .sort(
1369 (workerNodeA, workerNodeB) =>
1370 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1371 )
1372 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1373 if (
1374 sourceWorkerNode.usage.tasks.queued > 0 &&
1375 workerNode.info.ready &&
1376 workerNode.info.id !== workerId &&
1377 workerNode.usage.tasks.queued <
1378 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1379 ) {
1380 const task = {
1381 ...(sourceWorkerNode.popTask() as Task<Data>),
1382 workerId: workerNode.info.id as number
1383 }
1384 if (this.shallExecuteTask(workerNodeKey)) {
1385 this.executeTask(workerNodeKey, task)
1386 } else {
1387 this.enqueueTask(workerNodeKey, task)
1388 }
1389 this.updateTaskStolenStatisticsWorkerUsage(
1390 workerNodeKey,
1391 task.name as string
1392 )
1393 }
1394 }
1395 }
1396
1397 /**
1398 * This method is the listener registered for each worker message.
1399 *
1400 * @returns The listener function to execute when a message is received from a worker.
1401 */
1402 protected workerListener (): (message: MessageValue<Response>) => void {
1403 return message => {
1404 this.checkMessageWorkerId(message)
1405 if (message.ready != null && message.taskFunctionNames != null) {
1406 // Worker ready response received from worker
1407 this.handleWorkerReadyResponse(message)
1408 } else if (message.taskId != null) {
1409 // Task execution response received from worker
1410 this.handleTaskExecutionResponse(message)
1411 } else if (message.taskFunctionNames != null) {
1412 // Task function names message received from worker
1413 this.getWorkerInfo(
1414 this.getWorkerNodeKeyByWorkerId(message.workerId)
1415 ).taskFunctionNames = message.taskFunctionNames
1416 } else if (message.taskFunctionOperation != null) {
1417 // Task function operation response received from worker
1418 }
1419 }
1420 }
1421
1422 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1423 if (message.ready === false) {
1424 throw new Error(`Worker ${message.workerId} failed to initialize`)
1425 }
1426 const workerInfo = this.getWorkerInfo(
1427 this.getWorkerNodeKeyByWorkerId(message.workerId)
1428 )
1429 workerInfo.ready = message.ready as boolean
1430 workerInfo.taskFunctionNames = message.taskFunctionNames
1431 if (this.emitter != null && this.ready) {
1432 this.emitter.emit(PoolEvents.ready, this.info)
1433 }
1434 }
1435
1436 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1437 const { taskId, workerError, data } = message
1438 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1439 if (promiseResponse != null) {
1440 if (workerError != null) {
1441 this.emitter?.emit(PoolEvents.taskError, workerError)
1442 promiseResponse.reject(workerError.message)
1443 } else {
1444 promiseResponse.resolve(data as Response)
1445 }
1446 const workerNodeKey = promiseResponse.workerNodeKey
1447 this.afterTaskExecutionHook(workerNodeKey, message)
1448 this.workerChoiceStrategyContext.update(workerNodeKey)
1449 this.promiseResponseMap.delete(taskId as string)
1450 if (
1451 this.opts.enableTasksQueue === true &&
1452 this.tasksQueueSize(workerNodeKey) > 0 &&
1453 this.workerNodes[workerNodeKey].usage.tasks.executing <
1454 (this.opts.tasksQueueOptions?.concurrency as number)
1455 ) {
1456 this.executeTask(
1457 workerNodeKey,
1458 this.dequeueTask(workerNodeKey) as Task<Data>
1459 )
1460 }
1461 }
1462 }
1463
1464 private checkAndEmitTaskExecutionEvents (): void {
1465 if (this.busy) {
1466 this.emitter?.emit(PoolEvents.busy, this.info)
1467 }
1468 }
1469
1470 private checkAndEmitTaskQueuingEvents (): void {
1471 if (this.hasBackPressure()) {
1472 this.emitter?.emit(PoolEvents.backPressure, this.info)
1473 }
1474 }
1475
1476 private checkAndEmitDynamicWorkerCreationEvents (): void {
1477 if (this.type === PoolTypes.dynamic) {
1478 if (this.full) {
1479 this.emitter?.emit(PoolEvents.full, this.info)
1480 }
1481 }
1482 }
1483
1484 /**
1485 * Gets the worker information given its worker node key.
1486 *
1487 * @param workerNodeKey - The worker node key.
1488 * @returns The worker information.
1489 */
1490 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1491 return this.workerNodes[workerNodeKey].info
1492 }
1493
1494 /**
1495 * Adds the given worker in the pool worker nodes.
1496 *
1497 * @param worker - The worker.
1498 * @returns The added worker node key.
1499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1500 */
1501 private addWorkerNode (worker: Worker): number {
1502 const workerNode = new WorkerNode<Worker, Data>(
1503 worker,
1504 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1505 )
1506 // Flag the worker node as ready at pool startup.
1507 if (this.starting) {
1508 workerNode.info.ready = true
1509 }
1510 this.workerNodes.push(workerNode)
1511 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1512 if (workerNodeKey === -1) {
1513 throw new Error('Worker added not found in worker nodes')
1514 }
1515 return workerNodeKey
1516 }
1517
1518 /**
1519 * Removes the given worker from the pool worker nodes.
1520 *
1521 * @param worker - The worker.
1522 */
1523 private removeWorkerNode (worker: Worker): void {
1524 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1525 if (workerNodeKey !== -1) {
1526 this.workerNodes.splice(workerNodeKey, 1)
1527 this.workerChoiceStrategyContext.remove(workerNodeKey)
1528 }
1529 }
1530
1531 /** @inheritDoc */
1532 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1533 return (
1534 this.opts.enableTasksQueue === true &&
1535 this.workerNodes[workerNodeKey].hasBackPressure()
1536 )
1537 }
1538
1539 private hasBackPressure (): boolean {
1540 return (
1541 this.opts.enableTasksQueue === true &&
1542 this.workerNodes.findIndex(
1543 workerNode => !workerNode.hasBackPressure()
1544 ) === -1
1545 )
1546 }
1547
1548 /**
1549 * Executes the given task on the worker given its worker node key.
1550 *
1551 * @param workerNodeKey - The worker node key.
1552 * @param task - The task to execute.
1553 */
1554 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1555 this.beforeTaskExecutionHook(workerNodeKey, task)
1556 this.sendToWorker(workerNodeKey, task, task.transferList)
1557 this.checkAndEmitTaskExecutionEvents()
1558 }
1559
1560 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1561 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1562 this.checkAndEmitTaskQueuingEvents()
1563 return tasksQueueSize
1564 }
1565
1566 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1567 return this.workerNodes[workerNodeKey].dequeueTask()
1568 }
1569
1570 private tasksQueueSize (workerNodeKey: number): number {
1571 return this.workerNodes[workerNodeKey].tasksQueueSize()
1572 }
1573
1574 protected flushTasksQueue (workerNodeKey: number): void {
1575 while (this.tasksQueueSize(workerNodeKey) > 0) {
1576 this.executeTask(
1577 workerNodeKey,
1578 this.dequeueTask(workerNodeKey) as Task<Data>
1579 )
1580 }
1581 this.workerNodes[workerNodeKey].clearTasksQueue()
1582 }
1583
1584 private flushTasksQueues (): void {
1585 for (const [workerNodeKey] of this.workerNodes.entries()) {
1586 this.flushTasksQueue(workerNodeKey)
1587 }
1588 }
1589 }