5eeb57911903587c0760dac8fee11e30407cb86a
[poolifier.git] / src / pools / abstract-pool.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import { randomUUID } from 'node:crypto'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { performance } from 'node:perf_hooks'
5 import type { TransferListItem } from 'node:worker_threads'
6
7 import type {
8 MessageValue,
9 PromiseResponseWrapper,
10 Task
11 } from '../utility-types.js'
12 import {
13 average,
14 DEFAULT_TASK_NAME,
15 EMPTY_FUNCTION,
16 exponentialDelay,
17 isKillBehavior,
18 isPlainObject,
19 max,
20 median,
21 min,
22 round,
23 sleep
24 } from '../utils.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import { KillBehaviors } from '../worker/worker-options.js'
27 import {
28 type IPool,
29 PoolEvents,
30 type PoolInfo,
31 type PoolOptions,
32 type PoolType,
33 PoolTypes,
34 type TasksQueueOptions
35 } from './pool.js'
36 import {
37 Measurements,
38 WorkerChoiceStrategies,
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
41 } from './selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
43 import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
46 checkValidWorkerChoiceStrategy,
47 getDefaultTasksQueueOptions,
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
52 waitWorkerNodeEvents
53 } from './utils.js'
54 import { version } from './version.js'
55 import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61 } from './worker.js'
62 import { WorkerNode } from './worker-node.js'
63
64 /**
65 * Base class that implements some shared logic for all poolifier pools.
66 *
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 */
71 export abstract class AbstractPool<
72 Worker extends IWorker,
73 Data = unknown,
74 Response = unknown
75 > implements IPool<Worker, Data, Response> {
76 /** @inheritDoc */
77 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
78
79 /** @inheritDoc */
80 public emitter?: EventEmitterAsyncResource
81
82 /**
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 *
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 */
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
91
92 /**
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 */
95 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
96 Worker,
97 Data,
98 Response
99 >
100
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
120 /**
121 * Whether the minimum number of workers is starting or not.
122 */
123 private startingMinimumNumberOfWorkers: boolean
124 /**
125 * Whether the pool ready event has been emitted or not.
126 */
127 private readyEventEmitted: boolean
128 /**
129 * The start timestamp of the pool.
130 */
131 private readonly startTimestamp
132
133 /**
134 * Constructs a new poolifier pool.
135 *
136 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
137 * @param filePath - Path to the worker file.
138 * @param opts - Options for the pool.
139 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
140 */
141 public constructor (
142 protected readonly minimumNumberOfWorkers: number,
143 protected readonly filePath: string,
144 protected readonly opts: PoolOptions<Worker>,
145 protected readonly maximumNumberOfWorkers?: number
146 ) {
147 if (!this.isMain()) {
148 throw new Error(
149 'Cannot start a pool from a worker with the same type as the pool'
150 )
151 }
152 this.checkPoolType()
153 checkFilePath(this.filePath)
154 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
155 this.checkPoolOptions(this.opts)
156
157 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
158 this.executeTask = this.executeTask.bind(this)
159 this.enqueueTask = this.enqueueTask.bind(this)
160
161 if (this.opts.enableEvents === true) {
162 this.initializeEventEmitter()
163 }
164 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
165 Worker,
166 Data,
167 Response
168 >(
169 this,
170 this.opts.workerChoiceStrategy,
171 this.opts.workerChoiceStrategyOptions
172 )
173
174 this.setupHook()
175
176 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
177
178 this.started = false
179 this.starting = false
180 this.destroying = false
181 this.readyEventEmitted = false
182 this.startingMinimumNumberOfWorkers = false
183 if (this.opts.startWorkers === true) {
184 this.start()
185 }
186
187 this.startTimestamp = performance.now()
188 }
189
190 private checkPoolType (): void {
191 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
192 throw new Error(
193 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
194 )
195 }
196 }
197
198 private checkMinimumNumberOfWorkers (
199 minimumNumberOfWorkers: number | undefined
200 ): void {
201 if (minimumNumberOfWorkers == null) {
202 throw new Error(
203 'Cannot instantiate a pool without specifying the number of workers'
204 )
205 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
206 throw new TypeError(
207 'Cannot instantiate a pool with a non safe integer number of workers'
208 )
209 } else if (minimumNumberOfWorkers < 0) {
210 throw new RangeError(
211 'Cannot instantiate a pool with a negative number of workers'
212 )
213 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
214 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
215 }
216 }
217
218 private checkPoolOptions (opts: PoolOptions<Worker>): void {
219 if (isPlainObject(opts)) {
220 this.opts.startWorkers = opts.startWorkers ?? true
221 checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
222 this.opts.workerChoiceStrategy =
223 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
224 this.checkValidWorkerChoiceStrategyOptions(
225 opts.workerChoiceStrategyOptions
226 )
227 if (opts.workerChoiceStrategyOptions != null) {
228 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
229 }
230 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
231 this.opts.enableEvents = opts.enableEvents ?? true
232 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
233 if (this.opts.enableTasksQueue) {
234 checkValidTasksQueueOptions(opts.tasksQueueOptions)
235 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
236 opts.tasksQueueOptions
237 )
238 }
239 } else {
240 throw new TypeError('Invalid pool options: must be a plain object')
241 }
242 }
243
244 private checkValidWorkerChoiceStrategyOptions (
245 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
246 ): void {
247 if (
248 workerChoiceStrategyOptions != null &&
249 !isPlainObject(workerChoiceStrategyOptions)
250 ) {
251 throw new TypeError(
252 'Invalid worker choice strategy options: must be a plain object'
253 )
254 }
255 if (
256 workerChoiceStrategyOptions?.weights != null &&
257 Object.keys(workerChoiceStrategyOptions.weights).length !==
258 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
259 ) {
260 throw new Error(
261 'Invalid worker choice strategy options: must have a weight for each worker node'
262 )
263 }
264 if (
265 workerChoiceStrategyOptions?.measurement != null &&
266 !Object.values(Measurements).includes(
267 workerChoiceStrategyOptions.measurement
268 )
269 ) {
270 throw new Error(
271 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
272 )
273 }
274 }
275
276 private initializeEventEmitter (): void {
277 this.emitter = new EventEmitterAsyncResource({
278 name: `poolifier:${this.type}-${this.worker}-pool`
279 })
280 }
281
282 /** @inheritDoc */
283 public get info (): PoolInfo {
284 return {
285 version,
286 type: this.type,
287 worker: this.worker,
288 started: this.started,
289 ready: this.ready,
290 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
291 strategy: this.opts.workerChoiceStrategy!,
292 strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
293 minSize: this.minimumNumberOfWorkers,
294 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
295 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
296 .runTime.aggregate === true &&
297 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
298 .waitTime.aggregate && {
299 utilization: round(this.utilization)
300 }),
301 workerNodes: this.workerNodes.length,
302 idleWorkerNodes: this.workerNodes.reduce(
303 (accumulator, workerNode) =>
304 workerNode.usage.tasks.executing === 0
305 ? accumulator + 1
306 : accumulator,
307 0
308 ),
309 ...(this.opts.enableTasksQueue === true && {
310 stealingWorkerNodes: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
312 workerNode.info.stealing ? accumulator + 1 : accumulator,
313 0
314 )
315 }),
316 busyWorkerNodes: this.workerNodes.reduce(
317 (accumulator, _workerNode, workerNodeKey) =>
318 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
319 0
320 ),
321 executedTasks: this.workerNodes.reduce(
322 (accumulator, workerNode) =>
323 accumulator + workerNode.usage.tasks.executed,
324 0
325 ),
326 executingTasks: this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.tasks.executing,
329 0
330 ),
331 ...(this.opts.enableTasksQueue === true && {
332 queuedTasks: this.workerNodes.reduce(
333 (accumulator, workerNode) =>
334 accumulator + workerNode.usage.tasks.queued,
335 0
336 )
337 }),
338 ...(this.opts.enableTasksQueue === true && {
339 maxQueuedTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
342 0
343 )
344 }),
345 ...(this.opts.enableTasksQueue === true && {
346 backPressure: this.hasBackPressure()
347 }),
348 ...(this.opts.enableTasksQueue === true && {
349 stolenTasks: this.workerNodes.reduce(
350 (accumulator, workerNode) =>
351 accumulator + workerNode.usage.tasks.stolen,
352 0
353 )
354 }),
355 failedTasks: this.workerNodes.reduce(
356 (accumulator, workerNode) =>
357 accumulator + workerNode.usage.tasks.failed,
358 0
359 ),
360 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
361 .runTime.aggregate === true && {
362 runTime: {
363 minimum: round(
364 min(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.runTime.minimum ?? Infinity
367 )
368 )
369 ),
370 maximum: round(
371 max(
372 ...this.workerNodes.map(
373 workerNode => workerNode.usage.runTime.maximum ?? -Infinity
374 )
375 )
376 ),
377 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
378 .runTime.average && {
379 average: round(
380 average(
381 this.workerNodes.reduce<number[]>(
382 (accumulator, workerNode) =>
383 accumulator.concat(workerNode.usage.runTime.history),
384 []
385 )
386 )
387 )
388 }),
389 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
390 .runTime.median && {
391 median: round(
392 median(
393 this.workerNodes.reduce<number[]>(
394 (accumulator, workerNode) =>
395 accumulator.concat(workerNode.usage.runTime.history),
396 []
397 )
398 )
399 )
400 })
401 }
402 }),
403 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
404 .waitTime.aggregate === true && {
405 waitTime: {
406 minimum: round(
407 min(
408 ...this.workerNodes.map(
409 workerNode => workerNode.usage.waitTime.minimum ?? Infinity
410 )
411 )
412 ),
413 maximum: round(
414 max(
415 ...this.workerNodes.map(
416 workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
417 )
418 )
419 ),
420 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
421 .waitTime.average && {
422 average: round(
423 average(
424 this.workerNodes.reduce<number[]>(
425 (accumulator, workerNode) =>
426 accumulator.concat(workerNode.usage.waitTime.history),
427 []
428 )
429 )
430 )
431 }),
432 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
433 .waitTime.median && {
434 median: round(
435 median(
436 this.workerNodes.reduce<number[]>(
437 (accumulator, workerNode) =>
438 accumulator.concat(workerNode.usage.waitTime.history),
439 []
440 )
441 )
442 )
443 })
444 }
445 })
446 }
447 }
448
449 /**
450 * The pool readiness boolean status.
451 */
452 private get ready (): boolean {
453 if (this.empty) {
454 return false
455 }
456 return (
457 this.workerNodes.reduce(
458 (accumulator, workerNode) =>
459 !workerNode.info.dynamic && workerNode.info.ready
460 ? accumulator + 1
461 : accumulator,
462 0
463 ) >= this.minimumNumberOfWorkers
464 )
465 }
466
467 /**
468 * The pool emptiness boolean status.
469 */
470 protected get empty (): boolean {
471 return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
472 }
473
474 /**
475 * The approximate pool utilization.
476 *
477 * @returns The pool utilization.
478 */
479 private get utilization (): number {
480 const poolTimeCapacity =
481 (performance.now() - this.startTimestamp) *
482 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
483 const totalTasksRunTime = this.workerNodes.reduce(
484 (accumulator, workerNode) =>
485 accumulator + (workerNode.usage.runTime.aggregate ?? 0),
486 0
487 )
488 const totalTasksWaitTime = this.workerNodes.reduce(
489 (accumulator, workerNode) =>
490 accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
491 0
492 )
493 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
494 }
495
496 /**
497 * The pool type.
498 *
499 * If it is `'dynamic'`, it provides the `max` property.
500 */
501 protected abstract get type (): PoolType
502
503 /**
504 * The worker type.
505 */
506 protected abstract get worker (): WorkerType
507
508 /**
509 * Checks if the worker id sent in the received message from a worker is valid.
510 *
511 * @param message - The received message.
512 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
513 */
514 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
515 if (message.workerId == null) {
516 throw new Error('Worker message received without worker id')
517 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
518 throw new Error(
519 `Worker message received from unknown worker '${message.workerId}'`
520 )
521 }
522 }
523
524 /**
525 * Gets the worker node key given its worker id.
526 *
527 * @param workerId - The worker id.
528 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
529 */
530 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
531 return this.workerNodes.findIndex(
532 workerNode => workerNode.info.id === workerId
533 )
534 }
535
536 /** @inheritDoc */
537 public setWorkerChoiceStrategy (
538 workerChoiceStrategy: WorkerChoiceStrategy,
539 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
540 ): void {
541 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
542 this.opts.workerChoiceStrategy = workerChoiceStrategy
543 this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
544 this.opts.workerChoiceStrategy
545 )
546 if (workerChoiceStrategyOptions != null) {
547 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
548 }
549 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
550 workerNode.resetUsage()
551 this.sendStatisticsMessageToWorker(workerNodeKey)
552 }
553 }
554
555 /** @inheritDoc */
556 public setWorkerChoiceStrategyOptions (
557 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
558 ): void {
559 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
560 if (workerChoiceStrategyOptions != null) {
561 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
562 }
563 this.workerChoiceStrategyContext?.setOptions(
564 this.opts.workerChoiceStrategyOptions
565 )
566 }
567
568 /** @inheritDoc */
569 public enableTasksQueue (
570 enable: boolean,
571 tasksQueueOptions?: TasksQueueOptions
572 ): void {
573 if (this.opts.enableTasksQueue === true && !enable) {
574 this.unsetTaskStealing()
575 this.unsetTasksStealingOnBackPressure()
576 this.flushTasksQueues()
577 }
578 this.opts.enableTasksQueue = enable
579 this.setTasksQueueOptions(tasksQueueOptions)
580 }
581
582 /** @inheritDoc */
583 public setTasksQueueOptions (
584 tasksQueueOptions: TasksQueueOptions | undefined
585 ): void {
586 if (this.opts.enableTasksQueue === true) {
587 checkValidTasksQueueOptions(tasksQueueOptions)
588 this.opts.tasksQueueOptions =
589 this.buildTasksQueueOptions(tasksQueueOptions)
590 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
591 this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
592 if (this.opts.tasksQueueOptions.taskStealing === true) {
593 this.unsetTaskStealing()
594 this.setTaskStealing()
595 } else {
596 this.unsetTaskStealing()
597 }
598 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
599 this.unsetTasksStealingOnBackPressure()
600 this.setTasksStealingOnBackPressure()
601 } else {
602 this.unsetTasksStealingOnBackPressure()
603 }
604 } else if (this.opts.tasksQueueOptions != null) {
605 delete this.opts.tasksQueueOptions
606 }
607 }
608
609 private buildTasksQueueOptions (
610 tasksQueueOptions: TasksQueueOptions | undefined
611 ): TasksQueueOptions {
612 return {
613 ...getDefaultTasksQueueOptions(
614 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
615 ),
616 ...tasksQueueOptions
617 }
618 }
619
620 private setTasksQueueSize (size: number): void {
621 for (const workerNode of this.workerNodes) {
622 workerNode.tasksQueueBackPressureSize = size
623 }
624 }
625
626 private setTaskStealing (): void {
627 for (const [workerNodeKey] of this.workerNodes.entries()) {
628 this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
629 }
630 }
631
632 private unsetTaskStealing (): void {
633 for (const [workerNodeKey] of this.workerNodes.entries()) {
634 this.workerNodes[workerNodeKey].off(
635 'idle',
636 this.handleWorkerNodeIdleEvent
637 )
638 }
639 }
640
641 private setTasksStealingOnBackPressure (): void {
642 for (const [workerNodeKey] of this.workerNodes.entries()) {
643 this.workerNodes[workerNodeKey].on(
644 'backPressure',
645 this.handleWorkerNodeBackPressureEvent
646 )
647 }
648 }
649
650 private unsetTasksStealingOnBackPressure (): void {
651 for (const [workerNodeKey] of this.workerNodes.entries()) {
652 this.workerNodes[workerNodeKey].off(
653 'backPressure',
654 this.handleWorkerNodeBackPressureEvent
655 )
656 }
657 }
658
659 /**
660 * Whether the pool is full or not.
661 *
662 * The pool filling boolean status.
663 */
664 protected get full (): boolean {
665 return (
666 this.workerNodes.length >=
667 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
668 )
669 }
670
671 /**
672 * Whether the pool is busy or not.
673 *
674 * The pool busyness boolean status.
675 */
676 protected abstract get busy (): boolean
677
678 /**
679 * Whether worker nodes are executing concurrently their tasks quota or not.
680 *
681 * @returns Worker nodes busyness boolean status.
682 */
683 protected internalBusy (): boolean {
684 if (this.opts.enableTasksQueue === true) {
685 return (
686 this.workerNodes.findIndex(
687 workerNode =>
688 workerNode.info.ready &&
689 workerNode.usage.tasks.executing <
690 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
691 this.opts.tasksQueueOptions!.concurrency!
692 ) === -1
693 )
694 }
695 return (
696 this.workerNodes.findIndex(
697 workerNode =>
698 workerNode.info.ready && workerNode.usage.tasks.executing === 0
699 ) === -1
700 )
701 }
702
703 private isWorkerNodeBusy (workerNodeKey: number): boolean {
704 if (this.opts.enableTasksQueue === true) {
705 return (
706 this.workerNodes[workerNodeKey].usage.tasks.executing >=
707 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
708 this.opts.tasksQueueOptions!.concurrency!
709 )
710 }
711 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
712 }
713
714 private async sendTaskFunctionOperationToWorker (
715 workerNodeKey: number,
716 message: MessageValue<Data>
717 ): Promise<boolean> {
718 return await new Promise<boolean>((resolve, reject) => {
719 const taskFunctionOperationListener = (
720 message: MessageValue<Response>
721 ): void => {
722 this.checkMessageWorkerId(message)
723 const workerId = this.getWorkerInfo(workerNodeKey)?.id
724 if (
725 message.taskFunctionOperationStatus != null &&
726 message.workerId === workerId
727 ) {
728 if (message.taskFunctionOperationStatus) {
729 resolve(true)
730 } else {
731 reject(
732 new Error(
733 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
734 )
735 )
736 }
737 this.deregisterWorkerMessageListener(
738 this.getWorkerNodeKeyByWorkerId(message.workerId),
739 taskFunctionOperationListener
740 )
741 }
742 }
743 this.registerWorkerMessageListener(
744 workerNodeKey,
745 taskFunctionOperationListener
746 )
747 this.sendToWorker(workerNodeKey, message)
748 })
749 }
750
751 private async sendTaskFunctionOperationToWorkers (
752 message: MessageValue<Data>
753 ): Promise<boolean> {
754 return await new Promise<boolean>((resolve, reject) => {
755 const responsesReceived = new Array<MessageValue<Response>>()
756 const taskFunctionOperationsListener = (
757 message: MessageValue<Response>
758 ): void => {
759 this.checkMessageWorkerId(message)
760 if (message.taskFunctionOperationStatus != null) {
761 responsesReceived.push(message)
762 if (responsesReceived.length === this.workerNodes.length) {
763 if (
764 responsesReceived.every(
765 message => message.taskFunctionOperationStatus === true
766 )
767 ) {
768 resolve(true)
769 } else if (
770 responsesReceived.some(
771 message => message.taskFunctionOperationStatus === false
772 )
773 ) {
774 const errorResponse = responsesReceived.find(
775 response => response.taskFunctionOperationStatus === false
776 )
777 reject(
778 new Error(
779 `Task function operation '${
780 message.taskFunctionOperation as string
781 }' failed on worker ${errorResponse?.workerId} with error: '${
782 errorResponse?.workerError?.message
783 }'`
784 )
785 )
786 }
787 this.deregisterWorkerMessageListener(
788 this.getWorkerNodeKeyByWorkerId(message.workerId),
789 taskFunctionOperationsListener
790 )
791 }
792 }
793 }
794 for (const [workerNodeKey] of this.workerNodes.entries()) {
795 this.registerWorkerMessageListener(
796 workerNodeKey,
797 taskFunctionOperationsListener
798 )
799 this.sendToWorker(workerNodeKey, message)
800 }
801 })
802 }
803
804 /** @inheritDoc */
805 public hasTaskFunction (name: string): boolean {
806 for (const workerNode of this.workerNodes) {
807 if (
808 Array.isArray(workerNode.info.taskFunctionNames) &&
809 workerNode.info.taskFunctionNames.includes(name)
810 ) {
811 return true
812 }
813 }
814 return false
815 }
816
817 /** @inheritDoc */
818 public async addTaskFunction (
819 name: string,
820 fn: TaskFunction<Data, Response>
821 ): Promise<boolean> {
822 if (typeof name !== 'string') {
823 throw new TypeError('name argument must be a string')
824 }
825 if (typeof name === 'string' && name.trim().length === 0) {
826 throw new TypeError('name argument must not be an empty string')
827 }
828 if (typeof fn !== 'function') {
829 throw new TypeError('fn argument must be a function')
830 }
831 const opResult = await this.sendTaskFunctionOperationToWorkers({
832 taskFunctionOperation: 'add',
833 taskFunctionName: name,
834 taskFunction: fn.toString()
835 })
836 this.taskFunctions.set(name, fn)
837 return opResult
838 }
839
840 /** @inheritDoc */
841 public async removeTaskFunction (name: string): Promise<boolean> {
842 if (!this.taskFunctions.has(name)) {
843 throw new Error(
844 'Cannot remove a task function not handled on the pool side'
845 )
846 }
847 const opResult = await this.sendTaskFunctionOperationToWorkers({
848 taskFunctionOperation: 'remove',
849 taskFunctionName: name
850 })
851 this.deleteTaskFunctionWorkerUsages(name)
852 this.taskFunctions.delete(name)
853 return opResult
854 }
855
856 /** @inheritDoc */
857 public listTaskFunctionNames (): string[] {
858 for (const workerNode of this.workerNodes) {
859 if (
860 Array.isArray(workerNode.info.taskFunctionNames) &&
861 workerNode.info.taskFunctionNames.length > 0
862 ) {
863 return workerNode.info.taskFunctionNames
864 }
865 }
866 return []
867 }
868
869 /** @inheritDoc */
870 public async setDefaultTaskFunction (name: string): Promise<boolean> {
871 return await this.sendTaskFunctionOperationToWorkers({
872 taskFunctionOperation: 'default',
873 taskFunctionName: name
874 })
875 }
876
877 private deleteTaskFunctionWorkerUsages (name: string): void {
878 for (const workerNode of this.workerNodes) {
879 workerNode.deleteTaskFunctionWorkerUsage(name)
880 }
881 }
882
883 private shallExecuteTask (workerNodeKey: number): boolean {
884 return (
885 this.tasksQueueSize(workerNodeKey) === 0 &&
886 this.workerNodes[workerNodeKey].usage.tasks.executing <
887 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
888 this.opts.tasksQueueOptions!.concurrency!
889 )
890 }
891
892 /** @inheritDoc */
893 public async execute (
894 data?: Data,
895 name?: string,
896 transferList?: TransferListItem[]
897 ): Promise<Response> {
898 return await new Promise<Response>((resolve, reject) => {
899 if (!this.started) {
900 reject(new Error('Cannot execute a task on not started pool'))
901 return
902 }
903 if (this.destroying) {
904 reject(new Error('Cannot execute a task on destroying pool'))
905 return
906 }
907 if (name != null && typeof name !== 'string') {
908 reject(new TypeError('name argument must be a string'))
909 return
910 }
911 if (
912 name != null &&
913 typeof name === 'string' &&
914 name.trim().length === 0
915 ) {
916 reject(new TypeError('name argument must not be an empty string'))
917 return
918 }
919 if (transferList != null && !Array.isArray(transferList)) {
920 reject(new TypeError('transferList argument must be an array'))
921 return
922 }
923 const timestamp = performance.now()
924 const workerNodeKey = this.chooseWorkerNode()
925 const task: Task<Data> = {
926 name: name ?? DEFAULT_TASK_NAME,
927 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
928 data: data ?? ({} as Data),
929 transferList,
930 timestamp,
931 taskId: randomUUID()
932 }
933 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
934 this.promiseResponseMap.set(task.taskId!, {
935 resolve,
936 reject,
937 workerNodeKey,
938 ...(this.emitter != null && {
939 asyncResource: new AsyncResource('poolifier:task', {
940 triggerAsyncId: this.emitter.asyncId,
941 requireManualDestroy: true
942 })
943 })
944 })
945 if (
946 this.opts.enableTasksQueue === false ||
947 (this.opts.enableTasksQueue === true &&
948 this.shallExecuteTask(workerNodeKey))
949 ) {
950 this.executeTask(workerNodeKey, task)
951 } else {
952 this.enqueueTask(workerNodeKey, task)
953 }
954 })
955 }
956
957 /**
958 * Starts the minimum number of workers.
959 */
960 private startMinimumNumberOfWorkers (): void {
961 this.startingMinimumNumberOfWorkers = true
962 while (
963 this.workerNodes.reduce(
964 (accumulator, workerNode) =>
965 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
966 0
967 ) < this.minimumNumberOfWorkers
968 ) {
969 this.createAndSetupWorkerNode()
970 }
971 this.startingMinimumNumberOfWorkers = false
972 }
973
974 /** @inheritdoc */
975 public start (): void {
976 if (this.started) {
977 throw new Error('Cannot start an already started pool')
978 }
979 if (this.starting) {
980 throw new Error('Cannot start an already starting pool')
981 }
982 if (this.destroying) {
983 throw new Error('Cannot start a destroying pool')
984 }
985 this.starting = true
986 this.startMinimumNumberOfWorkers()
987 this.starting = false
988 this.started = true
989 }
990
991 /** @inheritDoc */
992 public async destroy (): Promise<void> {
993 if (!this.started) {
994 throw new Error('Cannot destroy an already destroyed pool')
995 }
996 if (this.starting) {
997 throw new Error('Cannot destroy an starting pool')
998 }
999 if (this.destroying) {
1000 throw new Error('Cannot destroy an already destroying pool')
1001 }
1002 this.destroying = true
1003 await Promise.all(
1004 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
1005 await this.destroyWorkerNode(workerNodeKey)
1006 })
1007 )
1008 this.emitter?.emit(PoolEvents.destroy, this.info)
1009 this.emitter?.emitDestroy()
1010 this.readyEventEmitted = false
1011 this.destroying = false
1012 this.started = false
1013 }
1014
1015 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
1016 await new Promise<void>((resolve, reject) => {
1017 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1018 if (this.workerNodes[workerNodeKey] == null) {
1019 resolve()
1020 return
1021 }
1022 const killMessageListener = (message: MessageValue<Response>): void => {
1023 this.checkMessageWorkerId(message)
1024 if (message.kill === 'success') {
1025 resolve()
1026 } else if (message.kill === 'failure') {
1027 reject(
1028 new Error(
1029 `Kill message handling failed on worker ${message.workerId}`
1030 )
1031 )
1032 }
1033 }
1034 // FIXME: should be registered only once
1035 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1036 this.sendToWorker(workerNodeKey, { kill: true })
1037 })
1038 }
1039
1040 /**
1041 * Terminates the worker node given its worker node key.
1042 *
1043 * @param workerNodeKey - The worker node key.
1044 */
1045 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1046 this.flagWorkerNodeAsNotReady(workerNodeKey)
1047 const flushedTasks = this.flushTasksQueue(workerNodeKey)
1048 const workerNode = this.workerNodes[workerNodeKey]
1049 await waitWorkerNodeEvents(
1050 workerNode,
1051 'taskFinished',
1052 flushedTasks,
1053 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
1054 getDefaultTasksQueueOptions(
1055 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1056 ).tasksFinishedTimeout
1057 )
1058 await this.sendKillMessageToWorker(workerNodeKey)
1059 await workerNode.terminate()
1060 }
1061
1062 /**
1063 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1064 * Can be overridden.
1065 *
1066 * @virtual
1067 */
1068 protected setupHook (): void {
1069 /* Intentionally empty */
1070 }
1071
1072 /**
1073 * Should return whether the worker is the main worker or not.
1074 */
1075 protected abstract isMain (): boolean
1076
1077 /**
1078 * Hook executed before the worker task execution.
1079 * Can be overridden.
1080 *
1081 * @param workerNodeKey - The worker node key.
1082 * @param task - The task to execute.
1083 */
1084 protected beforeTaskExecutionHook (
1085 workerNodeKey: number,
1086 task: Task<Data>
1087 ): void {
1088 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1089 if (this.workerNodes[workerNodeKey]?.usage != null) {
1090 const workerUsage = this.workerNodes[workerNodeKey].usage
1091 ++workerUsage.tasks.executing
1092 updateWaitTimeWorkerUsage(
1093 this.workerChoiceStrategyContext,
1094 workerUsage,
1095 task
1096 )
1097 }
1098 if (
1099 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1100 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1101 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
1102 null
1103 ) {
1104 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1105 const taskFunctionWorkerUsage = this.workerNodes[
1106 workerNodeKey
1107 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1108 ].getTaskFunctionWorkerUsage(task.name!)!
1109 ++taskFunctionWorkerUsage.tasks.executing
1110 updateWaitTimeWorkerUsage(
1111 this.workerChoiceStrategyContext,
1112 taskFunctionWorkerUsage,
1113 task
1114 )
1115 }
1116 }
1117
1118 /**
1119 * Hook executed after the worker task execution.
1120 * Can be overridden.
1121 *
1122 * @param workerNodeKey - The worker node key.
1123 * @param message - The received message.
1124 */
1125 protected afterTaskExecutionHook (
1126 workerNodeKey: number,
1127 message: MessageValue<Response>
1128 ): void {
1129 let needWorkerChoiceStrategyUpdate = false
1130 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1131 if (this.workerNodes[workerNodeKey]?.usage != null) {
1132 const workerUsage = this.workerNodes[workerNodeKey].usage
1133 updateTaskStatisticsWorkerUsage(workerUsage, message)
1134 updateRunTimeWorkerUsage(
1135 this.workerChoiceStrategyContext,
1136 workerUsage,
1137 message
1138 )
1139 updateEluWorkerUsage(
1140 this.workerChoiceStrategyContext,
1141 workerUsage,
1142 message
1143 )
1144 needWorkerChoiceStrategyUpdate = true
1145 }
1146 if (
1147 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1148 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1149 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1150 message.taskPerformance!.name
1151 ) != null
1152 ) {
1153 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1154 const taskFunctionWorkerUsage = this.workerNodes[
1155 workerNodeKey
1156 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1157 ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
1158 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1159 updateRunTimeWorkerUsage(
1160 this.workerChoiceStrategyContext,
1161 taskFunctionWorkerUsage,
1162 message
1163 )
1164 updateEluWorkerUsage(
1165 this.workerChoiceStrategyContext,
1166 taskFunctionWorkerUsage,
1167 message
1168 )
1169 needWorkerChoiceStrategyUpdate = true
1170 }
1171 if (needWorkerChoiceStrategyUpdate) {
1172 this.workerChoiceStrategyContext?.update(workerNodeKey)
1173 }
1174 }
1175
1176 /**
1177 * Whether the worker node shall update its task function worker usage or not.
1178 *
1179 * @param workerNodeKey - The worker node key.
1180 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1181 */
1182 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1183 const workerInfo = this.getWorkerInfo(workerNodeKey)
1184 return (
1185 workerInfo != null &&
1186 Array.isArray(workerInfo.taskFunctionNames) &&
1187 workerInfo.taskFunctionNames.length > 2
1188 )
1189 }
1190
1191 /**
1192 * Chooses a worker node for the next task.
1193 *
1194 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1195 *
1196 * @returns The chosen worker node key
1197 */
1198 private chooseWorkerNode (): number {
1199 if (this.shallCreateDynamicWorker()) {
1200 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1201 if (
1202 this.workerChoiceStrategyContext?.getStrategyPolicy()
1203 .dynamicWorkerUsage === true
1204 ) {
1205 return workerNodeKey
1206 }
1207 }
1208 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1209 return this.workerChoiceStrategyContext!.execute()
1210 }
1211
1212 /**
1213 * Conditions for dynamic worker creation.
1214 *
1215 * @returns Whether to create a dynamic worker or not.
1216 */
1217 protected abstract shallCreateDynamicWorker (): boolean
1218
1219 /**
1220 * Sends a message to worker given its worker node key.
1221 *
1222 * @param workerNodeKey - The worker node key.
1223 * @param message - The message.
1224 * @param transferList - The optional array of transferable objects.
1225 */
1226 protected abstract sendToWorker (
1227 workerNodeKey: number,
1228 message: MessageValue<Data>,
1229 transferList?: TransferListItem[]
1230 ): void
1231
1232 /**
1233 * Creates a new, completely set up worker node.
1234 *
1235 * @returns New, completely set up worker node key.
1236 */
1237 protected createAndSetupWorkerNode (): number {
1238 const workerNode = this.createWorkerNode()
1239 workerNode.registerWorkerEventHandler(
1240 'online',
1241 this.opts.onlineHandler ?? EMPTY_FUNCTION
1242 )
1243 workerNode.registerWorkerEventHandler(
1244 'message',
1245 this.opts.messageHandler ?? EMPTY_FUNCTION
1246 )
1247 workerNode.registerWorkerEventHandler(
1248 'error',
1249 this.opts.errorHandler ?? EMPTY_FUNCTION
1250 )
1251 workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
1252 workerNode.info.ready = false
1253 this.emitter?.emit(PoolEvents.error, error)
1254 if (
1255 this.started &&
1256 !this.destroying &&
1257 this.opts.restartWorkerOnError === true
1258 ) {
1259 if (workerNode.info.dynamic) {
1260 this.createAndSetupDynamicWorkerNode()
1261 } else if (!this.startingMinimumNumberOfWorkers) {
1262 this.startMinimumNumberOfWorkers()
1263 }
1264 }
1265 if (
1266 this.started &&
1267 !this.destroying &&
1268 this.opts.enableTasksQueue === true
1269 ) {
1270 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
1271 }
1272 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1273 workerNode?.terminate().catch((error: unknown) => {
1274 this.emitter?.emit(PoolEvents.error, error)
1275 })
1276 })
1277 workerNode.registerWorkerEventHandler(
1278 'exit',
1279 this.opts.exitHandler ?? EMPTY_FUNCTION
1280 )
1281 workerNode.registerOnceWorkerEventHandler('exit', () => {
1282 this.removeWorkerNode(workerNode)
1283 if (
1284 this.started &&
1285 !this.startingMinimumNumberOfWorkers &&
1286 !this.destroying
1287 ) {
1288 this.startMinimumNumberOfWorkers()
1289 }
1290 })
1291 const workerNodeKey = this.addWorkerNode(workerNode)
1292 this.afterWorkerNodeSetup(workerNodeKey)
1293 return workerNodeKey
1294 }
1295
1296 /**
1297 * Creates a new, completely set up dynamic worker node.
1298 *
1299 * @returns New, completely set up dynamic worker node key.
1300 */
1301 protected createAndSetupDynamicWorkerNode (): number {
1302 const workerNodeKey = this.createAndSetupWorkerNode()
1303 this.registerWorkerMessageListener(workerNodeKey, message => {
1304 this.checkMessageWorkerId(message)
1305 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1306 message.workerId
1307 )
1308 const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
1309 // Kill message received from worker
1310 if (
1311 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1312 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1313 ((this.opts.enableTasksQueue === false &&
1314 workerUsage.tasks.executing === 0) ||
1315 (this.opts.enableTasksQueue === true &&
1316 workerUsage.tasks.executing === 0 &&
1317 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1318 ) {
1319 // Flag the worker node as not ready immediately
1320 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1321 this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
1322 this.emitter?.emit(PoolEvents.error, error)
1323 })
1324 }
1325 })
1326 this.sendToWorker(workerNodeKey, {
1327 checkActive: true
1328 })
1329 if (this.taskFunctions.size > 0) {
1330 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1331 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1332 taskFunctionOperation: 'add',
1333 taskFunctionName,
1334 taskFunction: taskFunction.toString()
1335 }).catch((error: unknown) => {
1336 this.emitter?.emit(PoolEvents.error, error)
1337 })
1338 }
1339 }
1340 const workerNode = this.workerNodes[workerNodeKey]
1341 workerNode.info.dynamic = true
1342 if (
1343 this.workerChoiceStrategyContext?.getStrategyPolicy()
1344 .dynamicWorkerReady === true ||
1345 this.workerChoiceStrategyContext?.getStrategyPolicy()
1346 .dynamicWorkerUsage === true
1347 ) {
1348 workerNode.info.ready = true
1349 }
1350 this.checkAndEmitDynamicWorkerCreationEvents()
1351 return workerNodeKey
1352 }
1353
1354 /**
1355 * Registers a listener callback on the worker given its worker node key.
1356 *
1357 * @param workerNodeKey - The worker node key.
1358 * @param listener - The message listener callback.
1359 */
1360 protected abstract registerWorkerMessageListener<
1361 Message extends Data | Response
1362 >(
1363 workerNodeKey: number,
1364 listener: (message: MessageValue<Message>) => void
1365 ): void
1366
1367 /**
1368 * Registers once a listener callback on the worker given its worker node key.
1369 *
1370 * @param workerNodeKey - The worker node key.
1371 * @param listener - The message listener callback.
1372 */
1373 protected abstract registerOnceWorkerMessageListener<
1374 Message extends Data | Response
1375 >(
1376 workerNodeKey: number,
1377 listener: (message: MessageValue<Message>) => void
1378 ): void
1379
1380 /**
1381 * Deregisters a listener callback on the worker given its worker node key.
1382 *
1383 * @param workerNodeKey - The worker node key.
1384 * @param listener - The message listener callback.
1385 */
1386 protected abstract deregisterWorkerMessageListener<
1387 Message extends Data | Response
1388 >(
1389 workerNodeKey: number,
1390 listener: (message: MessageValue<Message>) => void
1391 ): void
1392
1393 /**
1394 * Method hooked up after a worker node has been newly created.
1395 * Can be overridden.
1396 *
1397 * @param workerNodeKey - The newly created worker node key.
1398 */
1399 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1400 // Listen to worker messages.
1401 this.registerWorkerMessageListener(
1402 workerNodeKey,
1403 this.workerMessageListener
1404 )
1405 // Send the startup message to worker.
1406 this.sendStartupMessageToWorker(workerNodeKey)
1407 // Send the statistics message to worker.
1408 this.sendStatisticsMessageToWorker(workerNodeKey)
1409 if (this.opts.enableTasksQueue === true) {
1410 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1411 this.workerNodes[workerNodeKey].on(
1412 'idle',
1413 this.handleWorkerNodeIdleEvent
1414 )
1415 }
1416 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1417 this.workerNodes[workerNodeKey].on(
1418 'backPressure',
1419 this.handleWorkerNodeBackPressureEvent
1420 )
1421 }
1422 }
1423 }
1424
1425 /**
1426 * Sends the startup message to worker given its worker node key.
1427 *
1428 * @param workerNodeKey - The worker node key.
1429 */
1430 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1431
1432 /**
1433 * Sends the statistics message to worker given its worker node key.
1434 *
1435 * @param workerNodeKey - The worker node key.
1436 */
1437 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1438 this.sendToWorker(workerNodeKey, {
1439 statistics: {
1440 runTime:
1441 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
1442 .runTime.aggregate ?? false,
1443 elu:
1444 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
1445 .aggregate ?? false
1446 }
1447 })
1448 }
1449
1450 private cannotStealTask (): boolean {
1451 return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1452 }
1453
1454 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1455 if (this.shallExecuteTask(workerNodeKey)) {
1456 this.executeTask(workerNodeKey, task)
1457 } else {
1458 this.enqueueTask(workerNodeKey, task)
1459 }
1460 }
1461
1462 private redistributeQueuedTasks (workerNodeKey: number): void {
1463 if (workerNodeKey === -1 || this.cannotStealTask()) {
1464 return
1465 }
1466 while (this.tasksQueueSize(workerNodeKey) > 0) {
1467 const destinationWorkerNodeKey = this.workerNodes.reduce(
1468 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1469 return workerNode.info.ready &&
1470 workerNode.usage.tasks.queued <
1471 workerNodes[minWorkerNodeKey].usage.tasks.queued
1472 ? workerNodeKey
1473 : minWorkerNodeKey
1474 },
1475 0
1476 )
1477 this.handleTask(
1478 destinationWorkerNodeKey,
1479 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1480 this.dequeueTask(workerNodeKey)!
1481 )
1482 }
1483 }
1484
1485 private updateTaskStolenStatisticsWorkerUsage (
1486 workerNodeKey: number,
1487 taskName: string
1488 ): void {
1489 const workerNode = this.workerNodes[workerNodeKey]
1490 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1491 if (workerNode?.usage != null) {
1492 ++workerNode.usage.tasks.stolen
1493 }
1494 if (
1495 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1496 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1497 ) {
1498 const taskFunctionWorkerUsage =
1499 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1500 workerNode.getTaskFunctionWorkerUsage(taskName)!
1501 ++taskFunctionWorkerUsage.tasks.stolen
1502 }
1503 }
1504
1505 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1506 workerNodeKey: number
1507 ): void {
1508 const workerNode = this.workerNodes[workerNodeKey]
1509 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1510 if (workerNode?.usage != null) {
1511 ++workerNode.usage.tasks.sequentiallyStolen
1512 }
1513 }
1514
1515 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1516 workerNodeKey: number,
1517 taskName: string
1518 ): void {
1519 const workerNode = this.workerNodes[workerNodeKey]
1520 if (
1521 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1522 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1523 ) {
1524 const taskFunctionWorkerUsage =
1525 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1526 workerNode.getTaskFunctionWorkerUsage(taskName)!
1527 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1528 }
1529 }
1530
1531 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1532 workerNodeKey: number
1533 ): void {
1534 const workerNode = this.workerNodes[workerNodeKey]
1535 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1536 if (workerNode?.usage != null) {
1537 workerNode.usage.tasks.sequentiallyStolen = 0
1538 }
1539 }
1540
1541 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1542 workerNodeKey: number,
1543 taskName: string
1544 ): void {
1545 const workerNode = this.workerNodes[workerNodeKey]
1546 if (
1547 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1548 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1549 ) {
1550 const taskFunctionWorkerUsage =
1551 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1552 workerNode.getTaskFunctionWorkerUsage(taskName)!
1553 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1554 }
1555 }
1556
1557 private readonly handleWorkerNodeIdleEvent = (
1558 eventDetail: WorkerNodeEventDetail,
1559 previousStolenTask?: Task<Data>
1560 ): void => {
1561 const { workerNodeKey } = eventDetail
1562 if (workerNodeKey == null) {
1563 throw new Error(
1564 "WorkerNode event detail 'workerNodeKey' property must be defined"
1565 )
1566 }
1567 const workerInfo = this.getWorkerInfo(workerNodeKey)
1568 if (
1569 this.cannotStealTask() ||
1570 (this.info.stealingWorkerNodes ?? 0) >
1571 Math.floor(this.workerNodes.length / 2)
1572 ) {
1573 if (workerInfo != null && previousStolenTask != null) {
1574 workerInfo.stealing = false
1575 }
1576 return
1577 }
1578 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1579 if (
1580 workerInfo != null &&
1581 previousStolenTask != null &&
1582 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1583 (workerNodeTasksUsage.executing > 0 ||
1584 this.tasksQueueSize(workerNodeKey) > 0)
1585 ) {
1586 workerInfo.stealing = false
1587 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1588 for (const taskName of workerInfo.taskFunctionNames!) {
1589 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1590 workerNodeKey,
1591 taskName
1592 )
1593 }
1594 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1595 return
1596 }
1597 if (workerInfo == null) {
1598 throw new Error(
1599 `Worker node with key '${workerNodeKey}' not found in pool`
1600 )
1601 }
1602 workerInfo.stealing = true
1603 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1604 if (
1605 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1606 stolenTask != null
1607 ) {
1608 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1609 const taskFunctionTasksWorkerUsage = this.workerNodes[
1610 workerNodeKey
1611 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1612 ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
1613 if (
1614 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1615 (previousStolenTask != null &&
1616 previousStolenTask.name === stolenTask.name &&
1617 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1618 ) {
1619 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1620 workerNodeKey,
1621 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1622 stolenTask.name!
1623 )
1624 } else {
1625 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1626 workerNodeKey,
1627 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1628 stolenTask.name!
1629 )
1630 }
1631 }
1632 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1633 .then(() => {
1634 this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
1635 return undefined
1636 })
1637 .catch((error: unknown) => {
1638 this.emitter?.emit(PoolEvents.error, error)
1639 })
1640 }
1641
1642 private readonly workerNodeStealTask = (
1643 workerNodeKey: number
1644 ): Task<Data> | undefined => {
1645 const workerNodes = this.workerNodes
1646 .slice()
1647 .sort(
1648 (workerNodeA, workerNodeB) =>
1649 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1650 )
1651 const sourceWorkerNode = workerNodes.find(
1652 (sourceWorkerNode, sourceWorkerNodeKey) =>
1653 sourceWorkerNode.info.ready &&
1654 !sourceWorkerNode.info.stealing &&
1655 sourceWorkerNodeKey !== workerNodeKey &&
1656 sourceWorkerNode.usage.tasks.queued > 0
1657 )
1658 if (sourceWorkerNode != null) {
1659 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1660 const task = sourceWorkerNode.popTask()!
1661 this.handleTask(workerNodeKey, task)
1662 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1663 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1664 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1665 return task
1666 }
1667 }
1668
1669 private readonly handleWorkerNodeBackPressureEvent = (
1670 eventDetail: WorkerNodeEventDetail
1671 ): void => {
1672 if (
1673 this.cannotStealTask() ||
1674 (this.info.stealingWorkerNodes ?? 0) >
1675 Math.floor(this.workerNodes.length / 2)
1676 ) {
1677 return
1678 }
1679 const { workerId } = eventDetail
1680 const sizeOffset = 1
1681 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1682 if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
1683 return
1684 }
1685 const sourceWorkerNode =
1686 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1687 const workerNodes = this.workerNodes
1688 .slice()
1689 .sort(
1690 (workerNodeA, workerNodeB) =>
1691 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1692 )
1693 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1694 if (
1695 sourceWorkerNode.usage.tasks.queued > 0 &&
1696 workerNode.info.ready &&
1697 !workerNode.info.stealing &&
1698 workerNode.info.id !== workerId &&
1699 workerNode.usage.tasks.queued <
1700 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1701 this.opts.tasksQueueOptions!.size! - sizeOffset
1702 ) {
1703 const workerInfo = this.getWorkerInfo(workerNodeKey)
1704 if (workerInfo == null) {
1705 throw new Error(
1706 `Worker node with key '${workerNodeKey}' not found in pool`
1707 )
1708 }
1709 workerInfo.stealing = true
1710 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1711 const task = sourceWorkerNode.popTask()!
1712 this.handleTask(workerNodeKey, task)
1713 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1714 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1715 workerInfo.stealing = false
1716 }
1717 }
1718 }
1719
1720 /**
1721 * This method is the message listener registered on each worker.
1722 */
1723 protected readonly workerMessageListener = (
1724 message: MessageValue<Response>
1725 ): void => {
1726 this.checkMessageWorkerId(message)
1727 const { workerId, ready, taskId, taskFunctionNames } = message
1728 if (ready != null && taskFunctionNames != null) {
1729 // Worker ready response received from worker
1730 this.handleWorkerReadyResponse(message)
1731 } else if (taskId != null) {
1732 // Task execution response received from worker
1733 this.handleTaskExecutionResponse(message)
1734 } else if (taskFunctionNames != null) {
1735 // Task function names message received from worker
1736 const workerInfo = this.getWorkerInfo(
1737 this.getWorkerNodeKeyByWorkerId(workerId)
1738 )
1739 if (workerInfo != null) {
1740 workerInfo.taskFunctionNames = taskFunctionNames
1741 }
1742 }
1743 }
1744
1745 private checkAndEmitReadyEvent (): void {
1746 if (!this.readyEventEmitted && this.ready) {
1747 this.emitter?.emit(PoolEvents.ready, this.info)
1748 this.readyEventEmitted = true
1749 }
1750 }
1751
1752 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1753 const { workerId, ready, taskFunctionNames } = message
1754 if (ready == null || !ready) {
1755 throw new Error(`Worker ${workerId} failed to initialize`)
1756 }
1757 const workerNode =
1758 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1759 workerNode.info.ready = ready
1760 workerNode.info.taskFunctionNames = taskFunctionNames
1761 this.checkAndEmitReadyEvent()
1762 }
1763
1764 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1765 const { workerId, taskId, workerError, data } = message
1766 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1767 const promiseResponse = this.promiseResponseMap.get(taskId!)
1768 if (promiseResponse != null) {
1769 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
1770 const workerNode = this.workerNodes[workerNodeKey]
1771 if (workerError != null) {
1772 this.emitter?.emit(PoolEvents.taskError, workerError)
1773 asyncResource != null
1774 ? asyncResource.runInAsyncScope(
1775 reject,
1776 this.emitter,
1777 workerError.message
1778 )
1779 : reject(workerError.message)
1780 } else {
1781 asyncResource != null
1782 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1783 : resolve(data as Response)
1784 }
1785 asyncResource?.emitDestroy()
1786 this.afterTaskExecutionHook(workerNodeKey, message)
1787 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1788 this.promiseResponseMap.delete(taskId!)
1789 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1790 workerNode?.emit('taskFinished', taskId)
1791 if (
1792 this.opts.enableTasksQueue === true &&
1793 !this.destroying &&
1794 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1795 workerNode != null
1796 ) {
1797 const workerNodeTasksUsage = workerNode.usage.tasks
1798 if (
1799 this.tasksQueueSize(workerNodeKey) > 0 &&
1800 workerNodeTasksUsage.executing <
1801 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1802 this.opts.tasksQueueOptions!.concurrency!
1803 ) {
1804 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1805 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
1806 }
1807 if (
1808 workerNodeTasksUsage.executing === 0 &&
1809 this.tasksQueueSize(workerNodeKey) === 0 &&
1810 workerNodeTasksUsage.sequentiallyStolen === 0
1811 ) {
1812 workerNode.emit('idle', {
1813 workerId,
1814 workerNodeKey
1815 })
1816 }
1817 }
1818 }
1819 }
1820
1821 private checkAndEmitTaskExecutionEvents (): void {
1822 if (this.busy) {
1823 this.emitter?.emit(PoolEvents.busy, this.info)
1824 }
1825 }
1826
1827 private checkAndEmitTaskQueuingEvents (): void {
1828 if (this.hasBackPressure()) {
1829 this.emitter?.emit(PoolEvents.backPressure, this.info)
1830 }
1831 }
1832
1833 /**
1834 * Emits dynamic worker creation events.
1835 */
1836 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1837
1838 /**
1839 * Gets the worker information given its worker node key.
1840 *
1841 * @param workerNodeKey - The worker node key.
1842 * @returns The worker information.
1843 */
1844 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1845 return this.workerNodes[workerNodeKey]?.info
1846 }
1847
1848 /**
1849 * Creates a worker node.
1850 *
1851 * @returns The created worker node.
1852 */
1853 private createWorkerNode (): IWorkerNode<Worker, Data> {
1854 const workerNode = new WorkerNode<Worker, Data>(
1855 this.worker,
1856 this.filePath,
1857 {
1858 env: this.opts.env,
1859 workerOptions: this.opts.workerOptions,
1860 tasksQueueBackPressureSize:
1861 this.opts.tasksQueueOptions?.size ??
1862 getDefaultTasksQueueOptions(
1863 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1864 ).size
1865 }
1866 )
1867 // Flag the worker node as ready at pool startup.
1868 if (this.starting) {
1869 workerNode.info.ready = true
1870 }
1871 return workerNode
1872 }
1873
1874 /**
1875 * Adds the given worker node in the pool worker nodes.
1876 *
1877 * @param workerNode - The worker node.
1878 * @returns The added worker node key.
1879 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1880 */
1881 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
1882 this.workerNodes.push(workerNode)
1883 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1884 if (workerNodeKey === -1) {
1885 throw new Error('Worker added not found in worker nodes')
1886 }
1887 return workerNodeKey
1888 }
1889
1890 private checkAndEmitEmptyEvent (): void {
1891 if (this.empty) {
1892 this.emitter?.emit(PoolEvents.empty, this.info)
1893 this.readyEventEmitted = false
1894 }
1895 }
1896
1897 /**
1898 * Removes the worker node from the pool worker nodes.
1899 *
1900 * @param workerNode - The worker node.
1901 */
1902 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
1903 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1904 if (workerNodeKey !== -1) {
1905 this.workerNodes.splice(workerNodeKey, 1)
1906 this.workerChoiceStrategyContext?.remove(workerNodeKey)
1907 }
1908 this.checkAndEmitEmptyEvent()
1909 }
1910
1911 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1912 const workerInfo = this.getWorkerInfo(workerNodeKey)
1913 if (workerInfo != null) {
1914 workerInfo.ready = false
1915 }
1916 }
1917
1918 private hasBackPressure (): boolean {
1919 return (
1920 this.opts.enableTasksQueue === true &&
1921 this.workerNodes.findIndex(
1922 workerNode => !workerNode.hasBackPressure()
1923 ) === -1
1924 )
1925 }
1926
1927 /**
1928 * Executes the given task on the worker given its worker node key.
1929 *
1930 * @param workerNodeKey - The worker node key.
1931 * @param task - The task to execute.
1932 */
1933 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1934 this.beforeTaskExecutionHook(workerNodeKey, task)
1935 this.sendToWorker(workerNodeKey, task, task.transferList)
1936 this.checkAndEmitTaskExecutionEvents()
1937 }
1938
1939 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1940 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1941 this.checkAndEmitTaskQueuingEvents()
1942 return tasksQueueSize
1943 }
1944
1945 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1946 return this.workerNodes[workerNodeKey].dequeueTask()
1947 }
1948
1949 private tasksQueueSize (workerNodeKey: number): number {
1950 return this.workerNodes[workerNodeKey].tasksQueueSize()
1951 }
1952
1953 protected flushTasksQueue (workerNodeKey: number): number {
1954 let flushedTasks = 0
1955 while (this.tasksQueueSize(workerNodeKey) > 0) {
1956 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1957 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
1958 ++flushedTasks
1959 }
1960 this.workerNodes[workerNodeKey].clearTasksQueue()
1961 return flushedTasks
1962 }
1963
1964 private flushTasksQueues (): void {
1965 for (const [workerNodeKey] of this.workerNodes.entries()) {
1966 this.flushTasksQueue(workerNodeKey)
1967 }
1968 }
1969 }