refactor: cleanup eslint configuration
[poolifier.git] / src / pools / abstract-pool.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import { randomUUID } from 'node:crypto'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { performance } from 'node:perf_hooks'
5 import type { TransferListItem } from 'node:worker_threads'
6
7 import type {
8 MessageValue,
9 PromiseResponseWrapper,
10 Task
11 } from '../utility-types.js'
12 import {
13 average,
14 DEFAULT_TASK_NAME,
15 EMPTY_FUNCTION,
16 exponentialDelay,
17 isKillBehavior,
18 isPlainObject,
19 max,
20 median,
21 min,
22 round,
23 sleep
24 } from '../utils.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import { KillBehaviors } from '../worker/worker-options.js'
27 import {
28 type IPool,
29 PoolEvents,
30 type PoolInfo,
31 type PoolOptions,
32 type PoolType,
33 PoolTypes,
34 type TasksQueueOptions
35 } from './pool.js'
36 import {
37 Measurements,
38 WorkerChoiceStrategies,
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
41 } from './selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
43 import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
46 checkValidWorkerChoiceStrategy,
47 getDefaultTasksQueueOptions,
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
52 waitWorkerNodeEvents
53 } from './utils.js'
54 import { version } from './version.js'
55 import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61 } from './worker.js'
62 import { WorkerNode } from './worker-node.js'
63
64 /**
65 * Base class that implements some shared logic for all poolifier pools.
66 *
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 */
71 export abstract class AbstractPool<
72 Worker extends IWorker,
73 Data = unknown,
74 Response = unknown
75 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Pending task executions, keyed by task id.
 *
 * Each entry stores the `resolve` and `reject` callbacks of the promise handed out by `execute()`,
 * the key of the worker node the task was assigned to and, when the pool emitter exists, an async resource.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Response>
>()

/**
 * Worker choice strategy context: holds the worker choice algorithm implementation in use.
 */
protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Task functions registered at runtime through `addTaskFunction()`:
 * - `key`: the task function name.
 * - `value`: the task function.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
/**
 * `true` once `start()` has completed, `false` again once `destroy()` has completed.
 */
private started: boolean
/**
 * `true` only while `start()` is creating the worker nodes.
 */
private starting: boolean
/**
 * `true` only while `destroy()` is terminating the worker nodes.
 */
private destroying: boolean
/**
 * Whether the pool ready event has already been emitted.
 */
private readyEventEmitted: boolean
/**
 * `performance.now()` value captured at the end of the constructor; used to compute the pool utilization.
 */
private readonly startTimestamp: number
128
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook
 * and starts the worker nodes when `opts.startWorkers` is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool. Missing options are filled in place with their defaults.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If instantiated from a context where `isMain()` is `false`, or if an argument is invalid.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  // Arguments validation
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // These methods are handed around as callbacks: pin their `this`
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // Collaborators
  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // Internal state: must be initialized before start() reads it
  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false

  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
184
/**
 * Rejects a fixed pool that was given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and `maximumNumberOfWorkers` is set.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
192
/**
 * Validates the minimum number of workers.
 *
 * @param minimumNumberOfWorkers - The value to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the value is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the value is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the value is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (minimumNumberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
212
/**
 * Validates the pool options and writes the defaults of the missing ones into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue: its options are only validated and built when the queue is enabled
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
238
/**
 * Validates the worker choice strategy options. `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const expectedWeights =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (weights != null && Object.keys(weights).length !== expectedWeights) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
270
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
276
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  interface MeasurementUsage {
    minimum?: number
    maximum?: number
    history: number[]
  }
  interface MeasurementNeeds {
    average: boolean
    median: boolean
  }
  interface MeasurementInfo {
    minimum: number
    maximum: number
    average?: number
    median?: number
  }

  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const requirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()

  // Number of worker nodes matching a predicate
  const countWorkerNodes = (
    predicate: (workerNode: PoolWorkerNode, workerNodeKey: number) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Sum of a per worker node counter
  const sumOverWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Pool wide statistics of one measurement (run time or wait time)
  const describeMeasurement = (
    select: (workerNode: PoolWorkerNode) => MeasurementUsage,
    needs: MeasurementNeeds
  ): MeasurementInfo => {
    const mergedHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (history, workerNode) => history.concat(select(workerNode).history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => select(workerNode).minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => select(workerNode).maximum ?? -Infinity
          )
        )
      ),
      ...(needs.average && { average: round(average(mergedHistory())) }),
      ...(needs.median && { median: round(median(mergedHistory())) })
    }
  }

  // The order of the keys below is part of the output: keep it
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    ...(requirements?.runTime.aggregate === true &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(requirements?.runTime.aggregate === true && {
      runTime: describeMeasurement(
        workerNode => workerNode.usage.runTime,
        requirements.runTime
      )
    }),
    ...(requirements?.waitTime.aggregate === true && {
      waitTime: describeMeasurement(
        workerNode => workerNode.usage.waitTime,
        requirements.waitTime
      )
    })
  }
}
443
/**
 * Whether the pool is ready: it is not empty and at least `minimumNumberOfWorkers`
 * non dynamic worker nodes are flagged as ready.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
461
/**
 * Whether the pool is empty: it has no worker node and its minimum number of workers is zero.
 */
protected get empty (): boolean {
  const hasNoWorkerNode = this.workerNodes.length === 0
  return hasNoWorkerNode && this.minimumNumberOfWorkers === 0
}
468
/**
 * The approximate pool utilization: tasks run time plus tasks wait time of all worker nodes,
 * divided by the time elapsed since the start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
490
/**
 * The pool type, provided by the concrete pool.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * It is compared against `PoolTypes.fixed` when validating the constructor arguments
 * and is reported in the pool information.
 */
protected abstract get type (): PoolType

/**
 * The worker type, provided by the concrete pool.
 *
 * It is reported in the pool information and is part of the event emitter name.
 */
protected abstract get worker (): WorkerType
502
/**
 * Ensures that a message received from a worker carries the id of a worker node of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
518
/**
 * Looks up the key (index in `workerNodes`) of the worker node owning the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first matching worker node, `-1` if none matches.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
530
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node starts over with a clean usage and gets a statistics message
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
549
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // `undefined` keeps the options currently stored on the pool
  this.opts.workerChoiceStrategyOptions =
    workerChoiceStrategyOptions ?? this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext?.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
562
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Stop stealing and flush the tasks queues before the queue is switched off
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
576
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without a tasks queue, no tasks queue options are kept on the pool
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Handlers are always removed first so that they are never registered twice
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
603
/**
 * Builds the tasks queue options: the defaults for the pool maximum size, overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
614
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
620
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
626
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
635
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
644
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
653
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds as many worker nodes as its maximum size
 * (the minimum number of workers when no maximum is defined).
 */
protected get full (): boolean {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
665
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool.
 */
protected abstract get busy (): boolean
672
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than the tasks queue
 * concurrency (tasks queue enabled) or no task at all (tasks queue disabled).
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (executingTasks: number): boolean =>
          executingTasks <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
      : (executingTasks: number): boolean => executingTasks === 0
  return !this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && hasSpareCapacity(workerNode.usage.tasks.executing)
  )
}
697
/**
 * Whether the given worker node is busy: it executes at least as many tasks as the tasks queue
 * concurrency (tasks queue enabled) or at least one task (tasks queue disabled).
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
708
/**
 * Sends a task function operation message to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success, rejected when it reports a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const targetWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Only an operation status coming from the targeted worker settles the promise
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== targetWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
745
/**
 * Sends a task function operation message to every worker node and waits for all their answers.
 *
 * @param message - The task function operation message to broadcast.
 * @returns A promise resolved with `true` once every worker node reported success (immediately when the pool has no worker node), rejected with the first reported failure otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  if (this.workerNodes.length === 0) {
    // No worker node means no response will ever be received: succeed right away
    // instead of returning a promise that never settles.
    return true
  }
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived: Array<MessageValue<Response>> = []
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // Every worker node has answered: settle on the first failure, if any
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
      // The listener was registered on every worker node: remove it from all of them,
      // not only from the worker node that sent the last response.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
798
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(({ info }) => {
    const { taskFunctionNames } = info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
811
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  // Arguments come from user land: validate them at runtime despite the static types
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source code
  const added = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return added
}
834
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removed = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Forget the task function on the pool side once the workers have removed it
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return removed
}
850
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node exposing a non empty list of names is the reference
  const referenceWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return referenceWorkerNode?.info.taskFunctionNames ?? []
}
863
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperationMessage)
}
871
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
877
/**
 * Whether a task can be executed right away on the given worker node: its tasks queue is empty
 * and it executes fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` if it shall be enqueued.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
886
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects the promise
    let validationError: Error | undefined
    if (!this.started) {
      validationError = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      validationError = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      validationError = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      validationError = new TypeError(
        'name argument must not be an empty string'
      )
    } else if (transferList != null && !Array.isArray(transferList)) {
      validationError = new TypeError('transferList argument must be an array')
    }
    if (validationError != null) {
      reject(validationError)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks until the task response is received
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })

    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
951
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Create worker nodes until the pool holds its minimum number of non dynamic ones
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
976
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // Last event emitted by the pool before its emitter is torn down
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1001
/**
 * Sends a kill message to the given worker node and waits for its answer.
 * Resolves immediately if the worker node no longer exists.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker reports `'success'`, rejected when it reports `'failure'`.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const onKillResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${response.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1026
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready, its tasks queue is flushed, the flushed tasks are
 * awaited (up to the tasks finished timeout), then the worker is sent a kill message and terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1048
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is built and
 * before the pool is started. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1058
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
1063
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the worker node usage and,
 * when applicable, of the usage of the task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  ].getTaskFunctionWorkerUsage(task.name!)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
1104
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics on the worker node usage and,
 * when applicable, on the usage attached to the executed task function name.
 * The worker choice strategy context is updated if any usage was touched.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let strategyUpdateRequired = false
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = workerNode?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    strategyUpdateRequired = true
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // The task performance is only read once per task function statistics are enabled.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionName = message.taskPerformance!.name
    if (workerNode.getTaskFunctionWorkerUsage(taskFunctionName) != null) {
      const taskFunctionWorkerUsage =
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)!
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      strategyUpdateRequired = true
    }
  }
  if (strategyUpdateRequired) {
    this.workerChoiceStrategyContext?.update(workerNodeKey)
  }
}
1162
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker information exists and lists more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1177
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * If a dynamic worker node has to be created and the strategy policy enables
 * `dynamicWorkerUsage`, the newly created worker node is chosen directly.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategyContext?.getStrategyPolicy()
        .dynamicWorkerUsage === true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1198
/**
 * Evaluates the conditions for dynamic worker creation.
 *
 * @returns `true` if a dynamic worker shall be created, `false` otherwise.
 */
protected abstract shallCreateDynamicWorker (): boolean
1205
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1218
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied 'online', 'message', 'error' and 'exit' handlers are
 * registered (or a no-op when not provided), along with the pool internal
 * 'error' and 'exit' handlers, before the worker node is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const createdWorkerNode = this.createWorkerNode()
  createdWorkerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  createdWorkerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  createdWorkerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  // Pool internal error handling: flag, report, optionally restart and
  // redistribute the queued tasks, then terminate the faulty worker node.
  const onWorkerError = (error: Error): void => {
    createdWorkerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (createdWorkerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      const faultyWorkerNodeKey = this.workerNodes.indexOf(createdWorkerNode)
      this.redistributeQueuedTasks(faultyWorkerNodeKey)
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    createdWorkerNode?.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }
  createdWorkerNode.registerWorkerEventHandler('error', onWorkerError)
  createdWorkerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  // Remove the worker node from the pool on its first 'exit' event.
  createdWorkerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(createdWorkerNode)
  })
  const createdWorkerNodeKey = this.addWorkerNode(createdWorkerNode)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1275
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener handling the kill messages
 * sent back by the worker is registered, the `checkActive` message is sent and
 * the pool task functions are propagated to the new worker.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Evaluated lazily, only for a soft kill behavior.
    const hasNoPendingTask = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingTask())
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, so no size check is required.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  // Some strategy policies require the dynamic worker node to be usable at once.
  if (
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  ) {
    dynamicWorkerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1333
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1346
/**
 * Registers once a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1359
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1372
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, wires the task stealing
 * event handlers according to the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const setupWorkerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    setupWorkerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    setupWorkerNode.on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1404
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1411
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message tells the worker whether run time and ELU shall be aggregated,
 * as required by the worker choice strategy context (`false` when there is none).
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
      .runTime.aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
1429
/**
 * Whether task stealing is impossible: the pool has at most one worker node
 * or no task is queued.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1433
/**
 * Executes the task on the given worker node if it shall be executed right
 * away, otherwise enqueues it on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1441
/**
 * Moves every task queued on the given worker node to the ready worker node
 * having the fewest queued tasks.
 *
 * Does nothing if the worker node key is `-1` or if no task can be stolen.
 *
 * NOTE(review): the destination search is seeded with worker node key `0`,
 * whatever its readiness — confirm this is the intended fallback.
 *
 * @param workerNodeKey - The key of the worker node whose queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (leastLoadedKey, candidate, candidateKey, allWorkerNodes) => {
        const isBetterCandidate =
          candidate.info.ready &&
          candidate.usage.tasks.queued <
            allWorkerNodes[leastLoadedKey].usage.tasks.queued
        return isBetterCandidate ? candidateKey : leastLoadedKey
      },
      0
    )
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.handleTask(destinationWorkerNodeKey, queuedTask)
  }
}
1464
/**
 * Increments the stolen tasks counter on the worker node usage and, when
 * applicable, on the usage attached to the given task function name.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The stolen task function name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionWorkerUsage.tasks.stolen
}
1484
/**
 * Increments the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  ++stealingWorkerNode.usage.tasks.sequentiallyStolen
}
1494
/**
 * Increments the sequentially stolen tasks counter of the usage attached to
 * the given task function name, when task function usages are tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
1510
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  stealingWorkerNode.usage.tasks.sequentiallyStolen = 0
}
1520
/**
 * Resets to zero the sequentially stolen tasks counter of the usage attached
 * to the given task function name, when task function usages are tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
1536
/**
 * Handler of the worker node 'idle' event: makes the idle worker node steal a
 * task, then reschedules itself after an exponential delay based on the number
 * of sequentially stolen tasks.
 *
 * Stealing stops when no task can be stolen, when more than half of the worker
 * nodes are already stealing, or when the worker node got work to do after a
 * previous steal.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` is mandatory.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const idleWorkerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    // Only a worker node that already stole a task carries the stealing flag.
    if (idleWorkerInfo != null && previousStolenTask != null) {
      idleWorkerInfo.stealing = false
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    idleWorkerInfo != null &&
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node is busy again: end the stealing sequence.
    idleWorkerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskFunctionName of idleWorkerInfo.taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (idleWorkerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  idleWorkerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stolenTaskName = stolenTask.name!
    const taskFunctionTasksUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        stolenTaskName
      )!.tasks
    // The sequence continues if it just started or if the same task function
    // was stolen again.
    const continuesSequence =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (continuesSequence) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  const retryDelay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(retryDelay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1621
/**
 * Makes the given worker node steal one task from the ready, non stealing
 * worker node having the most queued tasks.
 *
 * The stolen task is handled by the stealing worker node and its stolen and
 * sequentially stolen statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidateWorkerNode =>
        // Indexes of the sorted copy are not worker node keys: the stealing
        // worker node must be excluded by identity, not by index.
        candidateWorkerNode !== stealingWorkerNode &&
        candidateWorkerNode.info.ready &&
        !candidateWorkerNode.info.stealing &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const task = sourceWorkerNode.popTask()!
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1648
/**
 * Handler of the worker node 'backPressure' event: offloads queued tasks of
 * the worker node under back pressure to the other ready, non stealing worker
 * nodes whose queue is below the configured size minus an offset of one,
 * starting with the least loaded ones.
 *
 * Does nothing when no task can be stolen, when more than half of the worker
 * nodes are already stealing, or when the configured tasks queue size is not
 * greater than the offset.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a destination worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy, least loaded first. Its indexes are NOT worker node keys.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Resolve the real worker node key in the pool: the position in the
      // sorted copy would target the wrong worker node.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1699
/**
 * This method is the message listener registered on each worker.
 *
 * It dispatches the message according to its content: worker ready response,
 * task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const senderWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (senderWorkerInfo != null) {
    senderWorkerInfo.taskFunctionNames = taskFunctionNames
  }
}
1724
/**
 * Emits the pool 'ready' event once, as soon as the pool is ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1731
/**
 * Handles the ready response of a worker: records its readiness and its task
 * function names, then emits the pool 'ready' event if applicable.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.workerNodes[readyWorkerNodeKey].info
  readyWorkerInfo.ready = ready
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1743
/**
 * Handles a task execution response: settles the promise registered for the
 * task, updates the usage statistics, emits the 'taskFinished' worker node
 * event and, when the tasks queue is enabled, executes the next queued task
 * or emits the 'idle' worker node event.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(
        reject,
        this.emitter,
        workerError.message
      )
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  // Execute the next queued task if the concurrency limit allows it.
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Nothing executing, nothing queued and no stealing sequence ongoing.
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
1800
/**
 * Emits the pool 'busy' event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1806
/**
 * Emits the pool 'backPressure' event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1812
/**
 * Checks the pool state and emits the events related to dynamic worker creation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1817
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1827
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size is the configured tasks queue size or,
 * failing that, the default one.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1853
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1869
/**
 * Emits the pool 'empty' event if the pool is empty and re-arms the 'ready'
 * event emission.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1876
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategy context, then emits the pool 'empty' event if applicable.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isInPool = removedWorkerNodeKey !== -1
  if (isInPool) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
1890
/**
 * Flags the worker node as not ready, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1897
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1906
/**
 * Executes the given task on the worker given its worker node key.
 *
 * The before task execution hook is run first, then the task is sent to the
 * worker along with its transfer list and the task execution events are checked.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1918
/**
 * Enqueues the task on the given worker node, then checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1924
/**
 * Dequeues a task from the given worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1928
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1932
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1943
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1949 }