fix: refine type definition for transferList
[poolifier.git] / src / pools / abstract-pool.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import { randomUUID } from 'node:crypto'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { performance } from 'node:perf_hooks'
5 import type { TransferListItem } from 'node:worker_threads'
6
7 import type {
8 MessageValue,
9 PromiseResponseWrapper,
10 Task
11 } from '../utility-types.js'
12 import {
13 average,
14 DEFAULT_TASK_NAME,
15 EMPTY_FUNCTION,
16 exponentialDelay,
17 isKillBehavior,
18 isPlainObject,
19 max,
20 median,
21 min,
22 round,
23 sleep
24 } from '../utils.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import { KillBehaviors } from '../worker/worker-options.js'
27 import {
28 type IPool,
29 PoolEvents,
30 type PoolInfo,
31 type PoolOptions,
32 type PoolType,
33 PoolTypes,
34 type TasksQueueOptions
35 } from './pool.js'
36 import {
37 Measurements,
38 WorkerChoiceStrategies,
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
41 } from './selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
43 import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
46 checkValidWorkerChoiceStrategy,
47 getDefaultTasksQueueOptions,
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
52 waitWorkerNodeEvents
53 } from './utils.js'
54 import { version } from './version.js'
55 import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61 } from './worker.js'
62 import { WorkerNode } from './worker-node.js'
63
64 /**
65 * Base class that implements some shared logic for all poolifier pools.
66 *
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 */
71 export abstract class AbstractPool<
72 Worker extends IWorker,
73 Data = unknown,
74 Response = unknown
75 > implements IPool<Worker, Data, Response> {
76 /** @inheritDoc */
77 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
78
79 /** @inheritDoc */
80 public emitter?: EventEmitterAsyncResource
81
82 /**
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 *
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 */
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
91
92 /**
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 */
95 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
96 Worker,
97 Data,
98 Response
99 >
100
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
120 /**
121 * Whether the minimum number of workers is starting or not.
122 */
123 private startingMinimumNumberOfWorkers: boolean
124 /**
125 * Whether the pool ready event has been emitted or not.
126 */
127 private readyEventEmitted: boolean
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end
 * of the constructor. Used as the origin when computing the pool utilization.
 */
private readonly startTimestamp: number
132
/**
 * Builds a new poolifier pool.
 *
 * The sizing arguments and the options are validated first; the optional event
 * emitter and the worker choice strategy context are then created, the setup
 * hook is run, and the workers are started when `opts.startWorkers` is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before the pool is possibly started.
  this.readyEventEmitted = false
  this.startingMinimumNumberOfWorkers = false
  this.destroying = false
  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
189
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
197
/**
 * Validates the minimum number of workers of the pool.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is not defined.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  // Each check throws, so the checks can be laid out as independent guards.
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
217
/**
 * Validates the pool options and fills in the default value of the unset ones
 * on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
243
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights do not match the pool size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  // Nothing to validate when no options are given.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    // One weight is expected per worker node the pool can hold.
    const expectedWeights =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeights) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (measurement != null && !Object.values(Measurements).includes(measurement)) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
275
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
281
/** @inheritDoc */
public get info (): PoolInfo {
  // Task statistics required by the worker choice strategy; undefined when
  // there is no worker choice strategy context.
  const requirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Adds up one numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        matches(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Concatenates the history of a measurement across all worker nodes.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization needs both the run time and the wait time aggregates.
    ...(requirements?.runTime.aggregate === true &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(requirements?.runTime.aggregate === true && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.maximum ?? -Infinity
            )
          )
        ),
        ...(requirements.runTime.average && {
          average: round(average(gatherHistory('runTime')))
        }),
        ...(requirements.runTime.median && {
          median: round(median(gatherHistory('runTime')))
        })
      }
    }),
    ...(requirements?.waitTime.aggregate === true && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
            )
          )
        ),
        ...(requirements.waitTime.average && {
          average: round(average(gatherHistory('waitTime')))
        }),
        ...(requirements.waitTime.median && {
          median: round(median(gatherHistory('waitTime')))
        })
      }
    })
  }
}
448
/**
 * Whether the pool is ready or not: an empty pool is never ready, otherwise
 * at least the minimum number of workers must be non-dynamic and ready.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  let readyStaticWorkerNodes = 0
  for (const workerNode of this.workerNodes) {
    if (!workerNode.info.dynamic && workerNode.info.ready) {
      ++readyStaticWorkerNodes
    }
  }
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
466
/**
 * Whether the pool is empty or not: its minimum number of workers is zero
 * and it currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
473
/**
 * The approximate pool utilization: the tasks run time plus wait time
 * aggregated over the worker nodes, divided by the time elapsed since the
 * pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
495
496 /**
497 * The pool type.
498 *
499 * If it is `'dynamic'`, it provides the `max` property.
500 */
501 protected abstract get type (): PoolType
502
503 /**
504 * The worker type.
505 */
506 protected abstract get worker (): WorkerType
507
/**
 * Verifies that a message received from a worker carries the id of a worker
 * known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
523
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
535
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
554
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Undefined options leave the currently stored options untouched.
  const hasNewOptions = workerChoiceStrategyOptions != null
  if (hasNewOptions) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  // The context always receives the options stored on the pool.
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext?.setOptions(storedOptions)
}
567
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: detach the stealing handlers and flush the queues.
  const switchingOff = this.opts.enableTasksQueue === true && !enable
  if (switchingOff) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
581
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without tasks queue, the options are dropped from the pool options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtOptions.size!)
  // Handlers are always detached first so that they are never attached twice.
  this.unsetTaskStealing()
  if (builtOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
608
/**
 * Builds the tasks queue options: the default options for the pool maximum
 * size, overridden by the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
619
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
625
/**
 * Attaches the idle event handler to every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
631
/**
 * Detaches the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
640
/**
 * Attaches the back pressure event handler to every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
649
/**
 * Detaches the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
658
/**
 * Whether the pool is full or not: it holds at least as many worker nodes as
 * its maximum number of workers, or its minimum number when no maximum is
 * defined.
 */
protected get full (): boolean {
  const capacity = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= capacity
}
670
671 /**
672 * Whether the pool is busy or not.
673 *
674 * The pool busyness boolean status.
675 */
676 protected abstract get busy (): boolean
677
/**
 * Whether worker nodes are executing concurrently their tasks quota or not:
 * no ready worker node is left below its quota, which is the tasks queue
 * concurrency when the queue is enabled and a single task otherwise.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasAvailableWorkerNode = this.workerNodes.some(workerNode => {
    if (!workerNode.info.ready) {
      return false
    }
    if (tasksQueueEnabled) {
      return (
        workerNode.usage.tasks.executing <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.concurrency!
      )
    }
    return workerNode.usage.tasks.executing === 0
  })
  return !hasAvailableWorkerNode
}
702
/**
 * Whether the worker node is busy or not: it executes at least the tasks
 * queue concurrency when the queue is enabled, at least one task otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
713
/**
 * Sends a task function operation message to one worker and waits for its
 * operation status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Only an operation status sent by the targeted worker is relevant.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
750
/**
 * Sends a task function operation message to every worker and waits until
 * as many operation status responses as there are worker nodes have been
 * received.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if every worker succeeded and rejected if at least one worker failed.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // Only responses with a defined status are collected, so no failed
      // response means every worker succeeded.
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
      // The listener was registered on every worker node: remove it from all
      // of them, not only from the one that sent the last response, so that
      // it does not leak on the others.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
803
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(({ info }) => {
    const { taskFunctionNames } = info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
816
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  // Arguments validation.
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const isBlankName = name.trim().length === 0
  if (isBlankName) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const addOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    addOperationMessage
  )
  // Keep track of the task function on the pool side.
  this.taskFunctions.set(name, fn)
  return opResult
}
839
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperationMessage
  )
  // Drop the usage tracked for the task function and its pool side entry.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
855
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty list of names gives the answer.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
868
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultOperationMessage
  )
}
876
/**
 * Deletes the usage tracked for a task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
882
/**
 * Whether a task shall be executed right away on the worker node: its tasks
 * queue is empty and it executes fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
891
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check wins.
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      rejection = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks, keyed by the task id, until the worker responds.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    // Execute right away without tasks queue or when the worker node allows
    // it, enqueue otherwise.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
956
/**
 * Creates worker nodes until the pool holds the minimum number of non-dynamic
 * ones. The `startingMinimumNumberOfWorkers` flag is raised meanwhile.
 */
private startMinimumNumberOfWorkers (): void {
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.startingMinimumNumberOfWorkers = true
  while (countStaticWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.startingMinimumNumberOfWorkers = false
}
973
/** @inheritdoc */
public start (): void {
  // The pool can only be started when it is neither started, starting nor
  // destroying. Checks are ordered: the first matching state is reported.
  const refusals: Array<[boolean, string]> = [
    [this.started, 'Cannot start an already started pool'],
    [this.starting, 'Cannot start an already starting pool'],
    [this.destroying, 'Cannot start a destroying pool']
  ]
  for (const [refused, reason] of refusals) {
    if (refused) {
      throw new Error(reason)
    }
  }
  this.starting = true
  this.startMinimumNumberOfWorkers()
  this.starting = false
  this.started = true
}
990
/** @inheritDoc */
public async destroy (): Promise<void> {
  // The pool can only be destroyed when it is started and neither starting
  // nor already destroying.
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  // Reset the lifecycle flags.
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1014
/**
 * Sends a kill message to the worker of the given worker node and waits for
 * its kill response. Resolves immediately when the worker node does not exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const onKillResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      // Messages without a kill success or failure status are ignored.
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${response.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1039
/**
 * Terminates the worker node given its worker node key: the node is flagged
 * as not ready, its tasks queue is flushed, the flushed tasks are awaited
 * (with a timeout), then the worker is sent a kill message and terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1061
/**
 * Hook called by the abstract constructor before the workers are started.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* No-op by default */
}
1071
1072 /**
1073 * Returns whether the worker is the main worker or not.
1074 *
1075 * @returns `true` if the worker is the main worker, `false` otherwise.
1076 */
1077 protected abstract isMain (): boolean
1078
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time on the
 * worker node usage and, when task function usage shall be updated and is
 * available, on the usage of the task function too.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Look the task function usage up once and narrow it with a null guard
    // rather than looking it up twice and asserting it is defined.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(task.name!)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      updateWaitTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        task
      )
    }
  }
}
1119
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics from the received message, on
 * the worker node usage and, when task function usage is tracked for that worker
 * node, on the usage of the task function named in the message task performance.
 * The worker choice strategy context is updated if any usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let needWorkerChoiceStrategyUpdate = false
  // The worker node may no longer exist, hence the optional chaining.
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    needWorkerChoiceStrategyUpdate = true
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Look the task function usage up once and narrow it, instead of
    // looking it up twice and asserting the second result as non-null.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    )
    if (taskFunctionWorkerUsage != null) {
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext?.update(workerNodeKey)
  }
}
1177
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker information exists and its task function
 * names are an array holding more than two entries.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1192
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker shall be created, it is created first and directly chosen
 * if the strategy policy has `dynamicWorkerUsage` enabled.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const strategyPolicy =
      this.workerChoiceStrategyContext?.getStrategyPolicy()
    if (strategyPolicy?.dynamicWorkerUsage === true) {
      return dynamicWorkerNodeKey
    }
  }
  // Otherwise let the worker choice strategy pick the worker node.
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1213
/**
 * Conditions for dynamic worker creation.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
1220
/**
 * Sends a message to worker given its worker node key.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional, read-only array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
1233
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied `online`, `message`, `error` and `exit` handlers (or an empty
 * function when not provided) are registered on the worker, together with one-shot
 * internal `error` and `exit` handlers, before the worker node is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()

  // Internal one-shot 'error' handler: flag, report, restart, redistribute, terminate.
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    workerNode.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }

  // Internal one-shot 'exit' handler: remove the node and refill the pool if needed.
  const onWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers()
    }
  }

  // Registration order is kept: user handlers first, then the internal ones.
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onWorkerExit)

  const addedWorkerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1297
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener handling the kill
 * messages sent by the worker is registered, the `checkActive` message is sent,
 * the pool task functions are sent to the worker and the worker node is flagged
 * as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    // The worker node may already be gone from the pool.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A soft kill is only honored for a known worker node that is executing
    // nothing and, with the tasks queue enabled, has no queued task either.
    // An unknown worker node is never eligible, which avoids dereferencing an
    // undefined usage.
    const isSoftKillAllowed = (): boolean =>
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      workerUsage != null &&
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        isSoftKillAllowed())
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating over an empty collection is a no-op: no size check needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext?.getStrategyPolicy()
  if (
    strategyPolicy?.dynamicWorkerReady === true ||
    strategyPolicy?.dynamicWorkerUsage === true
  ) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1355
/**
 * Registers a listener callback on the worker given its worker node key.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1368
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1381
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1394
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes to the
 * worker node events needed by the enabled tasks stealing options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Tasks stealing only applies when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const createdWorkerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    createdWorkerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    createdWorkerNode.on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1426
/**
 * Sends the startup message to worker given its worker node key.
 * Abstract: each concrete pool provides its own implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1433
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flag of the run time and ELU task
 * statistics requirements, `false` when no worker choice strategy context is set.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime
      .aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, { statistics: { runTime, elu } })
}
1451
/**
 * Whether tasks stealing is impossible: the pool has at most one worker node
 * or no task is queued.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1455
/**
 * Executes the task on the worker node if it shall be executed right away,
 * enqueues it on the worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1463
/**
 * Moves every queued task of the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node with the fewest queued tasks. The
 * first worker node other than the source is the initial candidate, whatever
 * its readiness. The source worker node itself is never a destination: handing
 * a task back to the node being drained could keep its queue from ever emptying.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (candidateWorkerNodeKey === workerNodeKey) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        (candidateWorkerNode.info.ready &&
          candidateWorkerNode.usage.tasks.queued <
            this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node is left to receive the tasks.
      break
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1486
/**
 * Increments the stolen tasks counter of the worker node usage and, when task
 * function usage is tracked for that worker node, of the given task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed instead of asserted as non-null.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
1506
/**
 * Increments the sequentially stolen tasks counter of the worker node usage,
 * if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1516
/**
 * Increments the sequentially stolen tasks counter of the given task function
 * usage, when task function usage is tracked for the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single lookup, narrowed instead of asserted as non-null.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1532
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage,
 * if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1542
/**
 * Resets to zero the sequentially stolen tasks counter of the given task function
 * usage, when task function usage is tracked for the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single lookup, narrowed instead of asserted as non-null.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1558
/**
 * Handles the worker node `idle` event by stealing tasks for the idle worker node.
 *
 * After each steal attempt the handler re-invokes itself, passing the task it
 * just stole if any, once a delay computed from the worker node sequentially
 * stolen tasks counter has elapsed. It stops when stealing is not possible,
 * when more than half of the worker nodes are already stealing, or when the
 * worker node got work of its own after a previous steal.
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key or the worker node is not found in the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  // Check the worker node existence before dereferencing it, so that the
  // descriptive error is thrown instead of a TypeError.
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node has work of its own again: end the stealing sequence.
    workerInfo.stealing = false
    for (const taskName of workerInfo.taskFunctionNames ?? []) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      stolenTask.name!
    )
    if (taskFunctionWorkerUsage != null) {
      const taskFunctionTasksWorkerUsage = taskFunctionWorkerUsage.tasks
      if (
        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
        (previousStolenTask != null &&
          previousStolenTask.name === stolenTask.name &&
          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
      ) {
        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          stolenTask.name!
        )
      } else {
        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          stolenTask.name!
        )
      }
    }
  }
  // Schedule the next steal attempt; errors are reported on the pool emitter.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1643
/**
 * Steals one task for the given worker node.
 *
 * The source is the ready, not stealing worker node, other than the given one,
 * with the most queued tasks. The stolen task is handled by the given worker
 * node and its stolen tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a copy by queued tasks, most loaded first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Indexes in the sorted copy are not pool worker node keys, so the
  // destination is excluded by identity rather than by index.
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  if (task == null) {
    return undefined
  }
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1670
/**
 * Handles the worker node `backPressure` event by moving tasks queued on the
 * worker node that emitted it to other worker nodes.
 *
 * Eligible destinations are ready, not stealing worker nodes, other than the
 * source, whose queued tasks count is below the tasks queue size minus one.
 * They are visited from the least to the most loaded and each receives at most
 * one task.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const tasksQueueSize = this.opts.tasksQueueOptions!.size!
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (sourceWorkerNode == null) {
    // The worker that emitted the event is no longer part of the pool.
    return
  }
  // Sort a copy by queued tasks, least loaded first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // Indexes in the sorted copy are not pool worker node keys: resolve
      // the real key of the destination worker node.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask()
      if (task == null) {
        break
      }
      workerNode.info.stealing = true
      try {
        this.handleTask(workerNodeKey, task)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      } finally {
        // Never leave the worker node flagged as stealing on failure.
        workerNode.info.stealing = false
      }
    }
  }
}
1721
/**
 * This method is the message listener registered on each worker.
 *
 * It dispatches, in that order of precedence: worker ready responses, task
 * execution responses and task function names updates.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const senderWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (senderWorkerInfo != null) {
    senderWorkerInfo.taskFunctionNames = taskFunctionNames
  }
}
1746
/**
 * Emits the pool `ready` event with the pool information, once, when the pool
 * is ready and the event was not emitted yet.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1753
/**
 * Handles the ready response of a worker: stores its ready flag and task
 * function names in the worker node information, then checks whether the pool
 * `ready` event shall be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerNode = this.workerNodes[readyWorkerNodeKey]
  readyWorkerNode.info.ready = ready
  readyWorkerNode.info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1765
/**
 * Handles the task execution response of a worker.
 *
 * Settles the promise registered for the task id (rejecting with the worker
 * error message, or resolving with the response data), runs the after task
 * execution hook, forgets the promise and emits `taskFinished` on the worker
 * node. With the tasks queue enabled it then executes the next queued task if
 * the concurrency allows it, and emits `idle` on the worker node when it has
 * nothing executing, nothing queued and no sequentially stolen task.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    // No promise registered for this task id: nothing to settle.
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(
        reject,
        this.emitter,
        workerError.message
      )
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
1822
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1828
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1834
/**
 * Emits dynamic worker creation events.
 * Abstract: each concrete pool provides its own implementation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1839
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode == null ? undefined : workerNode.info
}
1849
/**
 * Creates a worker node.
 *
 * The tasks queue back pressure size is the configured tasks queue size, or the
 * default one when none is configured.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1875
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1891
/**
 * Emits the pool `empty` event with the pool information when the pool is
 * empty, and allows the `ready` event to be emitted again.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1898
/**
 * Removes the worker node from the pool worker nodes.
 *
 * When the worker node is found, it is also removed from the worker choice
 * strategy context. The pool `empty` event check is run in every case.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isKnownWorkerNode = removedWorkerNodeKey !== -1
  if (isKnownWorkerNode) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
1912
/**
 * Flags the worker node as not ready, if its worker information exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const targetWorkerInfo = this.getWorkerInfo(workerNodeKey)
  if (targetWorkerInfo == null) {
    return
  }
  targetWorkerInfo.ready = false
}
1919
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1928
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list
 * to the worker, then checks whether task execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1940
/**
 * Enqueues the task on the worker node, then checks whether task queuing
 * events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1946
/**
 * Dequeues a task from the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode.dequeueTask()
}
1950
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode.tasksQueueSize()
}
1954
/**
 * Flushes the worker node tasks queue: every queued task is dequeued and
 * executed on the worker node, then the tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1965
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1971 }