604c7af8ad32dde3959dcb75166427458999c9f7
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 once,
21 round
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerNodeEventDetail,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52 import {
53 checkFilePath,
54 checkValidTasksQueueOptions,
55 checkValidWorkerChoiceStrategy,
56 updateMeasurementStatistics
57 } from './utils'
58
59 /**
60 * Base class that implements some shared logic for all poolifier pools.
61 *
62 * @typeParam Worker - Type of worker which manages this pool.
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
65 */
66 export abstract class AbstractPool<
67 Worker extends IWorker,
68 Data = unknown,
69 Response = unknown
70 > implements IPool<Worker, Data, Response> {
71 /** @inheritDoc */
72 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
73
74 /** @inheritDoc */
75 public emitter?: EventEmitterAsyncResource
76
77 /**
78 * Dynamic pool maximum size property placeholder.
79 */
80 protected readonly max?: number
81
82 /**
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 *
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 */
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
91
92 /**
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 */
95 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
96 Worker,
97 Data,
98 Response
99 >
100
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
116 /**
117 * The start timestamp of the pool.
118 */
119 private readonly startTimestamp
120
121 /**
122 * Constructs a new poolifier pool.
123 *
124 * @param numberOfWorkers - Number of workers that this pool should manage.
125 * @param filePath - Path to the worker file.
126 * @param opts - Options for the pool.
127 */
128 public constructor (
129 protected readonly numberOfWorkers: number,
130 protected readonly filePath: string,
131 protected readonly opts: PoolOptions<Worker>
132 ) {
133 if (!this.isMain()) {
134 throw new Error(
135 'Cannot start a pool from a worker with the same type as the pool'
136 )
137 }
138 checkFilePath(this.filePath)
139 this.checkNumberOfWorkers(this.numberOfWorkers)
140 this.checkPoolOptions(this.opts)
141
142 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
143 this.executeTask = this.executeTask.bind(this)
144 this.enqueueTask = this.enqueueTask.bind(this)
145
146 if (this.opts.enableEvents === true) {
147 this.initializeEventEmitter()
148 }
149 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
150 Worker,
151 Data,
152 Response
153 >(
154 this,
155 this.opts.workerChoiceStrategy,
156 this.opts.workerChoiceStrategyOptions
157 )
158
159 this.setupHook()
160
161 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
162
163 this.started = false
164 this.starting = false
165 if (this.opts.startWorkers === true) {
166 this.start()
167 }
168
169 this.startTimestamp = performance.now()
170 }
171
172 private checkNumberOfWorkers (numberOfWorkers: number): void {
173 if (numberOfWorkers == null) {
174 throw new Error(
175 'Cannot instantiate a pool without specifying the number of workers'
176 )
177 } else if (!Number.isSafeInteger(numberOfWorkers)) {
178 throw new TypeError(
179 'Cannot instantiate a pool with a non safe integer number of workers'
180 )
181 } else if (numberOfWorkers < 0) {
182 throw new RangeError(
183 'Cannot instantiate a pool with a negative number of workers'
184 )
185 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
186 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
187 }
188 }
189
190 private checkPoolOptions (opts: PoolOptions<Worker>): void {
191 if (isPlainObject(opts)) {
192 this.opts.startWorkers = opts.startWorkers ?? true
193 checkValidWorkerChoiceStrategy(
194 opts.workerChoiceStrategy as WorkerChoiceStrategy
195 )
196 this.opts.workerChoiceStrategy =
197 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
198 this.checkValidWorkerChoiceStrategyOptions(
199 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
200 )
201 this.opts.workerChoiceStrategyOptions = {
202 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
203 ...opts.workerChoiceStrategyOptions
204 }
205 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
206 this.opts.enableEvents = opts.enableEvents ?? true
207 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
208 if (this.opts.enableTasksQueue) {
209 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
210 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
211 opts.tasksQueueOptions as TasksQueueOptions
212 )
213 }
214 } else {
215 throw new TypeError('Invalid pool options: must be a plain object')
216 }
217 }
218
219 private checkValidWorkerChoiceStrategyOptions (
220 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
221 ): void {
222 if (
223 workerChoiceStrategyOptions != null &&
224 !isPlainObject(workerChoiceStrategyOptions)
225 ) {
226 throw new TypeError(
227 'Invalid worker choice strategy options: must be a plain object'
228 )
229 }
230 if (
231 workerChoiceStrategyOptions?.retries != null &&
232 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
233 ) {
234 throw new TypeError(
235 'Invalid worker choice strategy options: retries must be an integer'
236 )
237 }
238 if (
239 workerChoiceStrategyOptions?.retries != null &&
240 workerChoiceStrategyOptions.retries < 0
241 ) {
242 throw new RangeError(
243 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
244 )
245 }
246 if (
247 workerChoiceStrategyOptions?.weights != null &&
248 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
249 ) {
250 throw new Error(
251 'Invalid worker choice strategy options: must have a weight for each worker node'
252 )
253 }
254 if (
255 workerChoiceStrategyOptions?.measurement != null &&
256 !Object.values(Measurements).includes(
257 workerChoiceStrategyOptions.measurement
258 )
259 ) {
260 throw new Error(
261 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
262 )
263 }
264 }
265
266 private initializeEventEmitter (): void {
267 this.emitter = new EventEmitterAsyncResource({
268 name: `poolifier:${this.type}-${this.worker}-pool`
269 })
270 }
271
272 /** @inheritDoc */
273 public get info (): PoolInfo {
274 return {
275 version,
276 type: this.type,
277 worker: this.worker,
278 started: this.started,
279 ready: this.ready,
280 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
281 minSize: this.minSize,
282 maxSize: this.maxSize,
283 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284 .runTime.aggregate &&
285 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
286 .waitTime.aggregate && { utilization: round(this.utilization) }),
287 workerNodes: this.workerNodes.length,
288 idleWorkerNodes: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
290 workerNode.usage.tasks.executing === 0
291 ? accumulator + 1
292 : accumulator,
293 0
294 ),
295 busyWorkerNodes: this.workerNodes.reduce(
296 (accumulator, workerNode) =>
297 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
298 0
299 ),
300 executedTasks: this.workerNodes.reduce(
301 (accumulator, workerNode) =>
302 accumulator + workerNode.usage.tasks.executed,
303 0
304 ),
305 executingTasks: this.workerNodes.reduce(
306 (accumulator, workerNode) =>
307 accumulator + workerNode.usage.tasks.executing,
308 0
309 ),
310 ...(this.opts.enableTasksQueue === true && {
311 queuedTasks: this.workerNodes.reduce(
312 (accumulator, workerNode) =>
313 accumulator + workerNode.usage.tasks.queued,
314 0
315 )
316 }),
317 ...(this.opts.enableTasksQueue === true && {
318 maxQueuedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
321 0
322 )
323 }),
324 ...(this.opts.enableTasksQueue === true && {
325 backPressure: this.hasBackPressure()
326 }),
327 ...(this.opts.enableTasksQueue === true && {
328 stolenTasks: this.workerNodes.reduce(
329 (accumulator, workerNode) =>
330 accumulator + workerNode.usage.tasks.stolen,
331 0
332 )
333 }),
334 failedTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 accumulator + workerNode.usage.tasks.failed,
337 0
338 ),
339 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
340 .runTime.aggregate && {
341 runTime: {
342 minimum: round(
343 min(
344 ...this.workerNodes.map(
345 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
346 )
347 )
348 ),
349 maximum: round(
350 max(
351 ...this.workerNodes.map(
352 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
353 )
354 )
355 ),
356 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .runTime.average && {
358 average: round(
359 average(
360 this.workerNodes.reduce<number[]>(
361 (accumulator, workerNode) =>
362 accumulator.concat(workerNode.usage.runTime.history),
363 []
364 )
365 )
366 )
367 }),
368 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
369 .runTime.median && {
370 median: round(
371 median(
372 this.workerNodes.reduce<number[]>(
373 (accumulator, workerNode) =>
374 accumulator.concat(workerNode.usage.runTime.history),
375 []
376 )
377 )
378 )
379 })
380 }
381 }),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .waitTime.aggregate && {
384 waitTime: {
385 minimum: round(
386 min(
387 ...this.workerNodes.map(
388 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
389 )
390 )
391 ),
392 maximum: round(
393 max(
394 ...this.workerNodes.map(
395 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
396 )
397 )
398 ),
399 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
400 .waitTime.average && {
401 average: round(
402 average(
403 this.workerNodes.reduce<number[]>(
404 (accumulator, workerNode) =>
405 accumulator.concat(workerNode.usage.waitTime.history),
406 []
407 )
408 )
409 )
410 }),
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .waitTime.median && {
413 median: round(
414 median(
415 this.workerNodes.reduce<number[]>(
416 (accumulator, workerNode) =>
417 accumulator.concat(workerNode.usage.waitTime.history),
418 []
419 )
420 )
421 )
422 })
423 }
424 })
425 }
426 }
427
428 /**
429 * The pool readiness boolean status.
430 */
431 private get ready (): boolean {
432 return (
433 this.workerNodes.reduce(
434 (accumulator, workerNode) =>
435 !workerNode.info.dynamic && workerNode.info.ready
436 ? accumulator + 1
437 : accumulator,
438 0
439 ) >= this.minSize
440 )
441 }
442
443 /**
444 * The approximate pool utilization.
445 *
446 * @returns The pool utilization.
447 */
448 private get utilization (): number {
449 const poolTimeCapacity =
450 (performance.now() - this.startTimestamp) * this.maxSize
451 const totalTasksRunTime = this.workerNodes.reduce(
452 (accumulator, workerNode) =>
453 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
454 0
455 )
456 const totalTasksWaitTime = this.workerNodes.reduce(
457 (accumulator, workerNode) =>
458 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
459 0
460 )
461 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
462 }
463
464 /**
465 * The pool type.
466 *
467 * If it is `'dynamic'`, it provides the `max` property.
468 */
469 protected abstract get type (): PoolType
470
471 /**
472 * The worker type.
473 */
474 protected abstract get worker (): WorkerType
475
476 /**
477 * The pool minimum size.
478 */
479 protected get minSize (): number {
480 return this.numberOfWorkers
481 }
482
483 /**
484 * The pool maximum size.
485 */
486 protected get maxSize (): number {
487 return this.max ?? this.numberOfWorkers
488 }
489
490 /**
491 * Checks if the worker id sent in the received message from a worker is valid.
492 *
493 * @param message - The received message.
494 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
495 */
496 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
497 if (message.workerId == null) {
498 throw new Error('Worker message received without worker id')
499 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
500 throw new Error(
501 `Worker message received from unknown worker '${message.workerId}'`
502 )
503 }
504 }
505
506 /**
507 * Gets the given worker its worker node key.
508 *
509 * @param worker - The worker.
510 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
511 */
512 private getWorkerNodeKeyByWorker (worker: Worker): number {
513 return this.workerNodes.findIndex(
514 workerNode => workerNode.worker === worker
515 )
516 }
517
518 /**
519 * Gets the worker node key given its worker id.
520 *
521 * @param workerId - The worker id.
522 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
523 */
524 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
525 return this.workerNodes.findIndex(
526 workerNode => workerNode.info.id === workerId
527 )
528 }
529
530 /** @inheritDoc */
531 public setWorkerChoiceStrategy (
532 workerChoiceStrategy: WorkerChoiceStrategy,
533 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
534 ): void {
535 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
536 this.opts.workerChoiceStrategy = workerChoiceStrategy
537 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
538 this.opts.workerChoiceStrategy
539 )
540 if (workerChoiceStrategyOptions != null) {
541 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
542 }
543 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
544 workerNode.resetUsage()
545 this.sendStatisticsMessageToWorker(workerNodeKey)
546 }
547 }
548
549 /** @inheritDoc */
550 public setWorkerChoiceStrategyOptions (
551 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
552 ): void {
553 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
554 this.opts.workerChoiceStrategyOptions = {
555 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
556 ...workerChoiceStrategyOptions
557 }
558 this.workerChoiceStrategyContext.setOptions(
559 this.opts.workerChoiceStrategyOptions
560 )
561 }
562
563 /** @inheritDoc */
564 public enableTasksQueue (
565 enable: boolean,
566 tasksQueueOptions?: TasksQueueOptions
567 ): void {
568 if (this.opts.enableTasksQueue === true && !enable) {
569 this.unsetTaskStealing()
570 this.unsetTasksStealingOnBackPressure()
571 this.flushTasksQueues()
572 }
573 this.opts.enableTasksQueue = enable
574 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
575 }
576
577 /** @inheritDoc */
578 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
579 if (this.opts.enableTasksQueue === true) {
580 checkValidTasksQueueOptions(tasksQueueOptions)
581 this.opts.tasksQueueOptions =
582 this.buildTasksQueueOptions(tasksQueueOptions)
583 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
584 if (this.opts.tasksQueueOptions.taskStealing === true) {
585 this.setTaskStealing()
586 } else {
587 this.unsetTaskStealing()
588 }
589 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
590 this.setTasksStealingOnBackPressure()
591 } else {
592 this.unsetTasksStealingOnBackPressure()
593 }
594 } else if (this.opts.tasksQueueOptions != null) {
595 delete this.opts.tasksQueueOptions
596 }
597 }
598
599 private buildTasksQueueOptions (
600 tasksQueueOptions: TasksQueueOptions
601 ): TasksQueueOptions {
602 return {
603 ...{
604 size: Math.pow(this.maxSize, 2),
605 concurrency: 1,
606 taskStealing: true,
607 tasksStealingOnBackPressure: true
608 },
609 ...tasksQueueOptions
610 }
611 }
612
613 private setTasksQueueSize (size: number): void {
614 for (const workerNode of this.workerNodes) {
615 workerNode.tasksQueueBackPressureSize = size
616 }
617 }
618
619 private setTaskStealing (): void {
620 for (const [workerNodeKey] of this.workerNodes.entries()) {
621 this.workerNodes[workerNodeKey].addEventListener(
622 'emptyqueue',
623 this.handleEmptyQueueEvent as EventListener
624 )
625 }
626 }
627
628 private unsetTaskStealing (): void {
629 for (const [workerNodeKey] of this.workerNodes.entries()) {
630 this.workerNodes[workerNodeKey].removeEventListener(
631 'emptyqueue',
632 this.handleEmptyQueueEvent as EventListener
633 )
634 }
635 }
636
637 private setTasksStealingOnBackPressure (): void {
638 for (const [workerNodeKey] of this.workerNodes.entries()) {
639 this.workerNodes[workerNodeKey].addEventListener(
640 'backpressure',
641 this.handleBackPressureEvent as EventListener
642 )
643 }
644 }
645
646 private unsetTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey] of this.workerNodes.entries()) {
648 this.workerNodes[workerNodeKey].removeEventListener(
649 'backpressure',
650 this.handleBackPressureEvent as EventListener
651 )
652 }
653 }
654
655 /**
656 * Whether the pool is full or not.
657 *
658 * The pool filling boolean status.
659 */
660 protected get full (): boolean {
661 return this.workerNodes.length >= this.maxSize
662 }
663
664 /**
665 * Whether the pool is busy or not.
666 *
667 * The pool busyness boolean status.
668 */
669 protected abstract get busy (): boolean
670
671 /**
672 * Whether worker nodes are executing concurrently their tasks quota or not.
673 *
674 * @returns Worker nodes busyness boolean status.
675 */
676 protected internalBusy (): boolean {
677 if (this.opts.enableTasksQueue === true) {
678 return (
679 this.workerNodes.findIndex(
680 workerNode =>
681 workerNode.info.ready &&
682 workerNode.usage.tasks.executing <
683 (this.opts.tasksQueueOptions?.concurrency as number)
684 ) === -1
685 )
686 }
687 return (
688 this.workerNodes.findIndex(
689 workerNode =>
690 workerNode.info.ready && workerNode.usage.tasks.executing === 0
691 ) === -1
692 )
693 }
694
695 private async sendTaskFunctionOperationToWorker (
696 workerNodeKey: number,
697 message: MessageValue<Data>
698 ): Promise<boolean> {
699 return await new Promise<boolean>((resolve, reject) => {
700 const taskFunctionOperationListener = (
701 message: MessageValue<Response>
702 ): void => {
703 this.checkMessageWorkerId(message)
704 const workerId = this.getWorkerInfo(workerNodeKey).id as number
705 if (
706 message.taskFunctionOperationStatus != null &&
707 message.workerId === workerId
708 ) {
709 if (message.taskFunctionOperationStatus) {
710 resolve(true)
711 } else if (!message.taskFunctionOperationStatus) {
712 reject(
713 new Error(
714 `Task function operation '${
715 message.taskFunctionOperation as string
716 }' failed on worker ${message.workerId} with error: '${
717 message.workerError?.message as string
718 }'`
719 )
720 )
721 }
722 this.deregisterWorkerMessageListener(
723 this.getWorkerNodeKeyByWorkerId(message.workerId),
724 taskFunctionOperationListener
725 )
726 }
727 }
728 this.registerWorkerMessageListener(
729 workerNodeKey,
730 taskFunctionOperationListener
731 )
732 this.sendToWorker(workerNodeKey, message)
733 })
734 }
735
736 private async sendTaskFunctionOperationToWorkers (
737 message: MessageValue<Data>
738 ): Promise<boolean> {
739 return await new Promise<boolean>((resolve, reject) => {
740 const responsesReceived = new Array<MessageValue<Response>>()
741 const taskFunctionOperationsListener = (
742 message: MessageValue<Response>
743 ): void => {
744 this.checkMessageWorkerId(message)
745 if (message.taskFunctionOperationStatus != null) {
746 responsesReceived.push(message)
747 if (responsesReceived.length === this.workerNodes.length) {
748 if (
749 responsesReceived.every(
750 message => message.taskFunctionOperationStatus === true
751 )
752 ) {
753 resolve(true)
754 } else if (
755 responsesReceived.some(
756 message => message.taskFunctionOperationStatus === false
757 )
758 ) {
759 const errorResponse = responsesReceived.find(
760 response => response.taskFunctionOperationStatus === false
761 )
762 reject(
763 new Error(
764 `Task function operation '${
765 message.taskFunctionOperation as string
766 }' failed on worker ${
767 errorResponse?.workerId as number
768 } with error: '${
769 errorResponse?.workerError?.message as string
770 }'`
771 )
772 )
773 }
774 this.deregisterWorkerMessageListener(
775 this.getWorkerNodeKeyByWorkerId(message.workerId),
776 taskFunctionOperationsListener
777 )
778 }
779 }
780 }
781 for (const [workerNodeKey] of this.workerNodes.entries()) {
782 this.registerWorkerMessageListener(
783 workerNodeKey,
784 taskFunctionOperationsListener
785 )
786 this.sendToWorker(workerNodeKey, message)
787 }
788 })
789 }
790
791 /** @inheritDoc */
792 public hasTaskFunction (name: string): boolean {
793 for (const workerNode of this.workerNodes) {
794 if (
795 Array.isArray(workerNode.info.taskFunctionNames) &&
796 workerNode.info.taskFunctionNames.includes(name)
797 ) {
798 return true
799 }
800 }
801 return false
802 }
803
804 /** @inheritDoc */
805 public async addTaskFunction (
806 name: string,
807 fn: TaskFunction<Data, Response>
808 ): Promise<boolean> {
809 if (typeof name !== 'string') {
810 throw new TypeError('name argument must be a string')
811 }
812 if (typeof name === 'string' && name.trim().length === 0) {
813 throw new TypeError('name argument must not be an empty string')
814 }
815 if (typeof fn !== 'function') {
816 throw new TypeError('fn argument must be a function')
817 }
818 const opResult = await this.sendTaskFunctionOperationToWorkers({
819 taskFunctionOperation: 'add',
820 taskFunctionName: name,
821 taskFunction: fn.toString()
822 })
823 this.taskFunctions.set(name, fn)
824 return opResult
825 }
826
827 /** @inheritDoc */
828 public async removeTaskFunction (name: string): Promise<boolean> {
829 if (!this.taskFunctions.has(name)) {
830 throw new Error(
831 'Cannot remove a task function not handled on the pool side'
832 )
833 }
834 const opResult = await this.sendTaskFunctionOperationToWorkers({
835 taskFunctionOperation: 'remove',
836 taskFunctionName: name
837 })
838 this.deleteTaskFunctionWorkerUsages(name)
839 this.taskFunctions.delete(name)
840 return opResult
841 }
842
843 /** @inheritDoc */
844 public listTaskFunctionNames (): string[] {
845 for (const workerNode of this.workerNodes) {
846 if (
847 Array.isArray(workerNode.info.taskFunctionNames) &&
848 workerNode.info.taskFunctionNames.length > 0
849 ) {
850 return workerNode.info.taskFunctionNames
851 }
852 }
853 return []
854 }
855
856 /** @inheritDoc */
857 public async setDefaultTaskFunction (name: string): Promise<boolean> {
858 return await this.sendTaskFunctionOperationToWorkers({
859 taskFunctionOperation: 'default',
860 taskFunctionName: name
861 })
862 }
863
864 private deleteTaskFunctionWorkerUsages (name: string): void {
865 for (const workerNode of this.workerNodes) {
866 workerNode.deleteTaskFunctionWorkerUsage(name)
867 }
868 }
869
870 private shallExecuteTask (workerNodeKey: number): boolean {
871 return (
872 this.tasksQueueSize(workerNodeKey) === 0 &&
873 this.workerNodes[workerNodeKey].usage.tasks.executing <
874 (this.opts.tasksQueueOptions?.concurrency as number)
875 )
876 }
877
878 /** @inheritDoc */
879 public async execute (
880 data?: Data,
881 name?: string,
882 transferList?: TransferListItem[]
883 ): Promise<Response> {
884 return await new Promise<Response>((resolve, reject) => {
885 if (!this.started) {
886 reject(new Error('Cannot execute a task on not started pool'))
887 return
888 }
889 if (name != null && typeof name !== 'string') {
890 reject(new TypeError('name argument must be a string'))
891 return
892 }
893 if (
894 name != null &&
895 typeof name === 'string' &&
896 name.trim().length === 0
897 ) {
898 reject(new TypeError('name argument must not be an empty string'))
899 return
900 }
901 if (transferList != null && !Array.isArray(transferList)) {
902 reject(new TypeError('transferList argument must be an array'))
903 return
904 }
905 const timestamp = performance.now()
906 const workerNodeKey = this.chooseWorkerNode()
907 const task: Task<Data> = {
908 name: name ?? DEFAULT_TASK_NAME,
909 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
910 data: data ?? ({} as Data),
911 transferList,
912 timestamp,
913 taskId: randomUUID()
914 }
915 this.promiseResponseMap.set(task.taskId as string, {
916 resolve,
917 reject,
918 workerNodeKey
919 })
920 if (
921 this.opts.enableTasksQueue === false ||
922 (this.opts.enableTasksQueue === true &&
923 this.shallExecuteTask(workerNodeKey))
924 ) {
925 this.executeTask(workerNodeKey, task)
926 } else {
927 this.enqueueTask(workerNodeKey, task)
928 }
929 })
930 }
931
932 /** @inheritdoc */
933 public start (): void {
934 this.starting = true
935 while (
936 this.workerNodes.reduce(
937 (accumulator, workerNode) =>
938 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
939 0
940 ) < this.numberOfWorkers
941 ) {
942 this.createAndSetupWorkerNode()
943 }
944 this.starting = false
945 this.started = true
946 }
947
948 /** @inheritDoc */
949 public async destroy (): Promise<void> {
950 await Promise.all(
951 this.workerNodes.map(async (_, workerNodeKey) => {
952 await this.destroyWorkerNode(workerNodeKey)
953 })
954 )
955 this.emitter?.emit(PoolEvents.destroy, this.info)
956 this.emitter?.emitDestroy()
957 this.started = false
958 }
959
960 protected async sendKillMessageToWorker (
961 workerNodeKey: number
962 ): Promise<void> {
963 await new Promise<void>((resolve, reject) => {
964 const killMessageListener = (message: MessageValue<Response>): void => {
965 this.checkMessageWorkerId(message)
966 if (message.kill === 'success') {
967 resolve()
968 } else if (message.kill === 'failure') {
969 reject(
970 new Error(
971 `Kill message handling failed on worker ${
972 message.workerId as number
973 }`
974 )
975 )
976 }
977 }
978 // FIXME: should be registered only once
979 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
980 this.sendToWorker(workerNodeKey, { kill: true })
981 })
982 }
983
984 /**
985 * Terminates the worker node given its worker node key.
986 *
987 * @param workerNodeKey - The worker node key.
988 */
989 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
990
991 /**
992 * Setup hook to execute code before worker nodes are created in the abstract constructor.
993 * Can be overridden.
994 *
995 * @virtual
996 */
997 protected setupHook (): void {
998 /* Intentionally empty */
999 }
1000
1001 /**
1002 * Should return whether the worker is the main worker or not.
1003 */
1004 protected abstract isMain (): boolean
1005
1006 /**
1007 * Hook executed before the worker task execution.
1008 * Can be overridden.
1009 *
1010 * @param workerNodeKey - The worker node key.
1011 * @param task - The task to execute.
1012 */
1013 protected beforeTaskExecutionHook (
1014 workerNodeKey: number,
1015 task: Task<Data>
1016 ): void {
1017 if (this.workerNodes[workerNodeKey]?.usage != null) {
1018 const workerUsage = this.workerNodes[workerNodeKey].usage
1019 ++workerUsage.tasks.executing
1020 this.updateWaitTimeWorkerUsage(workerUsage, task)
1021 }
1022 if (
1023 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1024 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1025 task.name as string
1026 ) != null
1027 ) {
1028 const taskFunctionWorkerUsage = this.workerNodes[
1029 workerNodeKey
1030 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1031 ++taskFunctionWorkerUsage.tasks.executing
1032 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1033 }
1034 }
1035
1036 /**
1037 * Hook executed after the worker task execution.
1038 * Can be overridden.
1039 *
1040 * @param workerNodeKey - The worker node key.
1041 * @param message - The received message.
1042 */
1043 protected afterTaskExecutionHook (
1044 workerNodeKey: number,
1045 message: MessageValue<Response>
1046 ): void {
1047 if (this.workerNodes[workerNodeKey]?.usage != null) {
1048 const workerUsage = this.workerNodes[workerNodeKey].usage
1049 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1050 this.updateRunTimeWorkerUsage(workerUsage, message)
1051 this.updateEluWorkerUsage(workerUsage, message)
1052 }
1053 if (
1054 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1055 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1056 message.taskPerformance?.name as string
1057 ) != null
1058 ) {
1059 const taskFunctionWorkerUsage = this.workerNodes[
1060 workerNodeKey
1061 ].getTaskFunctionWorkerUsage(
1062 message.taskPerformance?.name as string
1063 ) as WorkerUsage
1064 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1065 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1066 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1067 }
1068 }
1069
1070 /**
1071 * Whether the worker node shall update its task function worker usage or not.
1072 *
1073 * @param workerNodeKey - The worker node key.
1074 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1075 */
1076 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1077 const workerInfo = this.getWorkerInfo(workerNodeKey)
1078 return (
1079 workerInfo != null &&
1080 Array.isArray(workerInfo.taskFunctionNames) &&
1081 workerInfo.taskFunctionNames.length > 2
1082 )
1083 }
1084
1085 private updateTaskStatisticsWorkerUsage (
1086 workerUsage: WorkerUsage,
1087 message: MessageValue<Response>
1088 ): void {
1089 const workerTaskStatistics = workerUsage.tasks
1090 if (
1091 workerTaskStatistics.executing != null &&
1092 workerTaskStatistics.executing > 0
1093 ) {
1094 --workerTaskStatistics.executing
1095 }
1096 if (message.workerError == null) {
1097 ++workerTaskStatistics.executed
1098 } else {
1099 ++workerTaskStatistics.failed
1100 }
1101 }
1102
1103 private updateRunTimeWorkerUsage (
1104 workerUsage: WorkerUsage,
1105 message: MessageValue<Response>
1106 ): void {
1107 if (message.workerError != null) {
1108 return
1109 }
1110 updateMeasurementStatistics(
1111 workerUsage.runTime,
1112 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1113 message.taskPerformance?.runTime ?? 0
1114 )
1115 }
1116
1117 private updateWaitTimeWorkerUsage (
1118 workerUsage: WorkerUsage,
1119 task: Task<Data>
1120 ): void {
1121 const timestamp = performance.now()
1122 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1123 updateMeasurementStatistics(
1124 workerUsage.waitTime,
1125 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1126 taskWaitTime
1127 )
1128 }
1129
1130 private updateEluWorkerUsage (
1131 workerUsage: WorkerUsage,
1132 message: MessageValue<Response>
1133 ): void {
1134 if (message.workerError != null) {
1135 return
1136 }
1137 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1138 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1139 updateMeasurementStatistics(
1140 workerUsage.elu.active,
1141 eluTaskStatisticsRequirements,
1142 message.taskPerformance?.elu?.active ?? 0
1143 )
1144 updateMeasurementStatistics(
1145 workerUsage.elu.idle,
1146 eluTaskStatisticsRequirements,
1147 message.taskPerformance?.elu?.idle ?? 0
1148 )
1149 if (eluTaskStatisticsRequirements.aggregate) {
1150 if (message.taskPerformance?.elu != null) {
1151 if (workerUsage.elu.utilization != null) {
1152 workerUsage.elu.utilization =
1153 (workerUsage.elu.utilization +
1154 message.taskPerformance.elu.utilization) /
1155 2
1156 } else {
1157 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1158 }
1159 }
1160 }
1161 }
1162
1163 /**
1164 * Chooses a worker node for the next task.
1165 *
1166 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1167 *
1168 * @returns The chosen worker node key
1169 */
1170 private chooseWorkerNode (): number {
1171 if (this.shallCreateDynamicWorker()) {
1172 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1173 if (
1174 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1175 ) {
1176 return workerNodeKey
1177 }
1178 }
1179 return this.workerChoiceStrategyContext.execute()
1180 }
1181
1182 /**
1183 * Conditions for dynamic worker creation.
1184 *
1185 * @returns Whether to create a dynamic worker or not.
1186 */
1187 private shallCreateDynamicWorker (): boolean {
1188 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1189 }
1190
1191 /**
1192 * Sends a message to worker given its worker node key.
1193 *
1194 * @param workerNodeKey - The worker node key.
1195 * @param message - The message.
1196 * @param transferList - The optional array of transferable objects.
1197 */
1198 protected abstract sendToWorker (
1199 workerNodeKey: number,
1200 message: MessageValue<Data>,
1201 transferList?: TransferListItem[]
1202 ): void
1203
1204 /**
1205 * Creates a new worker.
1206 *
1207 * @returns Newly created worker.
1208 */
1209 protected abstract createWorker (): Worker
1210
1211 /**
1212 * Creates a new, completely set up worker node.
1213 *
1214 * @returns New, completely set up worker node key.
1215 */
1216 protected createAndSetupWorkerNode (): number {
1217 const worker = this.createWorker()
1218
1219 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1220 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1221 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1222 worker.on('error', error => {
1223 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1224 this.flagWorkerNodeAsNotReady(workerNodeKey)
1225 const workerInfo = this.getWorkerInfo(workerNodeKey)
1226 this.emitter?.emit(PoolEvents.error, error)
1227 this.workerNodes[workerNodeKey].closeChannel()
1228 if (
1229 this.started &&
1230 !this.starting &&
1231 this.opts.restartWorkerOnError === true
1232 ) {
1233 if (workerInfo.dynamic) {
1234 this.createAndSetupDynamicWorkerNode()
1235 } else {
1236 this.createAndSetupWorkerNode()
1237 }
1238 }
1239 if (this.started && this.opts.enableTasksQueue === true) {
1240 this.redistributeQueuedTasks(workerNodeKey)
1241 }
1242 })
1243 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1244 worker.once('exit', () => {
1245 this.removeWorkerNode(worker)
1246 })
1247
1248 const workerNodeKey = this.addWorkerNode(worker)
1249
1250 this.afterWorkerNodeSetup(workerNodeKey)
1251
1252 return workerNodeKey
1253 }
1254
1255 /**
1256 * Creates a new, completely set up dynamic worker node.
1257 *
1258 * @returns New, completely set up dynamic worker node key.
1259 */
1260 protected createAndSetupDynamicWorkerNode (): number {
1261 const workerNodeKey = this.createAndSetupWorkerNode()
1262 this.registerWorkerMessageListener(workerNodeKey, message => {
1263 this.checkMessageWorkerId(message)
1264 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1265 message.workerId
1266 )
1267 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1268 // Kill message received from worker
1269 if (
1270 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1271 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1272 ((this.opts.enableTasksQueue === false &&
1273 workerUsage.tasks.executing === 0) ||
1274 (this.opts.enableTasksQueue === true &&
1275 workerUsage.tasks.executing === 0 &&
1276 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1277 ) {
1278 // Flag the worker node as not ready immediately
1279 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1280 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1281 this.emitter?.emit(PoolEvents.error, error)
1282 })
1283 }
1284 })
1285 const workerInfo = this.getWorkerInfo(workerNodeKey)
1286 this.sendToWorker(workerNodeKey, {
1287 checkActive: true
1288 })
1289 if (this.taskFunctions.size > 0) {
1290 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1291 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1292 taskFunctionOperation: 'add',
1293 taskFunctionName,
1294 taskFunction: taskFunction.toString()
1295 }).catch(error => {
1296 this.emitter?.emit(PoolEvents.error, error)
1297 })
1298 }
1299 }
1300 workerInfo.dynamic = true
1301 if (
1302 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1303 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1304 ) {
1305 workerInfo.ready = true
1306 }
1307 this.checkAndEmitDynamicWorkerCreationEvents()
1308 return workerNodeKey
1309 }
1310
1311 /**
1312 * Registers a listener callback on the worker given its worker node key.
1313 *
1314 * @param workerNodeKey - The worker node key.
1315 * @param listener - The message listener callback.
1316 */
1317 protected abstract registerWorkerMessageListener<
1318 Message extends Data | Response
1319 >(
1320 workerNodeKey: number,
1321 listener: (message: MessageValue<Message>) => void
1322 ): void
1323
1324 /**
1325 * Registers once a listener callback on the worker given its worker node key.
1326 *
1327 * @param workerNodeKey - The worker node key.
1328 * @param listener - The message listener callback.
1329 */
1330 protected abstract registerOnceWorkerMessageListener<
1331 Message extends Data | Response
1332 >(
1333 workerNodeKey: number,
1334 listener: (message: MessageValue<Message>) => void
1335 ): void
1336
1337 /**
1338 * Deregisters a listener callback on the worker given its worker node key.
1339 *
1340 * @param workerNodeKey - The worker node key.
1341 * @param listener - The message listener callback.
1342 */
1343 protected abstract deregisterWorkerMessageListener<
1344 Message extends Data | Response
1345 >(
1346 workerNodeKey: number,
1347 listener: (message: MessageValue<Message>) => void
1348 ): void
1349
1350 /**
1351 * Method hooked up after a worker node has been newly created.
1352 * Can be overridden.
1353 *
1354 * @param workerNodeKey - The newly created worker node key.
1355 */
1356 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1357 // Listen to worker messages.
1358 this.registerWorkerMessageListener(
1359 workerNodeKey,
1360 this.workerMessageListener.bind(this)
1361 )
1362 // Send the startup message to worker.
1363 this.sendStartupMessageToWorker(workerNodeKey)
1364 // Send the statistics message to worker.
1365 this.sendStatisticsMessageToWorker(workerNodeKey)
1366 if (this.opts.enableTasksQueue === true) {
1367 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1368 this.workerNodes[workerNodeKey].addEventListener(
1369 'emptyqueue',
1370 this.handleEmptyQueueEvent as EventListener
1371 )
1372 }
1373 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1374 this.workerNodes[workerNodeKey].addEventListener(
1375 'backpressure',
1376 this.handleBackPressureEvent as EventListener
1377 )
1378 }
1379 }
1380 }
1381
1382 /**
1383 * Sends the startup message to worker given its worker node key.
1384 *
1385 * @param workerNodeKey - The worker node key.
1386 */
1387 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1388
1389 /**
1390 * Sends the statistics message to worker given its worker node key.
1391 *
1392 * @param workerNodeKey - The worker node key.
1393 */
1394 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1395 this.sendToWorker(workerNodeKey, {
1396 statistics: {
1397 runTime:
1398 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1399 .runTime.aggregate,
1400 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1401 .elu.aggregate
1402 }
1403 })
1404 }
1405
1406 private redistributeQueuedTasks (workerNodeKey: number): void {
1407 while (this.tasksQueueSize(workerNodeKey) > 0) {
1408 const destinationWorkerNodeKey = this.workerNodes.reduce(
1409 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1410 return workerNode.info.ready &&
1411 workerNode.usage.tasks.queued <
1412 workerNodes[minWorkerNodeKey].usage.tasks.queued
1413 ? workerNodeKey
1414 : minWorkerNodeKey
1415 },
1416 0
1417 )
1418 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1419 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1420 this.executeTask(destinationWorkerNodeKey, task)
1421 } else {
1422 this.enqueueTask(destinationWorkerNodeKey, task)
1423 }
1424 }
1425 }
1426
1427 private updateTaskStolenStatisticsWorkerUsage (
1428 workerNodeKey: number,
1429 taskName: string
1430 ): void {
1431 const workerNode = this.workerNodes[workerNodeKey]
1432 if (workerNode?.usage != null) {
1433 ++workerNode.usage.tasks.stolen
1434 }
1435 if (
1436 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1437 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1438 ) {
1439 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1440 taskName
1441 ) as WorkerUsage
1442 ++taskFunctionWorkerUsage.tasks.stolen
1443 }
1444 }
1445
1446 private readonly handleEmptyQueueEvent = (
1447 event: CustomEvent<WorkerNodeEventDetail>
1448 ): void => {
1449 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1450 event.detail.workerId
1451 )
1452 const workerNodes = this.workerNodes
1453 .slice()
1454 .sort(
1455 (workerNodeA, workerNodeB) =>
1456 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1457 )
1458 const sourceWorkerNode = workerNodes.find(
1459 workerNode =>
1460 workerNode.info.ready &&
1461 workerNode.info.id !== event.detail.workerId &&
1462 workerNode.usage.tasks.queued > 0
1463 )
1464 if (sourceWorkerNode != null) {
1465 const task = sourceWorkerNode.popTask() as Task<Data>
1466 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1467 this.executeTask(destinationWorkerNodeKey, task)
1468 } else {
1469 this.enqueueTask(destinationWorkerNodeKey, task)
1470 }
1471 this.updateTaskStolenStatisticsWorkerUsage(
1472 destinationWorkerNodeKey,
1473 task.name as string
1474 )
1475 }
1476 }
1477
1478 private readonly handleBackPressureEvent = (
1479 event: CustomEvent<WorkerNodeEventDetail>
1480 ): void => {
1481 const sizeOffset = 1
1482 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1483 return
1484 }
1485 const sourceWorkerNode =
1486 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
1487 const workerNodes = this.workerNodes
1488 .slice()
1489 .sort(
1490 (workerNodeA, workerNodeB) =>
1491 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1492 )
1493 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1494 if (
1495 sourceWorkerNode.usage.tasks.queued > 0 &&
1496 workerNode.info.ready &&
1497 workerNode.info.id !== event.detail.workerId &&
1498 workerNode.usage.tasks.queued <
1499 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1500 ) {
1501 const task = sourceWorkerNode.popTask() as Task<Data>
1502 if (this.shallExecuteTask(workerNodeKey)) {
1503 this.executeTask(workerNodeKey, task)
1504 } else {
1505 this.enqueueTask(workerNodeKey, task)
1506 }
1507 this.updateTaskStolenStatisticsWorkerUsage(
1508 workerNodeKey,
1509 task.name as string
1510 )
1511 }
1512 }
1513 }
1514
1515 /**
1516 * This method is the message listener registered on each worker.
1517 */
1518 protected workerMessageListener (message: MessageValue<Response>): void {
1519 this.checkMessageWorkerId(message)
1520 if (message.ready != null && message.taskFunctionNames != null) {
1521 // Worker ready response received from worker
1522 this.handleWorkerReadyResponse(message)
1523 } else if (message.taskId != null) {
1524 // Task execution response received from worker
1525 this.handleTaskExecutionResponse(message)
1526 } else if (message.taskFunctionNames != null) {
1527 // Task function names message received from worker
1528 this.getWorkerInfo(
1529 this.getWorkerNodeKeyByWorkerId(message.workerId)
1530 ).taskFunctionNames = message.taskFunctionNames
1531 }
1532 }
1533
1534 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1535 if (message.ready === false) {
1536 throw new Error(
1537 `Worker ${message.workerId as number} failed to initialize`
1538 )
1539 }
1540 const workerInfo = this.getWorkerInfo(
1541 this.getWorkerNodeKeyByWorkerId(message.workerId)
1542 )
1543 workerInfo.ready = message.ready as boolean
1544 workerInfo.taskFunctionNames = message.taskFunctionNames
1545 if (this.ready) {
1546 const emitPoolReadyEventOnce = once(
1547 () => this.emitter?.emit(PoolEvents.ready, this.info),
1548 this
1549 )
1550 emitPoolReadyEventOnce()
1551 }
1552 }
1553
1554 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1555 const { taskId, workerError, data } = message
1556 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1557 if (promiseResponse != null) {
1558 if (workerError != null) {
1559 this.emitter?.emit(PoolEvents.taskError, workerError)
1560 promiseResponse.reject(workerError.message)
1561 } else {
1562 promiseResponse.resolve(data as Response)
1563 }
1564 const workerNodeKey = promiseResponse.workerNodeKey
1565 this.afterTaskExecutionHook(workerNodeKey, message)
1566 this.workerChoiceStrategyContext.update(workerNodeKey)
1567 this.promiseResponseMap.delete(taskId as string)
1568 if (
1569 this.opts.enableTasksQueue === true &&
1570 this.tasksQueueSize(workerNodeKey) > 0 &&
1571 this.workerNodes[workerNodeKey].usage.tasks.executing <
1572 (this.opts.tasksQueueOptions?.concurrency as number)
1573 ) {
1574 this.executeTask(
1575 workerNodeKey,
1576 this.dequeueTask(workerNodeKey) as Task<Data>
1577 )
1578 }
1579 }
1580 }
1581
1582 private checkAndEmitTaskExecutionEvents (): void {
1583 if (this.busy) {
1584 this.emitter?.emit(PoolEvents.busy, this.info)
1585 }
1586 }
1587
1588 private checkAndEmitTaskQueuingEvents (): void {
1589 if (this.hasBackPressure()) {
1590 this.emitter?.emit(PoolEvents.backPressure, this.info)
1591 }
1592 }
1593
1594 private checkAndEmitDynamicWorkerCreationEvents (): void {
1595 if (this.type === PoolTypes.dynamic) {
1596 if (this.full) {
1597 this.emitter?.emit(PoolEvents.full, this.info)
1598 }
1599 }
1600 }
1601
1602 /**
1603 * Gets the worker information given its worker node key.
1604 *
1605 * @param workerNodeKey - The worker node key.
1606 * @returns The worker information.
1607 */
1608 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1609 return this.workerNodes[workerNodeKey]?.info
1610 }
1611
1612 /**
1613 * Adds the given worker in the pool worker nodes.
1614 *
1615 * @param worker - The worker.
1616 * @returns The added worker node key.
1617 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1618 */
1619 private addWorkerNode (worker: Worker): number {
1620 const workerNode = new WorkerNode<Worker, Data>(
1621 worker,
1622 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1623 )
1624 // Flag the worker node as ready at pool startup.
1625 if (this.starting) {
1626 workerNode.info.ready = true
1627 }
1628 this.workerNodes.push(workerNode)
1629 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1630 if (workerNodeKey === -1) {
1631 throw new Error('Worker added not found in worker nodes')
1632 }
1633 return workerNodeKey
1634 }
1635
1636 /**
1637 * Removes the given worker from the pool worker nodes.
1638 *
1639 * @param worker - The worker.
1640 */
1641 private removeWorkerNode (worker: Worker): void {
1642 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1643 if (workerNodeKey !== -1) {
1644 this.workerNodes.splice(workerNodeKey, 1)
1645 this.workerChoiceStrategyContext.remove(workerNodeKey)
1646 }
1647 }
1648
1649 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1650 this.getWorkerInfo(workerNodeKey).ready = false
1651 }
1652
1653 /** @inheritDoc */
1654 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1655 return (
1656 this.opts.enableTasksQueue === true &&
1657 this.workerNodes[workerNodeKey].hasBackPressure()
1658 )
1659 }
1660
1661 private hasBackPressure (): boolean {
1662 return (
1663 this.opts.enableTasksQueue === true &&
1664 this.workerNodes.findIndex(
1665 workerNode => !workerNode.hasBackPressure()
1666 ) === -1
1667 )
1668 }
1669
1670 /**
1671 * Executes the given task on the worker given its worker node key.
1672 *
1673 * @param workerNodeKey - The worker node key.
1674 * @param task - The task to execute.
1675 */
1676 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1677 this.beforeTaskExecutionHook(workerNodeKey, task)
1678 this.sendToWorker(workerNodeKey, task, task.transferList)
1679 this.checkAndEmitTaskExecutionEvents()
1680 }
1681
1682 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1683 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1684 this.checkAndEmitTaskQueuingEvents()
1685 return tasksQueueSize
1686 }
1687
1688 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1689 return this.workerNodes[workerNodeKey].dequeueTask()
1690 }
1691
1692 private tasksQueueSize (workerNodeKey: number): number {
1693 return this.workerNodes[workerNodeKey].tasksQueueSize()
1694 }
1695
1696 protected flushTasksQueue (workerNodeKey: number): void {
1697 while (this.tasksQueueSize(workerNodeKey) > 0) {
1698 this.executeTask(
1699 workerNodeKey,
1700 this.dequeueTask(workerNodeKey) as Task<Data>
1701 )
1702 }
1703 this.workerNodes[workerNodeKey].clearTasksQueue()
1704 }
1705
1706 private flushTasksQueues (): void {
1707 for (const [workerNodeKey] of this.workerNodes.entries()) {
1708 this.flushTasksQueue(workerNodeKey)
1709 }
1710 }
1711 }