refactor: use object destructuring in worker message listener
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43 } from './worker'
44 import {
45 type MeasurementStatisticsRequirements,
46 Measurements,
47 WorkerChoiceStrategies,
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
50 } from './selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
52 import { version } from './version'
53 import { WorkerNode } from './worker-node'
54 import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
59 } from './utils'
60
61 /**
62 * Base class that implements some shared logic for all poolifier pools.
63 *
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
67 */
68 export abstract class AbstractPool<
69 Worker extends IWorker,
70 Data = unknown,
71 Response = unknown
72 > implements IPool<Worker, Data, Response> {
73 /** @inheritDoc */
74 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
75
76 /** @inheritDoc */
77 public emitter?: EventEmitterAsyncResource
78
79 /**
80 * Dynamic pool maximum size property placeholder.
81 */
82 protected readonly max?: number
83
84 /**
85 * The task execution response promise map:
86 * - `key`: The message id of each submitted task.
87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
88 *
89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
90 */
91 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
92 new Map<string, PromiseResponseWrapper<Response>>()
93
94 /**
95 * Worker choice strategy context referencing a worker choice algorithm implementation.
96 */
97 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
98 Worker,
99 Data,
100 Response
101 >
102
103 /**
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
107 */
108 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
109
110 /**
111 * Whether the pool is started or not.
112 */
113 private started: boolean
114 /**
115 * Whether the pool is starting or not.
116 */
117 private starting: boolean
118 /**
119 * Whether the pool is destroying or not.
120 */
121 private destroying: boolean
122 /**
123 * Whether the pool ready event has been emitted or not.
124 */
125 private readyEventEmitted: boolean
126 /**
127 * The start timestamp of the pool.
128 */
129 private readonly startTimestamp
130
/**
 * Builds a new poolifier pool: validates the arguments, wires up the optional
 * event emitter and the worker choice strategy context, then starts the
 * workers when `opts.startWorkers` is `true` (the default applied by the
 * options check).
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. The options check also fills in the defaults.
  checkFilePath(filePath)
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkPoolOptions(opts)

  // These methods are bound to the pool instance so they can be passed around as plain callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.readyEventEmitted = false
  this.destroying = false
  this.starting = false
  this.started = false
  if (opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
183
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused a zero worker count here.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
201
/**
 * Validates the pool options and stores them, with defaults applied, on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate the given value first, then default it.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: validate, then layer over the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
230
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }

  // One weight is expected per worker node, up to the pool maximum size.
  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
277
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
283
/** @inheritDoc */
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Adds up one numeric figure over every worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the given measurement history of every worker node.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(average(gatherHistory('runTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(median(gatherHistory('runTime')))
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(average(gatherHistory('waitTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(median(gatherHistory('waitTime')))
        })
      }
    })
  }
}
439
/**
 * Whether the pool is ready: `true` once the number of ready, non-dynamic
 * worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
454
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait
 * time of every worker node, divided by the time elapsed since the start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
475
476 /**
477 * The pool type.
478 *
479 * If it is `'dynamic'`, it provides the `max` property.
480 */
481 protected abstract get type (): PoolType
482
483 /**
484 * The worker type.
485 */
486 protected abstract get worker (): WorkerType
487
/**
 * The pool minimum size, i.e. the number of workers given to the constructor.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
494
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  const { max: maximumSize, numberOfWorkers } = this
  return maximumSize ?? numberOfWorkers
}
501
/**
 * Ensures that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
529
/**
 * Gets the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node has its usage reset and is sent the statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching an enabled queue off: detach the stealing listeners and flush what is queued.
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without an enabled tasks queue, any stored queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  this.setTasksQueueSize(queueOptions.size as number)
  if (queueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (queueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
610
/**
 * Builds the tasks queue options by layering the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
624
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
630
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  const listener = this.handleIdleWorkerNodeEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('idleWorkerNode', listener)
  }
}
639
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  const listener = this.handleIdleWorkerNodeEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('idleWorkerNode', listener)
  }
}
648
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  const listener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('backPressure', listener)
  }
}
657
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  const listener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('backPressure', listener)
  }
}
666
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
675
676 /**
677 * Whether the pool is busy or not.
678 *
679 * The pool busyness boolean status.
680 */
681 protected abstract get busy (): boolean
682
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than
 * the tasks queue concurrency (tasks queue enabled) or no task at all
 * (tasks queue disabled).
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
706
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success, rejected with an error when the worker reports a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      const {
        taskFunctionOperation,
        taskFunctionOperationStatus,
        workerError,
        workerId
      } = response
      // Messages that are not an operation status from the targeted worker are ignored.
      if (
        taskFunctionOperationStatus == null ||
        workerId !== expectedWorkerId
      ) {
        return
      }
      if (taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              taskFunctionOperation as string
            }' failed on worker ${workerId} with error: '${
              workerError?.message as string
            }'`
          )
        )
      }
      // The listener is removed once the operation is settled.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
747
/**
 * Sends a task function operation message to every worker and waits until
 * as many status responses as there are worker nodes have been received.
 *
 * Resolves immediately with `true` when the pool has no worker node, since no
 * response could ever arrive. Once the operation is settled, the listener is
 * removed from every worker node it may have been registered on, not only
 * from the worker that sent the last response.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports success, rejected with an error when at least one worker reports a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  if (this.workerNodes.length === 0) {
    // Nobody to notify: waiting for responses would hang forever.
    return true
  }
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // Statuses are non-null booleans here, so "no failed response" means "all succeeded".
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse.workerId as number
            } with error: '${
              errorResponse.workerError?.message as string
            }'`
          )
        )
      }
      // The same listener was registered on every worker node: remove it everywhere.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
802
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // A single worker node listing the task function name is enough.
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
815
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const isBlankName = name.trim().length === 0
  if (isBlankName) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  // Only reached when the operation did not reject.
  this.taskFunctions.set(name, fn)
  return opResult
}
838
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool task functions map can be removed.
  const isKnownToPool = this.taskFunctions.has(name)
  if (!isKnownToPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
854
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node exposing a non-empty names list provides the answer.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
867
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
875
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
881
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the node executes fewer tasks than the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return this.workerNodes[workerNodeKey].usage.tasks.executing < concurrency
}
889
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state checks.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments checks.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The promise callbacks are stored under the task id until the response is received.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    const runImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (runImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
947
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  // Counts the worker nodes that are not flagged as dynamic.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.starting = true
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
972
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destruction of every worker node is triggered at once, then awaited as a whole.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  if (this.emitter != null) {
    this.emitter.emit(PoolEvents.destroy, this.info)
    this.emitter.emitDestroy()
  }
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
996
/**
 * Sends the kill message to the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `'success'`, rejected with an error when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      const { kill, workerId } = message
      switch (kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${workerId as number}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1020
1021 /**
1022 * Terminates the worker node given its worker node key.
1023 *
1024 * @param workerNodeKey - The worker node key.
1025 */
1026 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1027
/**
 * Setup hook invoked by the abstract constructor once the worker choice
 * strategy context has been created and before the workers are started.
 * The base implementation does nothing.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1037
1038 /**
1039 * Should return whether the worker is the main worker or not.
1040 */
1041 protected abstract isMain (): boolean
1042
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and records the task wait time on the worker node usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Flags one more executing task and accounts for the time the task waited.
  const markTaskExecuting = (workerUsage: WorkerUsage): void => {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    markTaskExecuting(this.workerNodes[workerNodeKey].usage)
  }
  const taskFunctionName = task.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      taskFunctionName
    ) != null
  ) {
    markTaskExecuting(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      ) as WorkerUsage
    )
  }
}
1072
/**
 * Hook executed after the worker task execution: updates the task
 * statistics, run time and ELU on the worker node usage and, when
 * applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the outcome carried by the message to one usage record.
  const recordTaskOutcome = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    recordTaskOutcome(this.workerNodes[workerNodeKey].usage)
  }
  const taskFunctionName = message.taskPerformance?.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      taskFunctionName
    ) != null
  ) {
    recordTaskOutcome(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      ) as WorkerUsage
    )
  }
}
1106
/**
 * Tells whether the task function worker usage of a worker node shall be updated.
 *
 * That is the case when the worker node information exists and lists more
 * than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1121
/**
 * Updates the task counters of a worker usage after a task execution response.
 *
 * The executing counter is decremented without going below zero, then either
 * the executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if ((tasks.executing ?? 0) > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1139
/**
 * Feeds the task run time into the run time statistics of a worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const { runTime: runTimeRequirements } =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // A missing run time measurement counts as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1153
/**
 * Feeds the task wait time into the wait time statistics of a worker usage.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp counts as a zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1166
/**
 * Feeds the task ELU measurements into the ELU statistics of a worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * Active and idle values go through the measurement statistics (missing
 * values count as zero). When aggregation is required and the message holds
 * an ELU measurement, the utilization becomes the mean of the previous and
 * the new utilization, or the new one if none was stored yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1199
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy asks for dynamic worker usage. In every
 * other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1218
/**
 * Conditions for dynamic worker creation: the pool type is dynamic, the pool
 * is not full and the pool is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1227
/**
 * Sends a message to worker given its worker node key.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 * Within this class it carries tasks to execute as well as control messages
 * (for instance the `checkActive` and `statistics` messages).
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1240
/**
 * Creates a new worker.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 * The returned worker is not yet registered as a pool worker node; that is
 * done by `createAndSetupWorkerNode()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1247
/**
 * Creates a worker, attaches its event handlers and registers it as a pool
 * worker node.
 *
 * The handlers given in the pool options (or no-op defaults) are attached
 * next to the internal ones handling worker errors and worker exit.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    // The worker node is taken out of service before anything else.
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(failedWorkerNodeKey)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    // A replacement of the same kind is created only on a started pool that
    // is neither starting nor being destroyed, and only if asked for.
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    // Tasks still queued on the failed worker node are handed over.
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1292
/**
 * Creates a worker node, then configures it as a dynamic one.
 *
 * A message listener destroying the worker node on kill messages is
 * registered, the `checkActive` message is sent, the task functions known to
 * the pool are sent to the worker, and the worker information is flagged as
 * dynamic (and as ready when the strategy policy asks for it).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const { workerId, kill } = message
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const { tasks } = this.workerNodes[senderWorkerNodeKey].usage
    // A hard kill always applies; a soft kill applies only to a worker node
    // executing nothing and, with tasks queueing enabled, queueing nothing.
    const shallDestroyWorkerNode =
      isKillBehavior(KillBehaviors.HARD, kill) ||
      (isKillBehavior(KillBehaviors.SOFT, kill) &&
        tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0)))
    if (!shallDestroyWorkerNode) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
    this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Task functions are transmitted as source text, one message per function.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1348
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 * Within this class it attaches the pool message listener to every new
 * worker node and the kill message listener to dynamic worker nodes.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1361
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1374
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1387
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, with tasks queueing enabled, subscribes to the worker node
 * events driving task stealing.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const poolMessageListener = this.workerMessageListener.bind(this)
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing by idle worker nodes.
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
  // Task stealing triggered by back pressure.
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
1419
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: the implementation is supplied by the concrete pool class.
 * It is called from `afterWorkerNodeSetup()`, right after the pool message
 * listener has been registered on the worker.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1426
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1443
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * Each task goes to the ready worker node, other than the given one, that
 * holds the fewest queued tasks (the lowest key wins on ties). The task is
 * executed right away when the destination allows it, enqueued otherwise.
 *
 * The given worker node is never a destination: handing a task back to the
 * node being emptied would keep its queue from ever draining. When no other
 * ready worker node exists, redistribution stops and the remaining tasks
 * stay queued where they are.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Recomputed for every task since each move changes the queue sizes.
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: bail out instead of looping forever.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1464
/**
 * Counts one more stolen task on the given worker node usage and, when
 * applicable, on the usage of the given task function.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealerWorkerNode = this.workerNodes[workerNodeKey]
  const nodeUsage = stealerWorkerNode?.usage
  if (nodeUsage != null) {
    ++nodeUsage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealerWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const functionUsage = stealerWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++functionUsage.tasks.stolen
}
1483
/**
 * Counts one more sequentially stolen task on the given worker node usage.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  ++nodeUsage.tasks.sequentiallyStolen
}
1492
/**
 * Counts one more sequentially stolen task on the usage of the given task
 * function, when task function usage applies to the worker node and a usage
 * exists for that task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (targetWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const functionUsage = targetWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++functionUsage.tasks.sequentiallyStolen
}
1508
/**
 * Sets the sequentially stolen tasks counter of the given worker node usage
 * back to zero. Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  nodeUsage.tasks.sequentiallyStolen = 0
}
1517
/**
 * Sets the sequentially stolen tasks counter of the given task function usage
 * back to zero, when task function usage applies to the worker node and a
 * usage exists for that task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (targetWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const functionUsage = targetWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  functionUsage.tasks.sequentiallyStolen = 0
}
1533
/**
 * Listener of the `idleWorkerNode` worker node event: makes the idle worker
 * node steal a task, then schedules itself again after a delay.
 *
 * @param event - The worker node event; its detail must carry `workerNodeKey`.
 * @param previousStolenTask - The task stolen by the previous run of this handler, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleIdleWorkerNodeEvent = (
  event: CustomEvent<WorkerNodeEventDetail>,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = event.detail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Stop condition: a previous run stole a task and the worker node now has
  // work (executing or queued). The sequentially stolen counters are reset
  // and the handler does not reschedule itself.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  // May be undefined when no task could be stolen.
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTask.name as string)
      ?.tasks as TaskStatistics
    // The task function counter grows when it starts from zero or when the
    // same task function is stolen again in a row; otherwise it is reset.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    }
  }
  // Run again after a delay derived from the sequentially stolen counter.
  // Failures of the delayed run are handed to EMPTY_FUNCTION
  // (presumably a no-op, i.e. they are ignored - confirm in '../utils').
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(event, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1594
/**
 * Makes the given worker node steal one task from another worker node.
 *
 * The source is the ready worker node, other than the given one, holding the
 * most queued tasks. The stolen task is executed right away when the given
 * worker node allows it, enqueued on it otherwise, and the stolen task
 * statistics of the given worker node are updated.
 *
 * The stealing worker node is excluded by identity: candidates are searched
 * in a sorted copy of the worker nodes, whose positions do not match worker
 * node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidateWorkerNode =>
        candidateWorkerNode.info.ready &&
        candidateWorkerNode !== destinationWorkerNode &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
1625
/**
 * Listener of the `backPressure` worker node event: hands tasks queued on
 * the worker node under back pressure over to other worker nodes.
 *
 * Worker nodes are visited from the least to the most queued. Each ready
 * worker node, other than the source, whose queue holds fewer tasks than the
 * configured tasks queue size minus one receives one task from the source,
 * as long as the source still has queued tasks. Nothing is done when the
 * configured tasks queue size is one or less.
 *
 * Destinations are addressed by their key in the pool worker nodes, not by
 * their position in the sorted copy used for the visit.
 *
 * @param event - The worker node event; its detail carries the source `workerId`.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The source worker node is not (or no longer) part of the pool.
    return
  }
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Translate the sorted copy entry back to its pool worker node key.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1663
/**
 * This method is the message listener registered on each worker.
 *
 * After checking the message worker id, it dispatches on the message content:
 * worker ready response, task execution response, or task function names
 * update. Any other message is ignored.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(senderWorkerNodeKey).taskFunctionNames =
      taskFunctionNames
  }
}
1683
/**
 * Handles the ready response of a worker.
 *
 * Stores the ready flag and the task function names in the worker
 * information, then emits the pool ready event the first time the pool is
 * found ready.
 *
 * @param message - The ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = ready as boolean
  senderWorkerInfo.taskFunctionNames = taskFunctionNames
  const shallEmitReadyEvent = !this.readyEventEmitted && this.ready
  if (shallEmitReadyEvent) {
    this.readyEventEmitted = true
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1699
/**
 * Handles the task execution response of a worker.
 *
 * Settles the promise registered for the task id (a response without a
 * registered promise is ignored), updates the worker node statistics and the
 * worker choice strategy, then, with tasks queueing enabled, runs the next
 * queued task if concurrency allows and signals an idle worker node.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = pendingResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Run the next queued task while staying below the configured concurrency.
  const canRunQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  // Nothing executing, nothing queued, nothing being stolen in a row: the
  // worker node is idle.
  const isIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    const idleEvent = new CustomEvent<WorkerNodeEventDetail>(
      'idleWorkerNode',
      {
        detail: { workerId: workerId as number, workerNodeKey }
      }
    )
    this.workerNodes[workerNodeKey].dispatchEvent(idleEvent)
  }
}
1740
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1746
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1752
/**
 * Emits the pool full event, with the pool information, when the pool is a
 * dynamic one and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1760
/**
 * Gets the worker information given its worker node key.
 *
 * The lookup uses optional chaining, so a key matching no worker node yields
 * `undefined` at runtime despite the declared return type.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1770
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, or the
 * square of `this.maxSize` when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const queueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, queueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1794
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1807
/**
 * Sets the ready flag of the worker information to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1811
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when tasks queueing is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1819
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if tasks queueing is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1828
/**
 * Executes the given task on the worker given its worker node key.
 *
 * The before-execution hook runs first, then the task is sent to the worker
 * together with its transfer list, and the task execution events are checked.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1840
/**
 * Enqueues the given task on the worker node, then checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1846
/**
 * Dequeues a task from the tasks queue of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1850
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1854
/**
 * Executes every task queued on the worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1864
/**
 * Flushes the tasks queue of every worker node of the pool, in key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1870 }