refactor: use helper to flag worker node as not ready
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import type { TaskFunction } from '../worker/task-functions'
24 import {
25 type IPool,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerNodeEventDetail,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 updateMeasurementStatistics
56 } from './utils'
57
58 /**
59 * Base class that implements some shared logic for all poolifier pools.
60 *
61 * @typeParam Worker - Type of worker which manages this pool.
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 */
65 export abstract class AbstractPool<
66 Worker extends IWorker,
67 Data = unknown,
68 Response = unknown
69 > implements IPool<Worker, Data, Response> {
70 /** @inheritDoc */
71 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
72
73 /** @inheritDoc */
74 public emitter?: EventEmitterAsyncResource
75
76 /**
77 * Dynamic pool maximum size property placeholder.
78 */
79 protected readonly max?: number
80
81 /**
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
85 *
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
87 */
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
90
91 /**
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
95 Worker,
96 Data,
97 Response
98 >
99
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
115 /**
116 * The start timestamp of the pool.
117 */
118 private readonly startTimestamp
119
120 /**
121 * Constructs a new poolifier pool.
122 *
123 * @param numberOfWorkers - Number of workers that this pool should manage.
124 * @param filePath - Path to the worker file.
125 * @param opts - Options for the pool.
126 */
127 public constructor (
128 protected readonly numberOfWorkers: number,
129 protected readonly filePath: string,
130 protected readonly opts: PoolOptions<Worker>
131 ) {
132 if (!this.isMain()) {
133 throw new Error(
134 'Cannot start a pool from a worker with the same type as the pool'
135 )
136 }
137 checkFilePath(this.filePath)
138 this.checkNumberOfWorkers(this.numberOfWorkers)
139 this.checkPoolOptions(this.opts)
140
141 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
142 this.executeTask = this.executeTask.bind(this)
143 this.enqueueTask = this.enqueueTask.bind(this)
144
145 if (this.opts.enableEvents === true) {
146 this.initializeEventEmitter()
147 }
148 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
149 Worker,
150 Data,
151 Response
152 >(
153 this,
154 this.opts.workerChoiceStrategy,
155 this.opts.workerChoiceStrategyOptions
156 )
157
158 this.setupHook()
159
160 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
161
162 this.started = false
163 this.starting = false
164 if (this.opts.startWorkers === true) {
165 this.start()
166 }
167
168 this.startTimestamp = performance.now()
169 }
170
171 private checkNumberOfWorkers (numberOfWorkers: number): void {
172 if (numberOfWorkers == null) {
173 throw new Error(
174 'Cannot instantiate a pool without specifying the number of workers'
175 )
176 } else if (!Number.isSafeInteger(numberOfWorkers)) {
177 throw new TypeError(
178 'Cannot instantiate a pool with a non safe integer number of workers'
179 )
180 } else if (numberOfWorkers < 0) {
181 throw new RangeError(
182 'Cannot instantiate a pool with a negative number of workers'
183 )
184 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
185 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
186 }
187 }
188
189 private checkPoolOptions (opts: PoolOptions<Worker>): void {
190 if (isPlainObject(opts)) {
191 this.opts.startWorkers = opts.startWorkers ?? true
192 checkValidWorkerChoiceStrategy(
193 opts.workerChoiceStrategy as WorkerChoiceStrategy
194 )
195 this.opts.workerChoiceStrategy =
196 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
197 this.checkValidWorkerChoiceStrategyOptions(
198 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
199 )
200 this.opts.workerChoiceStrategyOptions = {
201 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
202 ...opts.workerChoiceStrategyOptions
203 }
204 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
205 this.opts.enableEvents = opts.enableEvents ?? true
206 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
207 if (this.opts.enableTasksQueue) {
208 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
209 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
210 opts.tasksQueueOptions as TasksQueueOptions
211 )
212 }
213 } else {
214 throw new TypeError('Invalid pool options: must be a plain object')
215 }
216 }
217
218 private checkValidWorkerChoiceStrategyOptions (
219 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
220 ): void {
221 if (
222 workerChoiceStrategyOptions != null &&
223 !isPlainObject(workerChoiceStrategyOptions)
224 ) {
225 throw new TypeError(
226 'Invalid worker choice strategy options: must be a plain object'
227 )
228 }
229 if (
230 workerChoiceStrategyOptions?.retries != null &&
231 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
232 ) {
233 throw new TypeError(
234 'Invalid worker choice strategy options: retries must be an integer'
235 )
236 }
237 if (
238 workerChoiceStrategyOptions?.retries != null &&
239 workerChoiceStrategyOptions.retries < 0
240 ) {
241 throw new RangeError(
242 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
243 )
244 }
245 if (
246 workerChoiceStrategyOptions?.weights != null &&
247 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
248 ) {
249 throw new Error(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 }
253 if (
254 workerChoiceStrategyOptions?.measurement != null &&
255 !Object.values(Measurements).includes(
256 workerChoiceStrategyOptions.measurement
257 )
258 ) {
259 throw new Error(
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
261 )
262 }
263 }
264
265 private initializeEventEmitter (): void {
266 this.emitter = new EventEmitterAsyncResource({
267 name: `poolifier:${this.type}-${this.worker}-pool`
268 })
269 }
270
271 /** @inheritDoc */
272 public get info (): PoolInfo {
273 return {
274 version,
275 type: this.type,
276 worker: this.worker,
277 started: this.started,
278 ready: this.ready,
279 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
280 minSize: this.minSize,
281 maxSize: this.maxSize,
282 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
283 .runTime.aggregate &&
284 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
285 .waitTime.aggregate && { utilization: round(this.utilization) }),
286 workerNodes: this.workerNodes.length,
287 idleWorkerNodes: this.workerNodes.reduce(
288 (accumulator, workerNode) =>
289 workerNode.usage.tasks.executing === 0
290 ? accumulator + 1
291 : accumulator,
292 0
293 ),
294 busyWorkerNodes: this.workerNodes.reduce(
295 (accumulator, workerNode) =>
296 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
297 0
298 ),
299 executedTasks: this.workerNodes.reduce(
300 (accumulator, workerNode) =>
301 accumulator + workerNode.usage.tasks.executed,
302 0
303 ),
304 executingTasks: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
306 accumulator + workerNode.usage.tasks.executing,
307 0
308 ),
309 ...(this.opts.enableTasksQueue === true && {
310 queuedTasks: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
312 accumulator + workerNode.usage.tasks.queued,
313 0
314 )
315 }),
316 ...(this.opts.enableTasksQueue === true && {
317 maxQueuedTasks: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
320 0
321 )
322 }),
323 ...(this.opts.enableTasksQueue === true && {
324 backPressure: this.hasBackPressure()
325 }),
326 ...(this.opts.enableTasksQueue === true && {
327 stolenTasks: this.workerNodes.reduce(
328 (accumulator, workerNode) =>
329 accumulator + workerNode.usage.tasks.stolen,
330 0
331 )
332 }),
333 failedTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + workerNode.usage.tasks.failed,
336 0
337 ),
338 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
339 .runTime.aggregate && {
340 runTime: {
341 minimum: round(
342 min(
343 ...this.workerNodes.map(
344 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
345 )
346 )
347 ),
348 maximum: round(
349 max(
350 ...this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
352 )
353 )
354 ),
355 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
356 .runTime.average && {
357 average: round(
358 average(
359 this.workerNodes.reduce<number[]>(
360 (accumulator, workerNode) =>
361 accumulator.concat(workerNode.usage.runTime.history),
362 []
363 )
364 )
365 )
366 }),
367 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
368 .runTime.median && {
369 median: round(
370 median(
371 this.workerNodes.reduce<number[]>(
372 (accumulator, workerNode) =>
373 accumulator.concat(workerNode.usage.runTime.history),
374 []
375 )
376 )
377 )
378 })
379 }
380 }),
381 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
382 .waitTime.aggregate && {
383 waitTime: {
384 minimum: round(
385 min(
386 ...this.workerNodes.map(
387 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
388 )
389 )
390 ),
391 maximum: round(
392 max(
393 ...this.workerNodes.map(
394 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
395 )
396 )
397 ),
398 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
399 .waitTime.average && {
400 average: round(
401 average(
402 this.workerNodes.reduce<number[]>(
403 (accumulator, workerNode) =>
404 accumulator.concat(workerNode.usage.waitTime.history),
405 []
406 )
407 )
408 )
409 }),
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .waitTime.median && {
412 median: round(
413 median(
414 this.workerNodes.reduce<number[]>(
415 (accumulator, workerNode) =>
416 accumulator.concat(workerNode.usage.waitTime.history),
417 []
418 )
419 )
420 )
421 })
422 }
423 })
424 }
425 }
426
427 /**
428 * The pool readiness boolean status.
429 */
430 private get ready (): boolean {
431 return (
432 this.workerNodes.reduce(
433 (accumulator, workerNode) =>
434 !workerNode.info.dynamic && workerNode.info.ready
435 ? accumulator + 1
436 : accumulator,
437 0
438 ) >= this.minSize
439 )
440 }
441
442 /**
443 * The approximate pool utilization.
444 *
445 * @returns The pool utilization.
446 */
447 private get utilization (): number {
448 const poolTimeCapacity =
449 (performance.now() - this.startTimestamp) * this.maxSize
450 const totalTasksRunTime = this.workerNodes.reduce(
451 (accumulator, workerNode) =>
452 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
453 0
454 )
455 const totalTasksWaitTime = this.workerNodes.reduce(
456 (accumulator, workerNode) =>
457 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
458 0
459 )
460 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
461 }
462
463 /**
464 * The pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
470 /**
471 * The worker type.
472 */
473 protected abstract get worker (): WorkerType
474
475 /**
476 * The pool minimum size.
477 */
478 protected get minSize (): number {
479 return this.numberOfWorkers
480 }
481
482 /**
483 * The pool maximum size.
484 */
485 protected get maxSize (): number {
486 return this.max ?? this.numberOfWorkers
487 }
488
489 /**
490 * Checks if the worker id sent in the received message from a worker is valid.
491 *
492 * @param message - The received message.
493 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
494 */
495 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
496 if (message.workerId == null) {
497 throw new Error('Worker message received without worker id')
498 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
499 throw new Error(
500 `Worker message received from unknown worker '${message.workerId}'`
501 )
502 }
503 }
504
505 /**
506 * Gets the given worker its worker node key.
507 *
508 * @param worker - The worker.
509 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
510 */
511 private getWorkerNodeKeyByWorker (worker: Worker): number {
512 return this.workerNodes.findIndex(
513 workerNode => workerNode.worker === worker
514 )
515 }
516
517 /**
518 * Gets the worker node key given its worker id.
519 *
520 * @param workerId - The worker id.
521 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
522 */
523 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
524 return this.workerNodes.findIndex(
525 workerNode => workerNode.info.id === workerId
526 )
527 }
528
529 /** @inheritDoc */
530 public setWorkerChoiceStrategy (
531 workerChoiceStrategy: WorkerChoiceStrategy,
532 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
533 ): void {
534 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
535 this.opts.workerChoiceStrategy = workerChoiceStrategy
536 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
537 this.opts.workerChoiceStrategy
538 )
539 if (workerChoiceStrategyOptions != null) {
540 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
541 }
542 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
543 workerNode.resetUsage()
544 this.sendStatisticsMessageToWorker(workerNodeKey)
545 }
546 }
547
548 /** @inheritDoc */
549 public setWorkerChoiceStrategyOptions (
550 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
551 ): void {
552 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
553 this.opts.workerChoiceStrategyOptions = {
554 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
555 ...workerChoiceStrategyOptions
556 }
557 this.workerChoiceStrategyContext.setOptions(
558 this.opts.workerChoiceStrategyOptions
559 )
560 }
561
562 /** @inheritDoc */
563 public enableTasksQueue (
564 enable: boolean,
565 tasksQueueOptions?: TasksQueueOptions
566 ): void {
567 if (this.opts.enableTasksQueue === true && !enable) {
568 this.unsetTaskStealing()
569 this.unsetTasksStealingOnBackPressure()
570 this.flushTasksQueues()
571 }
572 this.opts.enableTasksQueue = enable
573 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
574 }
575
576 /** @inheritDoc */
577 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
578 if (this.opts.enableTasksQueue === true) {
579 checkValidTasksQueueOptions(tasksQueueOptions)
580 this.opts.tasksQueueOptions =
581 this.buildTasksQueueOptions(tasksQueueOptions)
582 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
583 if (this.opts.tasksQueueOptions.taskStealing === true) {
584 this.setTaskStealing()
585 } else {
586 this.unsetTaskStealing()
587 }
588 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
589 this.setTasksStealingOnBackPressure()
590 } else {
591 this.unsetTasksStealingOnBackPressure()
592 }
593 } else if (this.opts.tasksQueueOptions != null) {
594 delete this.opts.tasksQueueOptions
595 }
596 }
597
598 private buildTasksQueueOptions (
599 tasksQueueOptions: TasksQueueOptions
600 ): TasksQueueOptions {
601 return {
602 ...{
603 size: Math.pow(this.maxSize, 2),
604 concurrency: 1,
605 taskStealing: true,
606 tasksStealingOnBackPressure: true
607 },
608 ...tasksQueueOptions
609 }
610 }
611
612 private setTasksQueueSize (size: number): void {
613 for (const workerNode of this.workerNodes) {
614 workerNode.tasksQueueBackPressureSize = size
615 }
616 }
617
618 private setTaskStealing (): void {
619 for (const [workerNodeKey] of this.workerNodes.entries()) {
620 this.workerNodes[workerNodeKey].addEventListener(
621 'emptyqueue',
622 this.handleEmptyQueueEvent as EventListener
623 )
624 }
625 }
626
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey] of this.workerNodes.entries()) {
629 this.workerNodes[workerNodeKey].removeEventListener(
630 'emptyqueue',
631 this.handleEmptyQueueEvent as EventListener
632 )
633 }
634 }
635
636 private setTasksStealingOnBackPressure (): void {
637 for (const [workerNodeKey] of this.workerNodes.entries()) {
638 this.workerNodes[workerNodeKey].addEventListener(
639 'backpressure',
640 this.handleBackPressureEvent as EventListener
641 )
642 }
643 }
644
645 private unsetTasksStealingOnBackPressure (): void {
646 for (const [workerNodeKey] of this.workerNodes.entries()) {
647 this.workerNodes[workerNodeKey].removeEventListener(
648 'backpressure',
649 this.handleBackPressureEvent as EventListener
650 )
651 }
652 }
653
654 /**
655 * Whether the pool is full or not.
656 *
657 * The pool filling boolean status.
658 */
659 protected get full (): boolean {
660 return this.workerNodes.length >= this.maxSize
661 }
662
663 /**
664 * Whether the pool is busy or not.
665 *
666 * The pool busyness boolean status.
667 */
668 protected abstract get busy (): boolean
669
670 /**
671 * Whether worker nodes are executing concurrently their tasks quota or not.
672 *
673 * @returns Worker nodes busyness boolean status.
674 */
675 protected internalBusy (): boolean {
676 if (this.opts.enableTasksQueue === true) {
677 return (
678 this.workerNodes.findIndex(
679 workerNode =>
680 workerNode.info.ready &&
681 workerNode.usage.tasks.executing <
682 (this.opts.tasksQueueOptions?.concurrency as number)
683 ) === -1
684 )
685 }
686 return (
687 this.workerNodes.findIndex(
688 workerNode =>
689 workerNode.info.ready && workerNode.usage.tasks.executing === 0
690 ) === -1
691 )
692 }
693
694 private async sendTaskFunctionOperationToWorker (
695 workerNodeKey: number,
696 message: MessageValue<Data>
697 ): Promise<boolean> {
698 return await new Promise<boolean>((resolve, reject) => {
699 const taskFunctionOperationListener = (
700 message: MessageValue<Response>
701 ): void => {
702 this.checkMessageWorkerId(message)
703 const workerId = this.getWorkerInfo(workerNodeKey).id as number
704 if (
705 message.taskFunctionOperationStatus != null &&
706 message.workerId === workerId
707 ) {
708 if (message.taskFunctionOperationStatus) {
709 resolve(true)
710 } else if (!message.taskFunctionOperationStatus) {
711 reject(
712 new Error(
713 `Task function operation '${
714 message.taskFunctionOperation as string
715 }' failed on worker ${message.workerId} with error: '${
716 message.workerError?.message as string
717 }'`
718 )
719 )
720 }
721 this.deregisterWorkerMessageListener(
722 this.getWorkerNodeKeyByWorkerId(message.workerId),
723 taskFunctionOperationListener
724 )
725 }
726 }
727 this.registerWorkerMessageListener(
728 workerNodeKey,
729 taskFunctionOperationListener
730 )
731 this.sendToWorker(workerNodeKey, message)
732 })
733 }
734
735 private async sendTaskFunctionOperationToWorkers (
736 message: MessageValue<Data>
737 ): Promise<boolean> {
738 return await new Promise<boolean>((resolve, reject) => {
739 const responsesReceived = new Array<MessageValue<Response>>()
740 const taskFunctionOperationsListener = (
741 message: MessageValue<Response>
742 ): void => {
743 this.checkMessageWorkerId(message)
744 if (message.taskFunctionOperationStatus != null) {
745 responsesReceived.push(message)
746 if (responsesReceived.length === this.workerNodes.length) {
747 if (
748 responsesReceived.every(
749 message => message.taskFunctionOperationStatus === true
750 )
751 ) {
752 resolve(true)
753 } else if (
754 responsesReceived.some(
755 message => message.taskFunctionOperationStatus === false
756 )
757 ) {
758 const errorResponse = responsesReceived.find(
759 response => response.taskFunctionOperationStatus === false
760 )
761 reject(
762 new Error(
763 `Task function operation '${
764 message.taskFunctionOperation as string
765 }' failed on worker ${
766 errorResponse?.workerId as number
767 } with error: '${
768 errorResponse?.workerError?.message as string
769 }'`
770 )
771 )
772 }
773 this.deregisterWorkerMessageListener(
774 this.getWorkerNodeKeyByWorkerId(message.workerId),
775 taskFunctionOperationsListener
776 )
777 }
778 }
779 }
780 for (const [workerNodeKey] of this.workerNodes.entries()) {
781 this.registerWorkerMessageListener(
782 workerNodeKey,
783 taskFunctionOperationsListener
784 )
785 this.sendToWorker(workerNodeKey, message)
786 }
787 })
788 }
789
790 /** @inheritDoc */
791 public hasTaskFunction (name: string): boolean {
792 for (const workerNode of this.workerNodes) {
793 if (
794 Array.isArray(workerNode.info.taskFunctionNames) &&
795 workerNode.info.taskFunctionNames.includes(name)
796 ) {
797 return true
798 }
799 }
800 return false
801 }
802
803 /** @inheritDoc */
804 public async addTaskFunction (
805 name: string,
806 fn: TaskFunction<Data, Response>
807 ): Promise<boolean> {
808 if (typeof name !== 'string') {
809 throw new TypeError('name argument must be a string')
810 }
811 if (typeof name === 'string' && name.trim().length === 0) {
812 throw new TypeError('name argument must not be an empty string')
813 }
814 if (typeof fn !== 'function') {
815 throw new TypeError('fn argument must be a function')
816 }
817 const opResult = await this.sendTaskFunctionOperationToWorkers({
818 taskFunctionOperation: 'add',
819 taskFunctionName: name,
820 taskFunction: fn.toString()
821 })
822 this.taskFunctions.set(name, fn)
823 return opResult
824 }
825
826 /** @inheritDoc */
827 public async removeTaskFunction (name: string): Promise<boolean> {
828 if (!this.taskFunctions.has(name)) {
829 throw new Error(
830 'Cannot remove a task function not handled on the pool side'
831 )
832 }
833 const opResult = await this.sendTaskFunctionOperationToWorkers({
834 taskFunctionOperation: 'remove',
835 taskFunctionName: name
836 })
837 this.deleteTaskFunctionWorkerUsages(name)
838 this.taskFunctions.delete(name)
839 return opResult
840 }
841
842 /** @inheritDoc */
843 public listTaskFunctionNames (): string[] {
844 for (const workerNode of this.workerNodes) {
845 if (
846 Array.isArray(workerNode.info.taskFunctionNames) &&
847 workerNode.info.taskFunctionNames.length > 0
848 ) {
849 return workerNode.info.taskFunctionNames
850 }
851 }
852 return []
853 }
854
855 /** @inheritDoc */
856 public async setDefaultTaskFunction (name: string): Promise<boolean> {
857 return await this.sendTaskFunctionOperationToWorkers({
858 taskFunctionOperation: 'default',
859 taskFunctionName: name
860 })
861 }
862
863 private deleteTaskFunctionWorkerUsages (name: string): void {
864 for (const workerNode of this.workerNodes) {
865 workerNode.deleteTaskFunctionWorkerUsage(name)
866 }
867 }
868
869 private shallExecuteTask (workerNodeKey: number): boolean {
870 return (
871 this.tasksQueueSize(workerNodeKey) === 0 &&
872 this.workerNodes[workerNodeKey].usage.tasks.executing <
873 (this.opts.tasksQueueOptions?.concurrency as number)
874 )
875 }
876
877 /** @inheritDoc */
878 public async execute (
879 data?: Data,
880 name?: string,
881 transferList?: TransferListItem[]
882 ): Promise<Response> {
883 return await new Promise<Response>((resolve, reject) => {
884 if (!this.started) {
885 reject(new Error('Cannot execute a task on not started pool'))
886 return
887 }
888 if (name != null && typeof name !== 'string') {
889 reject(new TypeError('name argument must be a string'))
890 return
891 }
892 if (
893 name != null &&
894 typeof name === 'string' &&
895 name.trim().length === 0
896 ) {
897 reject(new TypeError('name argument must not be an empty string'))
898 return
899 }
900 if (transferList != null && !Array.isArray(transferList)) {
901 reject(new TypeError('transferList argument must be an array'))
902 return
903 }
904 const timestamp = performance.now()
905 const workerNodeKey = this.chooseWorkerNode()
906 const task: Task<Data> = {
907 name: name ?? DEFAULT_TASK_NAME,
908 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
909 data: data ?? ({} as Data),
910 transferList,
911 timestamp,
912 taskId: randomUUID()
913 }
914 this.promiseResponseMap.set(task.taskId as string, {
915 resolve,
916 reject,
917 workerNodeKey
918 })
919 if (
920 this.opts.enableTasksQueue === false ||
921 (this.opts.enableTasksQueue === true &&
922 this.shallExecuteTask(workerNodeKey))
923 ) {
924 this.executeTask(workerNodeKey, task)
925 } else {
926 this.enqueueTask(workerNodeKey, task)
927 }
928 })
929 }
930
931 /** @inheritdoc */
932 public start (): void {
933 this.starting = true
934 while (
935 this.workerNodes.reduce(
936 (accumulator, workerNode) =>
937 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
938 0
939 ) < this.numberOfWorkers
940 ) {
941 this.createAndSetupWorkerNode()
942 }
943 this.starting = false
944 this.started = true
945 }
946
947 /** @inheritDoc */
948 public async destroy (): Promise<void> {
949 await Promise.all(
950 this.workerNodes.map(async (_, workerNodeKey) => {
951 await this.destroyWorkerNode(workerNodeKey)
952 })
953 )
954 this.emitter?.emit(PoolEvents.destroy, this.info)
955 this.emitter?.emitDestroy()
956 this.started = false
957 }
958
959 protected async sendKillMessageToWorker (
960 workerNodeKey: number
961 ): Promise<void> {
962 await new Promise<void>((resolve, reject) => {
963 const killMessageListener = (message: MessageValue<Response>): void => {
964 this.checkMessageWorkerId(message)
965 if (message.kill === 'success') {
966 resolve()
967 } else if (message.kill === 'failure') {
968 reject(
969 new Error(
970 `Kill message handling failed on worker ${
971 message.workerId as number
972 }`
973 )
974 )
975 }
976 }
977 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
978 this.sendToWorker(workerNodeKey, { kill: true })
979 })
980 }
981
982 /**
983 * Terminates the worker node given its worker node key.
984 *
985 * @param workerNodeKey - The worker node key.
986 */
987 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
988
989 /**
990 * Setup hook to execute code before worker nodes are created in the abstract constructor.
991 * Can be overridden.
992 *
993 * @virtual
994 */
995 protected setupHook (): void {
996 /* Intentionally empty */
997 }
998
999 /**
1000 * Should return whether the worker is the main worker or not.
1001 */
1002 protected abstract isMain (): boolean
1003
1004 /**
1005 * Hook executed before the worker task execution.
1006 * Can be overridden.
1007 *
1008 * @param workerNodeKey - The worker node key.
1009 * @param task - The task to execute.
1010 */
1011 protected beforeTaskExecutionHook (
1012 workerNodeKey: number,
1013 task: Task<Data>
1014 ): void {
1015 if (this.workerNodes[workerNodeKey]?.usage != null) {
1016 const workerUsage = this.workerNodes[workerNodeKey].usage
1017 ++workerUsage.tasks.executing
1018 this.updateWaitTimeWorkerUsage(workerUsage, task)
1019 }
1020 if (
1021 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1022 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1023 task.name as string
1024 ) != null
1025 ) {
1026 const taskFunctionWorkerUsage = this.workerNodes[
1027 workerNodeKey
1028 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1029 ++taskFunctionWorkerUsage.tasks.executing
1030 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1031 }
1032 }
1033
1034 /**
1035 * Hook executed after the worker task execution.
1036 * Can be overridden.
1037 *
1038 * @param workerNodeKey - The worker node key.
1039 * @param message - The received message.
1040 */
1041 protected afterTaskExecutionHook (
1042 workerNodeKey: number,
1043 message: MessageValue<Response>
1044 ): void {
1045 if (this.workerNodes[workerNodeKey]?.usage != null) {
1046 const workerUsage = this.workerNodes[workerNodeKey].usage
1047 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1048 this.updateRunTimeWorkerUsage(workerUsage, message)
1049 this.updateEluWorkerUsage(workerUsage, message)
1050 }
1051 if (
1052 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1053 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1054 message.taskPerformance?.name as string
1055 ) != null
1056 ) {
1057 const taskFunctionWorkerUsage = this.workerNodes[
1058 workerNodeKey
1059 ].getTaskFunctionWorkerUsage(
1060 message.taskPerformance?.name as string
1061 ) as WorkerUsage
1062 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1063 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1064 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1065 }
1066 }
1067
1068 /**
1069 * Whether the worker node shall update its task function worker usage or not.
1070 *
1071 * @param workerNodeKey - The worker node key.
1072 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1073 */
1074 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1075 const workerInfo = this.getWorkerInfo(workerNodeKey)
1076 return (
1077 workerInfo != null &&
1078 Array.isArray(workerInfo.taskFunctionNames) &&
1079 workerInfo.taskFunctionNames.length > 2
1080 )
1081 }
1082
1083 private updateTaskStatisticsWorkerUsage (
1084 workerUsage: WorkerUsage,
1085 message: MessageValue<Response>
1086 ): void {
1087 const workerTaskStatistics = workerUsage.tasks
1088 if (
1089 workerTaskStatistics.executing != null &&
1090 workerTaskStatistics.executing > 0
1091 ) {
1092 --workerTaskStatistics.executing
1093 }
1094 if (message.workerError == null) {
1095 ++workerTaskStatistics.executed
1096 } else {
1097 ++workerTaskStatistics.failed
1098 }
1099 }
1100
1101 private updateRunTimeWorkerUsage (
1102 workerUsage: WorkerUsage,
1103 message: MessageValue<Response>
1104 ): void {
1105 if (message.workerError != null) {
1106 return
1107 }
1108 updateMeasurementStatistics(
1109 workerUsage.runTime,
1110 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1111 message.taskPerformance?.runTime ?? 0
1112 )
1113 }
1114
1115 private updateWaitTimeWorkerUsage (
1116 workerUsage: WorkerUsage,
1117 task: Task<Data>
1118 ): void {
1119 const timestamp = performance.now()
1120 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1121 updateMeasurementStatistics(
1122 workerUsage.waitTime,
1123 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1124 taskWaitTime
1125 )
1126 }
1127
1128 private updateEluWorkerUsage (
1129 workerUsage: WorkerUsage,
1130 message: MessageValue<Response>
1131 ): void {
1132 if (message.workerError != null) {
1133 return
1134 }
1135 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1136 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1137 updateMeasurementStatistics(
1138 workerUsage.elu.active,
1139 eluTaskStatisticsRequirements,
1140 message.taskPerformance?.elu?.active ?? 0
1141 )
1142 updateMeasurementStatistics(
1143 workerUsage.elu.idle,
1144 eluTaskStatisticsRequirements,
1145 message.taskPerformance?.elu?.idle ?? 0
1146 )
1147 if (eluTaskStatisticsRequirements.aggregate) {
1148 if (message.taskPerformance?.elu != null) {
1149 if (workerUsage.elu.utilization != null) {
1150 workerUsage.elu.utilization =
1151 (workerUsage.elu.utilization +
1152 message.taskPerformance.elu.utilization) /
1153 2
1154 } else {
1155 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1156 }
1157 }
1158 }
1159 }
1160
1161 /**
1162 * Chooses a worker node for the next task.
1163 *
1164 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1165 *
1166 * @returns The chosen worker node key
1167 */
1168 private chooseWorkerNode (): number {
1169 if (this.shallCreateDynamicWorker()) {
1170 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1171 if (
1172 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1173 ) {
1174 return workerNodeKey
1175 }
1176 }
1177 return this.workerChoiceStrategyContext.execute()
1178 }
1179
1180 /**
1181 * Conditions for dynamic worker creation.
1182 *
1183 * @returns Whether to create a dynamic worker or not.
1184 */
1185 private shallCreateDynamicWorker (): boolean {
1186 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1187 }
1188
1189 /**
1190 * Sends a message to worker given its worker node key.
1191 *
1192 * @param workerNodeKey - The worker node key.
1193 * @param message - The message.
1194 * @param transferList - The optional array of transferable objects.
1195 */
1196 protected abstract sendToWorker (
1197 workerNodeKey: number,
1198 message: MessageValue<Data>,
1199 transferList?: TransferListItem[]
1200 ): void
1201
1202 /**
1203 * Creates a new worker.
1204 *
1205 * @returns Newly created worker.
1206 */
1207 protected abstract createWorker (): Worker
1208
1209 /**
1210 * Creates a new, completely set up worker node.
1211 *
1212 * @returns New, completely set up worker node key.
1213 */
1214 protected createAndSetupWorkerNode (): number {
1215 const worker = this.createWorker()
1216
1217 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1218 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1219 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1220 worker.on('error', error => {
1221 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1222 this.flagWorkerNodeAsNotReady(workerNodeKey)
1223 const workerInfo = this.getWorkerInfo(workerNodeKey)
1224 this.emitter?.emit(PoolEvents.error, error)
1225 this.workerNodes[workerNodeKey].closeChannel()
1226 if (
1227 this.started &&
1228 !this.starting &&
1229 this.opts.restartWorkerOnError === true
1230 ) {
1231 if (workerInfo.dynamic) {
1232 this.createAndSetupDynamicWorkerNode()
1233 } else {
1234 this.createAndSetupWorkerNode()
1235 }
1236 }
1237 if (this.started && this.opts.enableTasksQueue === true) {
1238 this.redistributeQueuedTasks(workerNodeKey)
1239 }
1240 })
1241 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1242 worker.once('exit', () => {
1243 this.removeWorkerNode(worker)
1244 })
1245
1246 const workerNodeKey = this.addWorkerNode(worker)
1247
1248 this.afterWorkerNodeSetup(workerNodeKey)
1249
1250 return workerNodeKey
1251 }
1252
1253 /**
1254 * Creates a new, completely set up dynamic worker node.
1255 *
1256 * @returns New, completely set up dynamic worker node key.
1257 */
1258 protected createAndSetupDynamicWorkerNode (): number {
1259 const workerNodeKey = this.createAndSetupWorkerNode()
1260 this.registerWorkerMessageListener(workerNodeKey, message => {
1261 this.checkMessageWorkerId(message)
1262 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1263 message.workerId
1264 )
1265 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1266 // Kill message received from worker
1267 if (
1268 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1269 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1270 ((this.opts.enableTasksQueue === false &&
1271 workerUsage.tasks.executing === 0) ||
1272 (this.opts.enableTasksQueue === true &&
1273 workerUsage.tasks.executing === 0 &&
1274 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1275 ) {
1276 // Flag the worker node as not ready immediately
1277 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1278 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1279 this.emitter?.emit(PoolEvents.error, error)
1280 })
1281 }
1282 })
1283 const workerInfo = this.getWorkerInfo(workerNodeKey)
1284 this.sendToWorker(workerNodeKey, {
1285 checkActive: true
1286 })
1287 if (this.taskFunctions.size > 0) {
1288 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1289 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1290 taskFunctionOperation: 'add',
1291 taskFunctionName,
1292 taskFunction: taskFunction.toString()
1293 }).catch(error => {
1294 this.emitter?.emit(PoolEvents.error, error)
1295 })
1296 }
1297 }
1298 workerInfo.dynamic = true
1299 if (
1300 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1301 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1302 ) {
1303 workerInfo.ready = true
1304 }
1305 this.checkAndEmitDynamicWorkerCreationEvents()
1306 return workerNodeKey
1307 }
1308
1309 /**
1310 * Registers a listener callback on the worker given its worker node key.
1311 *
1312 * @param workerNodeKey - The worker node key.
1313 * @param listener - The message listener callback.
1314 */
1315 protected abstract registerWorkerMessageListener<
1316 Message extends Data | Response
1317 >(
1318 workerNodeKey: number,
1319 listener: (message: MessageValue<Message>) => void
1320 ): void
1321
1322 /**
1323 * Registers once a listener callback on the worker given its worker node key.
1324 *
1325 * @param workerNodeKey - The worker node key.
1326 * @param listener - The message listener callback.
1327 */
1328 protected abstract registerOnceWorkerMessageListener<
1329 Message extends Data | Response
1330 >(
1331 workerNodeKey: number,
1332 listener: (message: MessageValue<Message>) => void
1333 ): void
1334
1335 /**
1336 * Deregisters a listener callback on the worker given its worker node key.
1337 *
1338 * @param workerNodeKey - The worker node key.
1339 * @param listener - The message listener callback.
1340 */
1341 protected abstract deregisterWorkerMessageListener<
1342 Message extends Data | Response
1343 >(
1344 workerNodeKey: number,
1345 listener: (message: MessageValue<Message>) => void
1346 ): void
1347
1348 /**
1349 * Method hooked up after a worker node has been newly created.
1350 * Can be overridden.
1351 *
1352 * @param workerNodeKey - The newly created worker node key.
1353 */
1354 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1355 // Listen to worker messages.
1356 this.registerWorkerMessageListener(
1357 workerNodeKey,
1358 this.workerMessageListener.bind(this)
1359 )
1360 // Send the startup message to worker.
1361 this.sendStartupMessageToWorker(workerNodeKey)
1362 // Send the statistics message to worker.
1363 this.sendStatisticsMessageToWorker(workerNodeKey)
1364 if (this.opts.enableTasksQueue === true) {
1365 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1366 this.workerNodes[workerNodeKey].addEventListener(
1367 'emptyqueue',
1368 this.handleEmptyQueueEvent as EventListener
1369 )
1370 }
1371 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1372 this.workerNodes[workerNodeKey].addEventListener(
1373 'backpressure',
1374 this.handleBackPressureEvent as EventListener
1375 )
1376 }
1377 }
1378 }
1379
1380 /**
1381 * Sends the startup message to worker given its worker node key.
1382 *
1383 * @param workerNodeKey - The worker node key.
1384 */
1385 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1386
1387 /**
1388 * Sends the statistics message to worker given its worker node key.
1389 *
1390 * @param workerNodeKey - The worker node key.
1391 */
1392 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1393 this.sendToWorker(workerNodeKey, {
1394 statistics: {
1395 runTime:
1396 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1397 .runTime.aggregate,
1398 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1399 .elu.aggregate
1400 }
1401 })
1402 }
1403
1404 private redistributeQueuedTasks (workerNodeKey: number): void {
1405 while (this.tasksQueueSize(workerNodeKey) > 0) {
1406 const destinationWorkerNodeKey = this.workerNodes.reduce(
1407 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1408 return workerNode.info.ready &&
1409 workerNode.usage.tasks.queued <
1410 workerNodes[minWorkerNodeKey].usage.tasks.queued
1411 ? workerNodeKey
1412 : minWorkerNodeKey
1413 },
1414 0
1415 )
1416 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1417 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1418 this.executeTask(destinationWorkerNodeKey, task)
1419 } else {
1420 this.enqueueTask(destinationWorkerNodeKey, task)
1421 }
1422 }
1423 }
1424
1425 private updateTaskStolenStatisticsWorkerUsage (
1426 workerNodeKey: number,
1427 taskName: string
1428 ): void {
1429 const workerNode = this.workerNodes[workerNodeKey]
1430 if (workerNode?.usage != null) {
1431 ++workerNode.usage.tasks.stolen
1432 }
1433 if (
1434 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1435 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1436 ) {
1437 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1438 taskName
1439 ) as WorkerUsage
1440 ++taskFunctionWorkerUsage.tasks.stolen
1441 }
1442 }
1443
1444 private readonly handleEmptyQueueEvent = (
1445 event: CustomEvent<WorkerNodeEventDetail>
1446 ): void => {
1447 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1448 event.detail.workerId
1449 )
1450 const workerNodes = this.workerNodes
1451 .slice()
1452 .sort(
1453 (workerNodeA, workerNodeB) =>
1454 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1455 )
1456 const sourceWorkerNode = workerNodes.find(
1457 workerNode =>
1458 workerNode.info.ready &&
1459 workerNode.info.id !== event.detail.workerId &&
1460 workerNode.usage.tasks.queued > 0
1461 )
1462 if (sourceWorkerNode != null) {
1463 const task = sourceWorkerNode.popTask() as Task<Data>
1464 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1465 this.executeTask(destinationWorkerNodeKey, task)
1466 } else {
1467 this.enqueueTask(destinationWorkerNodeKey, task)
1468 }
1469 this.updateTaskStolenStatisticsWorkerUsage(
1470 destinationWorkerNodeKey,
1471 task.name as string
1472 )
1473 }
1474 }
1475
1476 private readonly handleBackPressureEvent = (
1477 event: CustomEvent<WorkerNodeEventDetail>
1478 ): void => {
1479 const sizeOffset = 1
1480 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1481 return
1482 }
1483 const sourceWorkerNode =
1484 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
1485 const workerNodes = this.workerNodes
1486 .slice()
1487 .sort(
1488 (workerNodeA, workerNodeB) =>
1489 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1490 )
1491 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1492 if (
1493 sourceWorkerNode.usage.tasks.queued > 0 &&
1494 workerNode.info.ready &&
1495 workerNode.info.id !== event.detail.workerId &&
1496 workerNode.usage.tasks.queued <
1497 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1498 ) {
1499 const task = sourceWorkerNode.popTask() as Task<Data>
1500 if (this.shallExecuteTask(workerNodeKey)) {
1501 this.executeTask(workerNodeKey, task)
1502 } else {
1503 this.enqueueTask(workerNodeKey, task)
1504 }
1505 this.updateTaskStolenStatisticsWorkerUsage(
1506 workerNodeKey,
1507 task.name as string
1508 )
1509 }
1510 }
1511 }
1512
1513 /**
1514 * This method is the message listener registered on each worker.
1515 */
1516 protected workerMessageListener (message: MessageValue<Response>): void {
1517 this.checkMessageWorkerId(message)
1518 if (message.ready != null && message.taskFunctionNames != null) {
1519 // Worker ready response received from worker
1520 this.handleWorkerReadyResponse(message)
1521 } else if (message.taskId != null) {
1522 // Task execution response received from worker
1523 this.handleTaskExecutionResponse(message)
1524 } else if (message.taskFunctionNames != null) {
1525 // Task function names message received from worker
1526 this.getWorkerInfo(
1527 this.getWorkerNodeKeyByWorkerId(message.workerId)
1528 ).taskFunctionNames = message.taskFunctionNames
1529 }
1530 }
1531
1532 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1533 if (message.ready === false) {
1534 throw new Error(
1535 `Worker ${message.workerId as number} failed to initialize`
1536 )
1537 }
1538 const workerInfo = this.getWorkerInfo(
1539 this.getWorkerNodeKeyByWorkerId(message.workerId)
1540 )
1541 workerInfo.ready = message.ready as boolean
1542 workerInfo.taskFunctionNames = message.taskFunctionNames
1543 if (this.ready) {
1544 this.emitter?.emit(PoolEvents.ready, this.info)
1545 }
1546 }
1547
1548 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1549 const { taskId, workerError, data } = message
1550 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1551 if (promiseResponse != null) {
1552 if (workerError != null) {
1553 this.emitter?.emit(PoolEvents.taskError, workerError)
1554 promiseResponse.reject(workerError.message)
1555 } else {
1556 promiseResponse.resolve(data as Response)
1557 }
1558 const workerNodeKey = promiseResponse.workerNodeKey
1559 this.afterTaskExecutionHook(workerNodeKey, message)
1560 this.workerChoiceStrategyContext.update(workerNodeKey)
1561 this.promiseResponseMap.delete(taskId as string)
1562 if (
1563 this.opts.enableTasksQueue === true &&
1564 this.tasksQueueSize(workerNodeKey) > 0 &&
1565 this.workerNodes[workerNodeKey].usage.tasks.executing <
1566 (this.opts.tasksQueueOptions?.concurrency as number)
1567 ) {
1568 this.executeTask(
1569 workerNodeKey,
1570 this.dequeueTask(workerNodeKey) as Task<Data>
1571 )
1572 }
1573 }
1574 }
1575
1576 private checkAndEmitTaskExecutionEvents (): void {
1577 if (this.busy) {
1578 this.emitter?.emit(PoolEvents.busy, this.info)
1579 }
1580 }
1581
1582 private checkAndEmitTaskQueuingEvents (): void {
1583 if (this.hasBackPressure()) {
1584 this.emitter?.emit(PoolEvents.backPressure, this.info)
1585 }
1586 }
1587
1588 private checkAndEmitDynamicWorkerCreationEvents (): void {
1589 if (this.type === PoolTypes.dynamic) {
1590 if (this.full) {
1591 this.emitter?.emit(PoolEvents.full, this.info)
1592 }
1593 }
1594 }
1595
1596 /**
1597 * Gets the worker information given its worker node key.
1598 *
1599 * @param workerNodeKey - The worker node key.
1600 * @returns The worker information.
1601 */
1602 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1603 return this.workerNodes[workerNodeKey]?.info
1604 }
1605
1606 /**
1607 * Adds the given worker in the pool worker nodes.
1608 *
1609 * @param worker - The worker.
1610 * @returns The added worker node key.
1611 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1612 */
1613 private addWorkerNode (worker: Worker): number {
1614 const workerNode = new WorkerNode<Worker, Data>(
1615 worker,
1616 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1617 )
1618 // Flag the worker node as ready at pool startup.
1619 if (this.starting) {
1620 workerNode.info.ready = true
1621 }
1622 this.workerNodes.push(workerNode)
1623 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1624 if (workerNodeKey === -1) {
1625 throw new Error('Worker added not found in worker nodes')
1626 }
1627 return workerNodeKey
1628 }
1629
1630 /**
1631 * Removes the given worker from the pool worker nodes.
1632 *
1633 * @param worker - The worker.
1634 */
1635 private removeWorkerNode (worker: Worker): void {
1636 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1637 if (workerNodeKey !== -1) {
1638 this.workerNodes.splice(workerNodeKey, 1)
1639 this.workerChoiceStrategyContext.remove(workerNodeKey)
1640 }
1641 }
1642
1643 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1644 this.getWorkerInfo(workerNodeKey).ready = false
1645 }
1646
1647 /** @inheritDoc */
1648 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1649 return (
1650 this.opts.enableTasksQueue === true &&
1651 this.workerNodes[workerNodeKey].hasBackPressure()
1652 )
1653 }
1654
1655 private hasBackPressure (): boolean {
1656 return (
1657 this.opts.enableTasksQueue === true &&
1658 this.workerNodes.findIndex(
1659 workerNode => !workerNode.hasBackPressure()
1660 ) === -1
1661 )
1662 }
1663
1664 /**
1665 * Executes the given task on the worker given its worker node key.
1666 *
1667 * @param workerNodeKey - The worker node key.
1668 * @param task - The task to execute.
1669 */
1670 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1671 this.beforeTaskExecutionHook(workerNodeKey, task)
1672 this.sendToWorker(workerNodeKey, task, task.transferList)
1673 this.checkAndEmitTaskExecutionEvents()
1674 }
1675
1676 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1677 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1678 this.checkAndEmitTaskQueuingEvents()
1679 return tasksQueueSize
1680 }
1681
1682 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1683 return this.workerNodes[workerNodeKey].dequeueTask()
1684 }
1685
1686 private tasksQueueSize (workerNodeKey: number): number {
1687 return this.workerNodes[workerNodeKey].tasksQueueSize()
1688 }
1689
1690 protected flushTasksQueue (workerNodeKey: number): void {
1691 while (this.tasksQueueSize(workerNodeKey) > 0) {
1692 this.executeTask(
1693 workerNodeKey,
1694 this.dequeueTask(workerNodeKey) as Task<Data>
1695 )
1696 }
1697 this.workerNodes[workerNodeKey].clearTasksQueue()
1698 }
1699
1700 private flushTasksQueues (): void {
1701 for (const [workerNodeKey] of this.workerNodes.entries()) {
1702 this.flushTasksQueue(workerNodeKey)
1703 }
1704 }
1705 }