Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-hybrid...
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import type { TaskFunction } from '../worker/task-functions'
24 import {
25 type IPool,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerNodeEventDetail,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 updateMeasurementStatistics
56 } from './utils'
57
58 /**
59 * Base class that implements some shared logic for all poolifier pools.
60 *
61 * @typeParam Worker - Type of worker which manages this pool.
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 */
65 export abstract class AbstractPool<
66 Worker extends IWorker,
67 Data = unknown,
68 Response = unknown
69 > implements IPool<Worker, Data, Response> {
70 /** @inheritDoc */
71 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
72
73 /** @inheritDoc */
74 public emitter?: EventEmitterAsyncResource
75
76 /**
77 * Dynamic pool maximum size property placeholder.
78 */
79 protected readonly max?: number
80
81 /**
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
85 *
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
87 */
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
90
91 /**
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
95 Worker,
96 Data,
97 Response
98 >
99
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
115 /**
116 * The start timestamp of the pool.
117 */
118 private readonly startTimestamp
119
120 /**
121 * Constructs a new poolifier pool.
122 *
123 * @param numberOfWorkers - Number of workers that this pool should manage.
124 * @param filePath - Path to the worker file.
125 * @param opts - Options for the pool.
126 */
127 public constructor (
128 protected readonly numberOfWorkers: number,
129 protected readonly filePath: string,
130 protected readonly opts: PoolOptions<Worker>
131 ) {
132 if (!this.isMain()) {
133 throw new Error(
134 'Cannot start a pool from a worker with the same type as the pool'
135 )
136 }
137 checkFilePath(this.filePath)
138 this.checkNumberOfWorkers(this.numberOfWorkers)
139 this.checkPoolOptions(this.opts)
140
141 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
142 this.executeTask = this.executeTask.bind(this)
143 this.enqueueTask = this.enqueueTask.bind(this)
144
145 if (this.opts.enableEvents === true) {
146 this.initializeEventEmitter()
147 }
148 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
149 Worker,
150 Data,
151 Response
152 >(
153 this,
154 this.opts.workerChoiceStrategy,
155 this.opts.workerChoiceStrategyOptions
156 )
157
158 this.setupHook()
159
160 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
161
162 this.started = false
163 this.starting = false
164 if (this.opts.startWorkers === true) {
165 this.start()
166 }
167
168 this.startTimestamp = performance.now()
169 }
170
171 private checkNumberOfWorkers (numberOfWorkers: number): void {
172 if (numberOfWorkers == null) {
173 throw new Error(
174 'Cannot instantiate a pool without specifying the number of workers'
175 )
176 } else if (!Number.isSafeInteger(numberOfWorkers)) {
177 throw new TypeError(
178 'Cannot instantiate a pool with a non safe integer number of workers'
179 )
180 } else if (numberOfWorkers < 0) {
181 throw new RangeError(
182 'Cannot instantiate a pool with a negative number of workers'
183 )
184 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
185 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
186 }
187 }
188
189 private checkPoolOptions (opts: PoolOptions<Worker>): void {
190 if (isPlainObject(opts)) {
191 this.opts.startWorkers = opts.startWorkers ?? true
192 checkValidWorkerChoiceStrategy(
193 opts.workerChoiceStrategy as WorkerChoiceStrategy
194 )
195 this.opts.workerChoiceStrategy =
196 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
197 this.checkValidWorkerChoiceStrategyOptions(
198 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
199 )
200 this.opts.workerChoiceStrategyOptions = {
201 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
202 ...opts.workerChoiceStrategyOptions
203 }
204 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
205 this.opts.enableEvents = opts.enableEvents ?? true
206 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
207 if (this.opts.enableTasksQueue) {
208 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
209 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
210 opts.tasksQueueOptions as TasksQueueOptions
211 )
212 }
213 } else {
214 throw new TypeError('Invalid pool options: must be a plain object')
215 }
216 }
217
218 private checkValidWorkerChoiceStrategyOptions (
219 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
220 ): void {
221 if (
222 workerChoiceStrategyOptions != null &&
223 !isPlainObject(workerChoiceStrategyOptions)
224 ) {
225 throw new TypeError(
226 'Invalid worker choice strategy options: must be a plain object'
227 )
228 }
229 if (
230 workerChoiceStrategyOptions?.retries != null &&
231 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
232 ) {
233 throw new TypeError(
234 'Invalid worker choice strategy options: retries must be an integer'
235 )
236 }
237 if (
238 workerChoiceStrategyOptions?.retries != null &&
239 workerChoiceStrategyOptions.retries < 0
240 ) {
241 throw new RangeError(
242 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
243 )
244 }
245 if (
246 workerChoiceStrategyOptions?.weights != null &&
247 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
248 ) {
249 throw new Error(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 }
253 if (
254 workerChoiceStrategyOptions?.measurement != null &&
255 !Object.values(Measurements).includes(
256 workerChoiceStrategyOptions.measurement
257 )
258 ) {
259 throw new Error(
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
261 )
262 }
263 }
264
265 private initializeEventEmitter (): void {
266 this.emitter = new EventEmitterAsyncResource({
267 name: `poolifier:${this.type}-${this.worker}-pool`
268 })
269 }
270
271 /** @inheritDoc */
272 public get info (): PoolInfo {
273 return {
274 version,
275 type: this.type,
276 worker: this.worker,
277 started: this.started,
278 ready: this.ready,
279 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
280 minSize: this.minSize,
281 maxSize: this.maxSize,
282 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
283 .runTime.aggregate &&
284 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
285 .waitTime.aggregate && { utilization: round(this.utilization) }),
286 workerNodes: this.workerNodes.length,
287 idleWorkerNodes: this.workerNodes.reduce(
288 (accumulator, workerNode) =>
289 workerNode.usage.tasks.executing === 0
290 ? accumulator + 1
291 : accumulator,
292 0
293 ),
294 busyWorkerNodes: this.workerNodes.reduce(
295 (accumulator, workerNode) =>
296 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
297 0
298 ),
299 executedTasks: this.workerNodes.reduce(
300 (accumulator, workerNode) =>
301 accumulator + workerNode.usage.tasks.executed,
302 0
303 ),
304 executingTasks: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
306 accumulator + workerNode.usage.tasks.executing,
307 0
308 ),
309 ...(this.opts.enableTasksQueue === true && {
310 queuedTasks: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
312 accumulator + workerNode.usage.tasks.queued,
313 0
314 )
315 }),
316 ...(this.opts.enableTasksQueue === true && {
317 maxQueuedTasks: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
320 0
321 )
322 }),
323 ...(this.opts.enableTasksQueue === true && {
324 backPressure: this.hasBackPressure()
325 }),
326 ...(this.opts.enableTasksQueue === true && {
327 stolenTasks: this.workerNodes.reduce(
328 (accumulator, workerNode) =>
329 accumulator + workerNode.usage.tasks.stolen,
330 0
331 )
332 }),
333 failedTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + workerNode.usage.tasks.failed,
336 0
337 ),
338 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
339 .runTime.aggregate && {
340 runTime: {
341 minimum: round(
342 min(
343 ...this.workerNodes.map(
344 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
345 )
346 )
347 ),
348 maximum: round(
349 max(
350 ...this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
352 )
353 )
354 ),
355 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
356 .runTime.average && {
357 average: round(
358 average(
359 this.workerNodes.reduce<number[]>(
360 (accumulator, workerNode) =>
361 accumulator.concat(workerNode.usage.runTime.history),
362 []
363 )
364 )
365 )
366 }),
367 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
368 .runTime.median && {
369 median: round(
370 median(
371 this.workerNodes.reduce<number[]>(
372 (accumulator, workerNode) =>
373 accumulator.concat(workerNode.usage.runTime.history),
374 []
375 )
376 )
377 )
378 })
379 }
380 }),
381 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
382 .waitTime.aggregate && {
383 waitTime: {
384 minimum: round(
385 min(
386 ...this.workerNodes.map(
387 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
388 )
389 )
390 ),
391 maximum: round(
392 max(
393 ...this.workerNodes.map(
394 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
395 )
396 )
397 ),
398 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
399 .waitTime.average && {
400 average: round(
401 average(
402 this.workerNodes.reduce<number[]>(
403 (accumulator, workerNode) =>
404 accumulator.concat(workerNode.usage.waitTime.history),
405 []
406 )
407 )
408 )
409 }),
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .waitTime.median && {
412 median: round(
413 median(
414 this.workerNodes.reduce<number[]>(
415 (accumulator, workerNode) =>
416 accumulator.concat(workerNode.usage.waitTime.history),
417 []
418 )
419 )
420 )
421 })
422 }
423 })
424 }
425 }
426
427 /**
428 * The pool readiness boolean status.
429 */
430 private get ready (): boolean {
431 return (
432 this.workerNodes.reduce(
433 (accumulator, workerNode) =>
434 !workerNode.info.dynamic && workerNode.info.ready
435 ? accumulator + 1
436 : accumulator,
437 0
438 ) >= this.minSize
439 )
440 }
441
442 /**
443 * The approximate pool utilization.
444 *
445 * @returns The pool utilization.
446 */
447 private get utilization (): number {
448 const poolTimeCapacity =
449 (performance.now() - this.startTimestamp) * this.maxSize
450 const totalTasksRunTime = this.workerNodes.reduce(
451 (accumulator, workerNode) =>
452 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
453 0
454 )
455 const totalTasksWaitTime = this.workerNodes.reduce(
456 (accumulator, workerNode) =>
457 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
458 0
459 )
460 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
461 }
462
463 /**
464 * The pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
470 /**
471 * The worker type.
472 */
473 protected abstract get worker (): WorkerType
474
475 /**
476 * The pool minimum size.
477 */
478 protected get minSize (): number {
479 return this.numberOfWorkers
480 }
481
482 /**
483 * The pool maximum size.
484 */
485 protected get maxSize (): number {
486 return this.max ?? this.numberOfWorkers
487 }
488
489 /**
490 * Checks if the worker id sent in the received message from a worker is valid.
491 *
492 * @param message - The received message.
493 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
494 */
495 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
496 if (message.workerId == null) {
497 throw new Error('Worker message received without worker id')
498 } else if (
499 message.workerId != null &&
500 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
501 ) {
502 throw new Error(
503 `Worker message received from unknown worker '${message.workerId}'`
504 )
505 }
506 }
507
508 /**
509 * Gets the given worker its worker node key.
510 *
511 * @param worker - The worker.
512 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
513 */
514 private getWorkerNodeKeyByWorker (worker: Worker): number {
515 return this.workerNodes.findIndex(
516 workerNode => workerNode.worker === worker
517 )
518 }
519
520 /**
521 * Gets the worker node key given its worker id.
522 *
523 * @param workerId - The worker id.
524 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
525 */
526 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
527 return this.workerNodes.findIndex(
528 workerNode => workerNode.info.id === workerId
529 )
530 }
531
532 /** @inheritDoc */
533 public setWorkerChoiceStrategy (
534 workerChoiceStrategy: WorkerChoiceStrategy,
535 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
536 ): void {
537 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
538 this.opts.workerChoiceStrategy = workerChoiceStrategy
539 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
540 this.opts.workerChoiceStrategy
541 )
542 if (workerChoiceStrategyOptions != null) {
543 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
544 }
545 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
546 workerNode.resetUsage()
547 this.sendStatisticsMessageToWorker(workerNodeKey)
548 }
549 }
550
551 /** @inheritDoc */
552 public setWorkerChoiceStrategyOptions (
553 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
554 ): void {
555 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
556 this.opts.workerChoiceStrategyOptions = {
557 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
558 ...workerChoiceStrategyOptions
559 }
560 this.workerChoiceStrategyContext.setOptions(
561 this.opts.workerChoiceStrategyOptions
562 )
563 }
564
565 /** @inheritDoc */
566 public enableTasksQueue (
567 enable: boolean,
568 tasksQueueOptions?: TasksQueueOptions
569 ): void {
570 if (this.opts.enableTasksQueue === true && !enable) {
571 this.unsetTaskStealing()
572 this.unsetTasksStealingOnBackPressure()
573 this.flushTasksQueues()
574 }
575 this.opts.enableTasksQueue = enable
576 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
577 }
578
579 /** @inheritDoc */
580 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
581 if (this.opts.enableTasksQueue === true) {
582 checkValidTasksQueueOptions(tasksQueueOptions)
583 this.opts.tasksQueueOptions =
584 this.buildTasksQueueOptions(tasksQueueOptions)
585 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
586 if (this.opts.tasksQueueOptions.taskStealing === true) {
587 this.setTaskStealing()
588 } else {
589 this.unsetTaskStealing()
590 }
591 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
592 this.setTasksStealingOnBackPressure()
593 } else {
594 this.unsetTasksStealingOnBackPressure()
595 }
596 } else if (this.opts.tasksQueueOptions != null) {
597 delete this.opts.tasksQueueOptions
598 }
599 }
600
601 private buildTasksQueueOptions (
602 tasksQueueOptions: TasksQueueOptions
603 ): TasksQueueOptions {
604 return {
605 ...{
606 size: Math.pow(this.maxSize, 2),
607 concurrency: 1,
608 taskStealing: true,
609 tasksStealingOnBackPressure: true
610 },
611 ...tasksQueueOptions
612 }
613 }
614
615 private setTasksQueueSize (size: number): void {
616 for (const workerNode of this.workerNodes) {
617 workerNode.tasksQueueBackPressureSize = size
618 }
619 }
620
621 private setTaskStealing (): void {
622 for (const [workerNodeKey] of this.workerNodes.entries()) {
623 this.workerNodes[workerNodeKey].addEventListener(
624 'emptyqueue',
625 this.handleEmptyQueueEvent as EventListener
626 )
627 }
628 }
629
630 private unsetTaskStealing (): void {
631 for (const [workerNodeKey] of this.workerNodes.entries()) {
632 this.workerNodes[workerNodeKey].removeEventListener(
633 'emptyqueue',
634 this.handleEmptyQueueEvent as EventListener
635 )
636 }
637 }
638
639 private setTasksStealingOnBackPressure (): void {
640 for (const [workerNodeKey] of this.workerNodes.entries()) {
641 this.workerNodes[workerNodeKey].addEventListener(
642 'backpressure',
643 this.handleBackPressureEvent as EventListener
644 )
645 }
646 }
647
648 private unsetTasksStealingOnBackPressure (): void {
649 for (const [workerNodeKey] of this.workerNodes.entries()) {
650 this.workerNodes[workerNodeKey].removeEventListener(
651 'backpressure',
652 this.handleBackPressureEvent as EventListener
653 )
654 }
655 }
656
657 /**
658 * Whether the pool is full or not.
659 *
660 * The pool filling boolean status.
661 */
662 protected get full (): boolean {
663 return this.workerNodes.length >= this.maxSize
664 }
665
666 /**
667 * Whether the pool is busy or not.
668 *
669 * The pool busyness boolean status.
670 */
671 protected abstract get busy (): boolean
672
673 /**
674 * Whether worker nodes are executing concurrently their tasks quota or not.
675 *
676 * @returns Worker nodes busyness boolean status.
677 */
678 protected internalBusy (): boolean {
679 if (this.opts.enableTasksQueue === true) {
680 return (
681 this.workerNodes.findIndex(
682 workerNode =>
683 workerNode.info.ready &&
684 workerNode.usage.tasks.executing <
685 (this.opts.tasksQueueOptions?.concurrency as number)
686 ) === -1
687 )
688 }
689 return (
690 this.workerNodes.findIndex(
691 workerNode =>
692 workerNode.info.ready && workerNode.usage.tasks.executing === 0
693 ) === -1
694 )
695 }
696
697 private async sendTaskFunctionOperationToWorker (
698 workerNodeKey: number,
699 message: MessageValue<Data>
700 ): Promise<boolean> {
701 return await new Promise<boolean>((resolve, reject) => {
702 const taskFunctionOperationListener = (
703 message: MessageValue<Response>
704 ): void => {
705 this.checkMessageWorkerId(message)
706 const workerId = this.getWorkerInfo(workerNodeKey).id as number
707 if (
708 message.taskFunctionOperationStatus != null &&
709 message.workerId === workerId
710 ) {
711 if (message.taskFunctionOperationStatus) {
712 resolve(true)
713 } else if (!message.taskFunctionOperationStatus) {
714 reject(
715 new Error(
716 `Task function operation '${
717 message.taskFunctionOperation as string
718 }' failed on worker ${message.workerId} with error: '${
719 message.workerError?.message as string
720 }'`
721 )
722 )
723 }
724 this.deregisterWorkerMessageListener(
725 this.getWorkerNodeKeyByWorkerId(message.workerId),
726 taskFunctionOperationListener
727 )
728 }
729 }
730 this.registerWorkerMessageListener(
731 workerNodeKey,
732 taskFunctionOperationListener
733 )
734 this.sendToWorker(workerNodeKey, message)
735 })
736 }
737
738 private async sendTaskFunctionOperationToWorkers (
739 message: MessageValue<Data>
740 ): Promise<boolean> {
741 return await new Promise<boolean>((resolve, reject) => {
742 const responsesReceived = new Array<MessageValue<Response>>()
743 const taskFunctionOperationsListener = (
744 message: MessageValue<Response>
745 ): void => {
746 this.checkMessageWorkerId(message)
747 if (message.taskFunctionOperationStatus != null) {
748 responsesReceived.push(message)
749 if (responsesReceived.length === this.workerNodes.length) {
750 if (
751 responsesReceived.every(
752 message => message.taskFunctionOperationStatus === true
753 )
754 ) {
755 resolve(true)
756 } else if (
757 responsesReceived.some(
758 message => message.taskFunctionOperationStatus === false
759 )
760 ) {
761 const errorResponse = responsesReceived.find(
762 response => response.taskFunctionOperationStatus === false
763 )
764 reject(
765 new Error(
766 `Task function operation '${
767 message.taskFunctionOperation as string
768 }' failed on worker ${
769 errorResponse?.workerId as number
770 } with error: '${
771 errorResponse?.workerError?.message as string
772 }'`
773 )
774 )
775 }
776 this.deregisterWorkerMessageListener(
777 this.getWorkerNodeKeyByWorkerId(message.workerId),
778 taskFunctionOperationsListener
779 )
780 }
781 }
782 }
783 for (const [workerNodeKey] of this.workerNodes.entries()) {
784 this.registerWorkerMessageListener(
785 workerNodeKey,
786 taskFunctionOperationsListener
787 )
788 this.sendToWorker(workerNodeKey, message)
789 }
790 })
791 }
792
793 /** @inheritDoc */
794 public hasTaskFunction (name: string): boolean {
795 for (const workerNode of this.workerNodes) {
796 if (
797 Array.isArray(workerNode.info.taskFunctionNames) &&
798 workerNode.info.taskFunctionNames.includes(name)
799 ) {
800 return true
801 }
802 }
803 return false
804 }
805
806 /** @inheritDoc */
807 public async addTaskFunction (
808 name: string,
809 fn: TaskFunction<Data, Response>
810 ): Promise<boolean> {
811 if (typeof name !== 'string') {
812 throw new TypeError('name argument must be a string')
813 }
814 if (typeof name === 'string' && name.trim().length === 0) {
815 throw new TypeError('name argument must not be an empty string')
816 }
817 if (typeof fn !== 'function') {
818 throw new TypeError('fn argument must be a function')
819 }
820 const opResult = await this.sendTaskFunctionOperationToWorkers({
821 taskFunctionOperation: 'add',
822 taskFunctionName: name,
823 taskFunction: fn.toString()
824 })
825 this.taskFunctions.set(name, fn)
826 return opResult
827 }
828
829 /** @inheritDoc */
830 public async removeTaskFunction (name: string): Promise<boolean> {
831 if (!this.taskFunctions.has(name)) {
832 throw new Error(
833 'Cannot remove a task function not handled on the pool side'
834 )
835 }
836 const opResult = await this.sendTaskFunctionOperationToWorkers({
837 taskFunctionOperation: 'remove',
838 taskFunctionName: name
839 })
840 this.deleteTaskFunctionWorkerUsages(name)
841 this.taskFunctions.delete(name)
842 return opResult
843 }
844
845 /** @inheritDoc */
846 public listTaskFunctionNames (): string[] {
847 for (const workerNode of this.workerNodes) {
848 if (
849 Array.isArray(workerNode.info.taskFunctionNames) &&
850 workerNode.info.taskFunctionNames.length > 0
851 ) {
852 return workerNode.info.taskFunctionNames
853 }
854 }
855 return []
856 }
857
858 /** @inheritDoc */
859 public async setDefaultTaskFunction (name: string): Promise<boolean> {
860 return await this.sendTaskFunctionOperationToWorkers({
861 taskFunctionOperation: 'default',
862 taskFunctionName: name
863 })
864 }
865
866 private deleteTaskFunctionWorkerUsages (name: string): void {
867 for (const workerNode of this.workerNodes) {
868 workerNode.deleteTaskFunctionWorkerUsage(name)
869 }
870 }
871
872 private shallExecuteTask (workerNodeKey: number): boolean {
873 return (
874 this.tasksQueueSize(workerNodeKey) === 0 &&
875 this.workerNodes[workerNodeKey].usage.tasks.executing <
876 (this.opts.tasksQueueOptions?.concurrency as number)
877 )
878 }
879
880 /** @inheritDoc */
881 public async execute (
882 data?: Data,
883 name?: string,
884 transferList?: TransferListItem[]
885 ): Promise<Response> {
886 return await new Promise<Response>((resolve, reject) => {
887 if (!this.started) {
888 reject(new Error('Cannot execute a task on not started pool'))
889 return
890 }
891 if (name != null && typeof name !== 'string') {
892 reject(new TypeError('name argument must be a string'))
893 return
894 }
895 if (
896 name != null &&
897 typeof name === 'string' &&
898 name.trim().length === 0
899 ) {
900 reject(new TypeError('name argument must not be an empty string'))
901 return
902 }
903 if (transferList != null && !Array.isArray(transferList)) {
904 reject(new TypeError('transferList argument must be an array'))
905 return
906 }
907 const timestamp = performance.now()
908 const workerNodeKey = this.chooseWorkerNode()
909 const task: Task<Data> = {
910 name: name ?? DEFAULT_TASK_NAME,
911 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
912 data: data ?? ({} as Data),
913 transferList,
914 timestamp,
915 taskId: randomUUID()
916 }
917 this.promiseResponseMap.set(task.taskId as string, {
918 resolve,
919 reject,
920 workerNodeKey
921 })
922 if (
923 this.opts.enableTasksQueue === false ||
924 (this.opts.enableTasksQueue === true &&
925 this.shallExecuteTask(workerNodeKey))
926 ) {
927 this.executeTask(workerNodeKey, task)
928 } else {
929 this.enqueueTask(workerNodeKey, task)
930 }
931 })
932 }
933
934 /** @inheritdoc */
935 public start (): void {
936 this.starting = true
937 while (
938 this.workerNodes.reduce(
939 (accumulator, workerNode) =>
940 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
941 0
942 ) < this.numberOfWorkers
943 ) {
944 this.createAndSetupWorkerNode()
945 }
946 this.starting = false
947 this.started = true
948 }
949
950 /** @inheritDoc */
951 public async destroy (): Promise<void> {
952 await Promise.all(
953 this.workerNodes.map(async (_, workerNodeKey) => {
954 await this.destroyWorkerNode(workerNodeKey)
955 })
956 )
957 this.emitter?.emit(PoolEvents.destroy, this.info)
958 this.emitter?.emitDestroy()
959 this.started = false
960 }
961
962 protected async sendKillMessageToWorker (
963 workerNodeKey: number
964 ): Promise<void> {
965 await new Promise<void>((resolve, reject) => {
966 const killMessageListener = (message: MessageValue<Response>): void => {
967 this.checkMessageWorkerId(message)
968 if (message.kill === 'success') {
969 resolve()
970 } else if (message.kill === 'failure') {
971 reject(
972 new Error(
973 `Kill message handling failed on worker ${
974 message.workerId as number
975 }`
976 )
977 )
978 }
979 }
980 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
981 this.sendToWorker(workerNodeKey, { kill: true })
982 })
983 }
984
985 /**
986 * Terminates the worker node given its worker node key.
987 *
988 * @param workerNodeKey - The worker node key.
989 */
990 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
991
992 /**
993 * Setup hook to execute code before worker nodes are created in the abstract constructor.
994 * Can be overridden.
995 *
996 * @virtual
997 */
998 protected setupHook (): void {
999 /* Intentionally empty */
1000 }
1001
1002 /**
1003 * Should return whether the worker is the main worker or not.
1004 */
1005 protected abstract isMain (): boolean
1006
1007 /**
1008 * Hook executed before the worker task execution.
1009 * Can be overridden.
1010 *
1011 * @param workerNodeKey - The worker node key.
1012 * @param task - The task to execute.
1013 */
1014 protected beforeTaskExecutionHook (
1015 workerNodeKey: number,
1016 task: Task<Data>
1017 ): void {
1018 if (this.workerNodes[workerNodeKey]?.usage != null) {
1019 const workerUsage = this.workerNodes[workerNodeKey].usage
1020 ++workerUsage.tasks.executing
1021 this.updateWaitTimeWorkerUsage(workerUsage, task)
1022 }
1023 if (
1024 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1025 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1026 task.name as string
1027 ) != null
1028 ) {
1029 const taskFunctionWorkerUsage = this.workerNodes[
1030 workerNodeKey
1031 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1032 ++taskFunctionWorkerUsage.tasks.executing
1033 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1034 }
1035 }
1036
1037 /**
1038 * Hook executed after the worker task execution.
1039 * Can be overridden.
1040 *
1041 * @param workerNodeKey - The worker node key.
1042 * @param message - The received message.
1043 */
1044 protected afterTaskExecutionHook (
1045 workerNodeKey: number,
1046 message: MessageValue<Response>
1047 ): void {
1048 if (this.workerNodes[workerNodeKey]?.usage != null) {
1049 const workerUsage = this.workerNodes[workerNodeKey].usage
1050 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1051 this.updateRunTimeWorkerUsage(workerUsage, message)
1052 this.updateEluWorkerUsage(workerUsage, message)
1053 }
1054 if (
1055 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1056 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1057 message.taskPerformance?.name as string
1058 ) != null
1059 ) {
1060 const taskFunctionWorkerUsage = this.workerNodes[
1061 workerNodeKey
1062 ].getTaskFunctionWorkerUsage(
1063 message.taskPerformance?.name as string
1064 ) as WorkerUsage
1065 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1066 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1067 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1068 }
1069 }
1070
1071 /**
1072 * Whether the worker node shall update its task function worker usage or not.
1073 *
1074 * @param workerNodeKey - The worker node key.
1075 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1076 */
1077 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1078 const workerInfo = this.getWorkerInfo(workerNodeKey)
1079 return (
1080 workerInfo != null &&
1081 Array.isArray(workerInfo.taskFunctionNames) &&
1082 workerInfo.taskFunctionNames.length > 2
1083 )
1084 }
1085
1086 private updateTaskStatisticsWorkerUsage (
1087 workerUsage: WorkerUsage,
1088 message: MessageValue<Response>
1089 ): void {
1090 const workerTaskStatistics = workerUsage.tasks
1091 if (
1092 workerTaskStatistics.executing != null &&
1093 workerTaskStatistics.executing > 0
1094 ) {
1095 --workerTaskStatistics.executing
1096 }
1097 if (message.workerError == null) {
1098 ++workerTaskStatistics.executed
1099 } else {
1100 ++workerTaskStatistics.failed
1101 }
1102 }
1103
1104 private updateRunTimeWorkerUsage (
1105 workerUsage: WorkerUsage,
1106 message: MessageValue<Response>
1107 ): void {
1108 if (message.workerError != null) {
1109 return
1110 }
1111 updateMeasurementStatistics(
1112 workerUsage.runTime,
1113 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1114 message.taskPerformance?.runTime ?? 0
1115 )
1116 }
1117
1118 private updateWaitTimeWorkerUsage (
1119 workerUsage: WorkerUsage,
1120 task: Task<Data>
1121 ): void {
1122 const timestamp = performance.now()
1123 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1124 updateMeasurementStatistics(
1125 workerUsage.waitTime,
1126 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1127 taskWaitTime
1128 )
1129 }
1130
1131 private updateEluWorkerUsage (
1132 workerUsage: WorkerUsage,
1133 message: MessageValue<Response>
1134 ): void {
1135 if (message.workerError != null) {
1136 return
1137 }
1138 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1139 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1140 updateMeasurementStatistics(
1141 workerUsage.elu.active,
1142 eluTaskStatisticsRequirements,
1143 message.taskPerformance?.elu?.active ?? 0
1144 )
1145 updateMeasurementStatistics(
1146 workerUsage.elu.idle,
1147 eluTaskStatisticsRequirements,
1148 message.taskPerformance?.elu?.idle ?? 0
1149 )
1150 if (eluTaskStatisticsRequirements.aggregate) {
1151 if (message.taskPerformance?.elu != null) {
1152 if (workerUsage.elu.utilization != null) {
1153 workerUsage.elu.utilization =
1154 (workerUsage.elu.utilization +
1155 message.taskPerformance.elu.utilization) /
1156 2
1157 } else {
1158 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1159 }
1160 }
1161 }
1162 }
1163
1164 /**
1165 * Chooses a worker node for the next task.
1166 *
1167 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1168 *
1169 * @returns The chosen worker node key
1170 */
1171 private chooseWorkerNode (): number {
1172 if (this.shallCreateDynamicWorker()) {
1173 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1174 if (
1175 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1176 ) {
1177 return workerNodeKey
1178 }
1179 }
1180 return this.workerChoiceStrategyContext.execute()
1181 }
1182
1183 /**
1184 * Conditions for dynamic worker creation.
1185 *
1186 * @returns Whether to create a dynamic worker or not.
1187 */
1188 private shallCreateDynamicWorker (): boolean {
1189 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1190 }
1191
1192 /**
1193 * Sends a message to worker given its worker node key.
1194 *
1195 * @param workerNodeKey - The worker node key.
1196 * @param message - The message.
1197 * @param transferList - The optional array of transferable objects.
1198 */
1199 protected abstract sendToWorker (
1200 workerNodeKey: number,
1201 message: MessageValue<Data>,
1202 transferList?: TransferListItem[]
1203 ): void
1204
1205 /**
1206 * Creates a new worker.
1207 *
1208 * @returns Newly created worker.
1209 */
1210 protected abstract createWorker (): Worker
1211
1212 /**
1213 * Creates a new, completely set up worker node.
1214 *
1215 * @returns New, completely set up worker node key.
1216 */
1217 protected createAndSetupWorkerNode (): number {
1218 const worker = this.createWorker()
1219
1220 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1221 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1222 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1223 worker.on('error', error => {
1224 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1225 const workerInfo = this.getWorkerInfo(workerNodeKey)
1226 workerInfo.ready = false
1227 this.emitter?.emit(PoolEvents.error, error)
1228 this.workerNodes[workerNodeKey].closeChannel()
1229 if (
1230 this.started &&
1231 !this.starting &&
1232 this.opts.restartWorkerOnError === true
1233 ) {
1234 if (workerInfo.dynamic) {
1235 this.createAndSetupDynamicWorkerNode()
1236 } else {
1237 this.createAndSetupWorkerNode()
1238 }
1239 }
1240 if (this.started && this.opts.enableTasksQueue === true) {
1241 this.redistributeQueuedTasks(workerNodeKey)
1242 }
1243 })
1244 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1245 worker.once('exit', () => {
1246 this.removeWorkerNode(worker)
1247 })
1248
1249 const workerNodeKey = this.addWorkerNode(worker)
1250
1251 this.afterWorkerNodeSetup(workerNodeKey)
1252
1253 return workerNodeKey
1254 }
1255
1256 /**
1257 * Creates a new, completely set up dynamic worker node.
1258 *
1259 * @returns New, completely set up dynamic worker node key.
1260 */
1261 protected createAndSetupDynamicWorkerNode (): number {
1262 const workerNodeKey = this.createAndSetupWorkerNode()
1263 this.registerWorkerMessageListener(workerNodeKey, message => {
1264 this.checkMessageWorkerId(message)
1265 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1266 message.workerId
1267 )
1268 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1269 // Kill message received from worker
1270 if (
1271 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1272 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1273 ((this.opts.enableTasksQueue === false &&
1274 workerUsage.tasks.executing === 0) ||
1275 (this.opts.enableTasksQueue === true &&
1276 workerUsage.tasks.executing === 0 &&
1277 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1278 ) {
1279 // Flag the worker node as not ready immediately
1280 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1281 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1282 this.emitter?.emit(PoolEvents.error, error)
1283 })
1284 }
1285 })
1286 const workerInfo = this.getWorkerInfo(workerNodeKey)
1287 this.sendToWorker(workerNodeKey, {
1288 checkActive: true
1289 })
1290 if (this.taskFunctions.size > 0) {
1291 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1292 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1293 taskFunctionOperation: 'add',
1294 taskFunctionName,
1295 taskFunction: taskFunction.toString()
1296 }).catch(error => {
1297 this.emitter?.emit(PoolEvents.error, error)
1298 })
1299 }
1300 }
1301 workerInfo.dynamic = true
1302 if (
1303 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1304 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1305 ) {
1306 workerInfo.ready = true
1307 }
1308 this.checkAndEmitDynamicWorkerCreationEvents()
1309 return workerNodeKey
1310 }
1311
1312 /**
1313 * Registers a listener callback on the worker given its worker node key.
1314 *
1315 * @param workerNodeKey - The worker node key.
1316 * @param listener - The message listener callback.
1317 */
1318 protected abstract registerWorkerMessageListener<
1319 Message extends Data | Response
1320 >(
1321 workerNodeKey: number,
1322 listener: (message: MessageValue<Message>) => void
1323 ): void
1324
1325 /**
1326 * Registers once a listener callback on the worker given its worker node key.
1327 *
1328 * @param workerNodeKey - The worker node key.
1329 * @param listener - The message listener callback.
1330 */
1331 protected abstract registerOnceWorkerMessageListener<
1332 Message extends Data | Response
1333 >(
1334 workerNodeKey: number,
1335 listener: (message: MessageValue<Message>) => void
1336 ): void
1337
1338 /**
1339 * Deregisters a listener callback on the worker given its worker node key.
1340 *
1341 * @param workerNodeKey - The worker node key.
1342 * @param listener - The message listener callback.
1343 */
1344 protected abstract deregisterWorkerMessageListener<
1345 Message extends Data | Response
1346 >(
1347 workerNodeKey: number,
1348 listener: (message: MessageValue<Message>) => void
1349 ): void
1350
1351 /**
1352 * Method hooked up after a worker node has been newly created.
1353 * Can be overridden.
1354 *
1355 * @param workerNodeKey - The newly created worker node key.
1356 */
1357 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1358 // Listen to worker messages.
1359 this.registerWorkerMessageListener(
1360 workerNodeKey,
1361 this.workerMessageListener.bind(this)
1362 )
1363 // Send the startup message to worker.
1364 this.sendStartupMessageToWorker(workerNodeKey)
1365 // Send the statistics message to worker.
1366 this.sendStatisticsMessageToWorker(workerNodeKey)
1367 if (this.opts.enableTasksQueue === true) {
1368 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1369 this.workerNodes[workerNodeKey].addEventListener(
1370 'emptyqueue',
1371 this.handleEmptyQueueEvent as EventListener
1372 )
1373 }
1374 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1375 this.workerNodes[workerNodeKey].addEventListener(
1376 'backpressure',
1377 this.handleBackPressureEvent as EventListener
1378 )
1379 }
1380 }
1381 }
1382
1383 /**
1384 * Sends the startup message to worker given its worker node key.
1385 *
1386 * @param workerNodeKey - The worker node key.
1387 */
1388 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1389
1390 /**
1391 * Sends the statistics message to worker given its worker node key.
1392 *
1393 * @param workerNodeKey - The worker node key.
1394 */
1395 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1396 this.sendToWorker(workerNodeKey, {
1397 statistics: {
1398 runTime:
1399 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1400 .runTime.aggregate,
1401 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1402 .elu.aggregate
1403 }
1404 })
1405 }
1406
1407 private redistributeQueuedTasks (workerNodeKey: number): void {
1408 while (this.tasksQueueSize(workerNodeKey) > 0) {
1409 const destinationWorkerNodeKey = this.workerNodes.reduce(
1410 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1411 return workerNode.info.ready &&
1412 workerNode.usage.tasks.queued <
1413 workerNodes[minWorkerNodeKey].usage.tasks.queued
1414 ? workerNodeKey
1415 : minWorkerNodeKey
1416 },
1417 0
1418 )
1419 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1420 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1421 this.executeTask(destinationWorkerNodeKey, task)
1422 } else {
1423 this.enqueueTask(destinationWorkerNodeKey, task)
1424 }
1425 }
1426 }
1427
1428 private updateTaskStolenStatisticsWorkerUsage (
1429 workerNodeKey: number,
1430 taskName: string
1431 ): void {
1432 const workerNode = this.workerNodes[workerNodeKey]
1433 if (workerNode?.usage != null) {
1434 ++workerNode.usage.tasks.stolen
1435 }
1436 if (
1437 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1438 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1439 ) {
1440 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1441 taskName
1442 ) as WorkerUsage
1443 ++taskFunctionWorkerUsage.tasks.stolen
1444 }
1445 }
1446
1447 private readonly handleEmptyQueueEvent = (
1448 event: CustomEvent<WorkerNodeEventDetail>
1449 ): void => {
1450 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1451 event.detail.workerId
1452 )
1453 const workerNodes = this.workerNodes
1454 .slice()
1455 .sort(
1456 (workerNodeA, workerNodeB) =>
1457 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1458 )
1459 const sourceWorkerNode = workerNodes.find(
1460 workerNode =>
1461 workerNode.info.ready &&
1462 workerNode.info.id !== event.detail.workerId &&
1463 workerNode.usage.tasks.queued > 0
1464 )
1465 if (sourceWorkerNode != null) {
1466 const task = sourceWorkerNode.popTask() as Task<Data>
1467 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1468 this.executeTask(destinationWorkerNodeKey, task)
1469 } else {
1470 this.enqueueTask(destinationWorkerNodeKey, task)
1471 }
1472 this.updateTaskStolenStatisticsWorkerUsage(
1473 destinationWorkerNodeKey,
1474 task.name as string
1475 )
1476 }
1477 }
1478
1479 private readonly handleBackPressureEvent = (
1480 event: CustomEvent<WorkerNodeEventDetail>
1481 ): void => {
1482 const sizeOffset = 1
1483 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1484 return
1485 }
1486 const sourceWorkerNode =
1487 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
1488 const workerNodes = this.workerNodes
1489 .slice()
1490 .sort(
1491 (workerNodeA, workerNodeB) =>
1492 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1493 )
1494 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1495 if (
1496 sourceWorkerNode.usage.tasks.queued > 0 &&
1497 workerNode.info.ready &&
1498 workerNode.info.id !== event.detail.workerId &&
1499 workerNode.usage.tasks.queued <
1500 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1501 ) {
1502 const task = sourceWorkerNode.popTask() as Task<Data>
1503 if (this.shallExecuteTask(workerNodeKey)) {
1504 this.executeTask(workerNodeKey, task)
1505 } else {
1506 this.enqueueTask(workerNodeKey, task)
1507 }
1508 this.updateTaskStolenStatisticsWorkerUsage(
1509 workerNodeKey,
1510 task.name as string
1511 )
1512 }
1513 }
1514 }
1515
1516 /**
1517 * This method is the message listener registered on each worker.
1518 */
1519 protected workerMessageListener (message: MessageValue<Response>): void {
1520 this.checkMessageWorkerId(message)
1521 if (message.ready != null && message.taskFunctionNames != null) {
1522 // Worker ready response received from worker
1523 this.handleWorkerReadyResponse(message)
1524 } else if (message.taskId != null) {
1525 // Task execution response received from worker
1526 this.handleTaskExecutionResponse(message)
1527 } else if (message.taskFunctionNames != null) {
1528 // Task function names message received from worker
1529 this.getWorkerInfo(
1530 this.getWorkerNodeKeyByWorkerId(message.workerId)
1531 ).taskFunctionNames = message.taskFunctionNames
1532 }
1533 }
1534
1535 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1536 if (message.ready === false) {
1537 throw new Error(
1538 `Worker ${message.workerId as number} failed to initialize`
1539 )
1540 }
1541 const workerInfo = this.getWorkerInfo(
1542 this.getWorkerNodeKeyByWorkerId(message.workerId)
1543 )
1544 workerInfo.ready = message.ready as boolean
1545 workerInfo.taskFunctionNames = message.taskFunctionNames
1546 if (this.ready) {
1547 this.emitter?.emit(PoolEvents.ready, this.info)
1548 }
1549 }
1550
1551 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1552 const { taskId, workerError, data } = message
1553 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1554 if (promiseResponse != null) {
1555 if (workerError != null) {
1556 this.emitter?.emit(PoolEvents.taskError, workerError)
1557 promiseResponse.reject(workerError.message)
1558 } else {
1559 promiseResponse.resolve(data as Response)
1560 }
1561 const workerNodeKey = promiseResponse.workerNodeKey
1562 this.afterTaskExecutionHook(workerNodeKey, message)
1563 this.workerChoiceStrategyContext.update(workerNodeKey)
1564 this.promiseResponseMap.delete(taskId as string)
1565 if (
1566 this.opts.enableTasksQueue === true &&
1567 this.tasksQueueSize(workerNodeKey) > 0 &&
1568 this.workerNodes[workerNodeKey].usage.tasks.executing <
1569 (this.opts.tasksQueueOptions?.concurrency as number)
1570 ) {
1571 this.executeTask(
1572 workerNodeKey,
1573 this.dequeueTask(workerNodeKey) as Task<Data>
1574 )
1575 }
1576 }
1577 }
1578
1579 private checkAndEmitTaskExecutionEvents (): void {
1580 if (this.busy) {
1581 this.emitter?.emit(PoolEvents.busy, this.info)
1582 }
1583 }
1584
1585 private checkAndEmitTaskQueuingEvents (): void {
1586 if (this.hasBackPressure()) {
1587 this.emitter?.emit(PoolEvents.backPressure, this.info)
1588 }
1589 }
1590
1591 private checkAndEmitDynamicWorkerCreationEvents (): void {
1592 if (this.type === PoolTypes.dynamic) {
1593 if (this.full) {
1594 this.emitter?.emit(PoolEvents.full, this.info)
1595 }
1596 }
1597 }
1598
1599 /**
1600 * Gets the worker information given its worker node key.
1601 *
1602 * @param workerNodeKey - The worker node key.
1603 * @returns The worker information.
1604 */
1605 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1606 return this.workerNodes[workerNodeKey]?.info
1607 }
1608
1609 /**
1610 * Adds the given worker in the pool worker nodes.
1611 *
1612 * @param worker - The worker.
1613 * @returns The added worker node key.
1614 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1615 */
1616 private addWorkerNode (worker: Worker): number {
1617 const workerNode = new WorkerNode<Worker, Data>(
1618 worker,
1619 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1620 )
1621 // Flag the worker node as ready at pool startup.
1622 if (this.starting) {
1623 workerNode.info.ready = true
1624 }
1625 this.workerNodes.push(workerNode)
1626 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1627 if (workerNodeKey === -1) {
1628 throw new Error('Worker added not found in worker nodes')
1629 }
1630 return workerNodeKey
1631 }
1632
1633 /**
1634 * Removes the given worker from the pool worker nodes.
1635 *
1636 * @param worker - The worker.
1637 */
1638 private removeWorkerNode (worker: Worker): void {
1639 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1640 if (workerNodeKey !== -1) {
1641 this.workerNodes.splice(workerNodeKey, 1)
1642 this.workerChoiceStrategyContext.remove(workerNodeKey)
1643 }
1644 }
1645
1646 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1647 this.getWorkerInfo(workerNodeKey).ready = false
1648 }
1649
1650 /** @inheritDoc */
1651 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1652 return (
1653 this.opts.enableTasksQueue === true &&
1654 this.workerNodes[workerNodeKey].hasBackPressure()
1655 )
1656 }
1657
1658 private hasBackPressure (): boolean {
1659 return (
1660 this.opts.enableTasksQueue === true &&
1661 this.workerNodes.findIndex(
1662 workerNode => !workerNode.hasBackPressure()
1663 ) === -1
1664 )
1665 }
1666
1667 /**
1668 * Executes the given task on the worker given its worker node key.
1669 *
1670 * @param workerNodeKey - The worker node key.
1671 * @param task - The task to execute.
1672 */
1673 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1674 this.beforeTaskExecutionHook(workerNodeKey, task)
1675 this.sendToWorker(workerNodeKey, task, task.transferList)
1676 this.checkAndEmitTaskExecutionEvents()
1677 }
1678
1679 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1680 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1681 this.checkAndEmitTaskQueuingEvents()
1682 return tasksQueueSize
1683 }
1684
1685 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1686 return this.workerNodes[workerNodeKey].dequeueTask()
1687 }
1688
1689 private tasksQueueSize (workerNodeKey: number): number {
1690 return this.workerNodes[workerNodeKey].tasksQueueSize()
1691 }
1692
1693 protected flushTasksQueue (workerNodeKey: number): void {
1694 while (this.tasksQueueSize(workerNodeKey) > 0) {
1695 this.executeTask(
1696 workerNodeKey,
1697 this.dequeueTask(workerNodeKey) as Task<Data>
1698 )
1699 }
1700 this.workerNodes[workerNodeKey].clearTasksQueue()
1701 }
1702
1703 private flushTasksQueues (): void {
1704 for (const [workerNodeKey] of this.workerNodes.entries()) {
1705 this.flushTasksQueue(workerNodeKey)
1706 }
1707 }
1708 }