refactor: re-enable standard JS linter rules
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types.js'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils.js'
24 import { KillBehaviors } from '../worker/worker-options.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool.js'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerNodeEventDetail,
40 WorkerType
41 } from './worker.js'
42 import {
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
49 import { version } from './version.js'
50 import { WorkerNode } from './worker-node.js'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 getDefaultTasksQueueOptions,
56 updateEluWorkerUsage,
57 updateRunTimeWorkerUsage,
58 updateTaskStatisticsWorkerUsage,
59 updateWaitTimeWorkerUsage,
60 waitWorkerNodeEvents
61 } from './utils.js'
62
63 /**
64 * Base class that implements some shared logic for all poolifier pools.
65 *
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
69 */
70 export abstract class AbstractPool<
71 Worker extends IWorker,
72 Data = unknown,
73 Response = unknown
74 > implements IPool<Worker, Data, Response> {
75 /** @inheritDoc */
76 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
77
78 /** @inheritDoc */
79 public emitter?: EventEmitterAsyncResource
80
81 /**
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
85 *
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
87 */
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
90
91 /**
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
95 Worker,
96 Data,
97 Response
98 >
99
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the worker choice strategy context is built and,
 * when `opts.startWorkers` is `true` once defaults are applied, the pool is started.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. `checkPoolOptions()` also writes the defaulted values into `this.opts`.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)
  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts

  // Keep `this` attached when these methods are handed over as plain function references.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before `start()` reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
183
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
191
/**
 * Checks the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not defined.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
209
/**
 * Checks the pool options and writes the default value of each unset option into `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated before the default is applied.
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    opts.workerChoiceStrategyOptions!
  )
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue: its options are only checked and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    checkValidTasksQueueOptions(opts.tasksQueueOptions!)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      opts.tasksQueueOptions!
    )
  }
}
239
/**
 * Checks the worker choice strategy options. Undefined options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  // One weight is expected per worker node the pool can hold.
  const weights = workerChoiceStrategyOptions?.weights
  const expectedWeights =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (weights != null && Object.keys(weights).length !== expectedWeights) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
271
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277
/** @inheritDoc */
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // The optional chaining is kept: the context may not be set yet when this getter is read.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const runTimeRequirements = taskStatisticsRequirements?.runTime
  const waitTimeRequirements = taskStatisticsRequirements?.waitTime

  // Number of worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number => this.workerNodes.filter(matches).length
  // Sum of a numeric value picked on each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenation of a measurement history picked on each worker node.
  const gatherHistories = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) => histories.concat(pick(workerNode)),
      []
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements?.aggregate &&
      waitTimeRequirements?.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related properties, in the same order as they were emitted one by one.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements?.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements?.average && {
          average: round(
            average(
              gatherHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(runTimeRequirements?.median && {
          median: round(
            median(
              gatherHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements?.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements?.average && {
          average: round(
            average(
              gatherHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(waitTimeRequirements?.median && {
          median: round(
            median(
              gatherHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
441
/**
 * The pool readiness boolean status.
 *
 * The pool is ready when the number of ready worker nodes not flagged as dynamic
 * is at least the minimum number of workers.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
456
/**
 * The approximate pool utilization.
 *
 * It is the ratio of the tasks run time plus wait time aggregates summed over the worker nodes
 * to the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * poolSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A missing aggregate counts for zero.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
478
479 /**
480 * The pool type.
481 *
482 * If it is `'dynamic'`, it provides the `max` property.
483 */
484 protected abstract get type (): PoolType
485
486 /**
487 * The worker type.
488 */
489 protected abstract get worker (): WorkerType
490
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
506
/**
 * Looks up the key of the worker node holding the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose worker id matches, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    if (this.workerNodes[workerNodeKey].info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
518
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // On each worker node: reset its usage, then send it the statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
537
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Undefined options leave the currently stored ones untouched.
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const { workerChoiceStrategyOptions: currentOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(this, currentOptions)
}
551
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisablingTasksQueue = this.opts.enableTasksQueue === true && !enable
  if (isDisablingTasksQueue) {
    // Detach the stealing handlers and flush the queues before turning the queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueOptions(tasksQueueOptions!)
}
566
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // No tasks queue: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Handlers are always detached first so that they are never attached twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
591
/**
 * Builds the tasks queue options by laying the given options over the default ones.
 *
 * @param tasksQueueOptions - The tasks queue options taking precedence over the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaximumSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
602
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The back pressure size to set.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
608
/**
 * Attaches the `idleWorkerNode` event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
617
/**
 * Detaches the `idleWorkerNode` event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
626
/**
 * Attaches the `backPressure` event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
635
/**
 * Detaches the `backPressure` event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
644
/**
 * Whether the pool is full or not.
 *
 * The pool is full when it holds at least as many worker nodes as its maximum size,
 * the maximum size being the minimum number of workers when no maximum is defined.
 */
protected get full (): boolean {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaximumSize
}
656
657 /**
658 * Whether the pool is busy or not.
659 *
660 * The pool busyness boolean status.
661 */
662 protected abstract get busy (): boolean
663
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency, otherwise a worker node
 * is at quota as soon as it executes one task. Only ready worker nodes are considered.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasWorkerNodeBelowQuota =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.opts.tasksQueueOptions!.concurrency!
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasWorkerNodeBelowQuota
}
688
/**
 * Whether the worker node with the given key is executing its tasks quota or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least `concurrency` tasks when the tasks queue is enabled, or at least one task otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue === true) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return executing >= this.opts.tasksQueueOptions!.concurrency!
  }
  return executing > 0
}
699
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with the worker error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const workerId = this.getWorkerInfo(workerNodeKey).id!
      const operationStatus = response.taskFunctionOperationStatus
      // Ignore messages that are not an operation status sent by the targeted worker.
      if (operationStatus == null || response.workerId !== workerId) {
        return
      }
      if (operationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              response.workerError!.message
            }'`
          )
        )
      }
      // The operation is settled: this listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
743
/**
 * Sends a task function operation message to every worker and waits for all the status responses.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once every worker node reported success (immediately when the pool has no worker node) and rejected with the first reported worker error otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  // Without worker nodes no response will ever come: settle right away instead of hanging.
  if (this.workerNodes.length === 0) {
    return true
  }
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived: Array<MessageValue<Response>> = []
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      // Wait until as many status responses as worker nodes have been received.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            }' failed on worker ${errorResponse.workerId!} with error: '${
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              errorResponse.workerError!.message
            }'`
          )
        )
      }
      // The listener was registered on every worker node, so it must be removed from every
      // worker node and not only from the one that sent the last response.
      // NOTE(review): removing a listener from a worker node where it is not registered is
      // assumed to be a no-op — confirm against the `deregisterWorkerMessageListener()` implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
799
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
812
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // `name` is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    operationMessage
  )
  this.taskFunctions.set(name, fn)
  return opResult
}
835
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions registered in the pool side map can be removed.
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    operationMessage
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
851
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node holding a non-empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
864
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
872
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
878
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
887
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state checks.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments checks.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (typeof name === 'string' && name.trim().length === 0) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Registered under the task id so that the worker response can settle this promise.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey,
      // An async resource is only attached when the pool has an event emitter.
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })

    const tasksQueueEnabled = this.opts.enableTasksQueue
    const shallExecuteNow =
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && this.shallExecuteTask(workerNodeKey))
    if (shallExecuteNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
952
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    // Create worker nodes until the number of non dynamic ones reaches the minimum.
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.minimumNumberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  } finally {
    // Always leave the starting state: if a worker node creation throws, the error still
    // propagates but the pool is not left stuck as starting, which would forbid any later
    // `start()` or `destroy()` call.
    this.starting = false
  }
  this.started = true
}
977
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  const emitter = this.emitter
  if (emitter != null) {
    // Emit the destroy event, then tear the emitter down.
    emitter.emit(PoolEvents.destroy, this.info)
    emitter.emitDestroy()
    emitter.removeAllListeners()
  }
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1002
/**
 * Sends a kill message to the worker of the given worker node and waits for its kill response.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on kill success, or at once if the worker node does not exist, and rejected on kill failure.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    if (this.workerNodes?.[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              `Kill message handling failed on worker ${message.workerId!}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1027
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed. The flushed tasks
 * are then waited for, within the tasks finished timeout, before the worker is killed and terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1049
/**
 * Setup hook called from the abstract constructor, before the pool is started.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1059
1060 /**
1061 * Should return whether the worker is the main worker or not.
1062 */
1063 protected abstract isMain (): boolean
1064
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * It increments the executing tasks counter and updates the wait time on the worker node usage,
 * and does the same on the task function usage when it shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskName = task.name!
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  )!
  ++taskFunctionWorkerUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionWorkerUsage,
    task
  )
}
1104
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics of the worker node usage and,
 * when task function usage tracking applies, of the task function usage named
 * by `message.taskPerformance.name`. The worker choice strategy context is
 * updated for the worker node if any usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let needWorkerChoiceStrategyUpdate = false
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    needWorkerChoiceStrategyUpdate = true
  }
  // A message may carry no task performance (NOTE(review): presumably the case
  // for errored tasks - confirm against the worker side); skip the task function
  // usage update instead of throwing on the missing property.
  const taskFunctionName = message.taskPerformance?.name
  if (
    taskFunctionName != null &&
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    // Single lookup, narrowed by the null check below.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
    if (taskFunctionWorkerUsage != null) {
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
1161
/**
 * Tells whether the task function worker usage of a worker node shall be updated.
 *
 * That is the case when the worker node information exists and lists more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  if (!Array.isArray(taskFunctionNames)) {
    return false
  }
  return taskFunctionNames.length > 2
}
1176
/**
 * Chooses the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy enables dynamic worker usage. In every other
 * case the worker choice strategy context picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1195
/**
 * Decides whether a dynamic worker shall be created.
 *
 * @returns `true` if a dynamic worker shall be created, `false` otherwise.
 */
protected abstract shallCreateDynamicWorker (): boolean
1202
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1215
/**
 * Creates a new worker node, wires its event handlers and adds it to the pool.
 *
 * The user supplied `online`, `message`, `error` and `exit` handlers are
 * registered (or a no-op when absent). An internal `error` handler flags the
 * node as not ready, emits the pool error event, optionally creates a
 * replacement node and redistributes the queued tasks, then terminates the
 * node. An internal one-shot `exit` handler removes the node from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Evaluated lazily on each use, as the pool state is read twice by the error handler.
  const isPoolRunning = (): boolean => this.started && !this.destroying
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    workerNode.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })
  const addedWorkerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1271
/**
 * Creates a new worker node and sets it up as a dynamic one.
 *
 * A message listener is registered that destroys the worker node when the
 * worker sends a HARD kill message, or a SOFT kill message while the node has
 * no executing task (and, with the tasks queue enabled, an empty queue). The
 * worker is then sent the `checkActive` message and an `add` operation for each
 * pool task function, the node is flagged as dynamic (and as ready when the
 * strategy policy asks for it), and the dynamic worker creation events are
 * checked.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Idle: nothing executing and, when the tasks queue is enabled, nothing queued.
    const isWorkerNodeIdle = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isWorkerNodeIdle())
    if (!shallDestroy) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1327
/**
 * Registers a message listener on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1340
/**
 * Registers a one-shot message listener on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1353
/**
 * Deregisters a message listener from the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1366
/**
 * Hook run once a worker node has been newly created and added to the pool.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes the
 * task stealing handlers that the tasks queue options turn on.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1398
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1405
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, { statistics: { runTime, elu } })
}
1422
/**
 * Tells whether task stealing is impossible: the pool has at most one worker
 * node or no queued task.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1426
/**
 * Executes the task on the worker node when `shallExecuteTask` allows it,
 * enqueues it on that worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1434
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node with the fewest queued tasks, the
 * source worker node excluded. If no other worker node is ready, the task goes
 * to the other worker node with the fewest queued tasks so it is never handed
 * back to the source (which would keep the loop from terminating). Ties are
 * resolved in favor of the lowest worker node key.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained; `-1` is a no-op.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let readyDestinationKey = -1
    let fallbackDestinationKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      const candidateQueued = candidate.usage.tasks.queued
      if (
        fallbackDestinationKey === -1 ||
        candidateQueued <
          this.workerNodes[fallbackDestinationKey].usage.tasks.queued
      ) {
        fallbackDestinationKey = candidateKey
      }
      if (
        candidate.info.ready &&
        (readyDestinationKey === -1 ||
          candidateQueued <
            this.workerNodes[readyDestinationKey].usage.tasks.queued)
      ) {
        readyDestinationKey = candidateKey
      }
    }
    const destinationWorkerNodeKey =
      readyDestinationKey !== -1 ? readyDestinationKey : fallbackDestinationKey
    if (destinationWorkerNodeKey === -1) {
      // Defensive: cannotStealTask() already guarantees at least two worker nodes.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1457
/**
 * Increments the stolen tasks counter of the worker node usage and, when task
 * function usage tracking applies, of the given task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const node = this.workerNodes[workerNodeKey]
  if (node?.usage != null) {
    ++node.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (node.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = node.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionUsage.tasks.stolen
}
1476
/**
 * Increments the sequentially stolen tasks counter of the worker node usage,
 * if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const node = this.workerNodes[workerNodeKey]
  if (node?.usage == null) {
    return
  }
  ++node.usage.tasks.sequentiallyStolen
}
1485
/**
 * Increments the sequentially stolen tasks counter of the given task function
 * usage, when task function usage tracking applies to the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const node = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (node.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = node.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionUsage.tasks.sequentiallyStolen
}
1501
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node
 * usage, if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const node = this.workerNodes[workerNodeKey]
  if (node?.usage == null) {
    return
  }
  node.usage.tasks.sequentiallyStolen = 0
}
1510
/**
 * Resets to zero the sequentially stolen tasks counter of the given task
 * function usage, when task function usage tracking applies to the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const node = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (node.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = node.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
1526
/**
 * Handler of the `idleWorkerNode` worker node event: makes the idle worker
 * node steal a task, then re-schedules itself after an exponential delay based
 * on the number of sequentially stolen tasks.
 *
 * Stealing stops (and the `stealing` flag is cleared when a previous steal
 * happened) if no task can be stolen, if too many worker nodes are already
 * stealing, or if the worker node got work of its own after a previous steal;
 * in the latter case the sequentially stolen counters are reset.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `eventDetail.workerNodeKey` is not defined.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey property must be defined'
    )
  }
  if (
    this.cannotStealTask() ||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
  ) {
    if (previousStolenTask != null) {
      this.getWorkerInfo(workerNodeKey).stealing = false
    }
    return
  }
  const idleWorkerNode = this.workerNodes[workerNodeKey]
  const tasksUsage = idleWorkerNode.usage.tasks
  // The worker node is working again after a steal: end the stealing sequence.
  if (
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    this.getWorkerInfo(workerNodeKey).stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskFunctionName of idleWorkerNode.info.taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  this.getWorkerInfo(workerNodeKey).stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionTasksUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      idleWorkerNode.getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    // The sequence continues when it is just starting or when the same task
    // function was stolen again.
    const continuesSequence =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (continuesSequence) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  const delay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(delay)
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1603
/**
 * Makes the given worker node steal one task from another worker node.
 *
 * The source is the first worker node, ordered by decreasing number of queued
 * tasks, that is ready, not stealing, distinct from the destination and has
 * queued tasks. The destination is excluded by object identity: positions in
 * the sorted copy are not worker node keys and must not be compared with
 * `workerNodeKey`.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, or `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate !== destinationWorkerNode &&
        candidate.info.ready &&
        !candidate.info.stealing &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  if (task == null) {
    // Nothing could be popped although tasks were reported as queued.
    return undefined
  }
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1630
/**
 * Handler of the `backPressure` worker node event: lets the other worker nodes
 * steal tasks from the worker node under back pressure.
 *
 * Candidates are visited by increasing number of queued tasks. A candidate
 * steals one task when the source still has queued tasks and the candidate is
 * ready, not stealing, distinct from the source and has fewer queued tasks
 * than the tasks queue size minus one. The real worker node key of each
 * candidate is resolved from the pool worker nodes, since positions in the
 * sorted copy are not worker node keys.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const tasksQueueSize = this.opts.tasksQueueOptions!.size!
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node under back pressure is no longer part of the pool.
    return
  }
  const sortedWorkerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of sortedWorkerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to steal from the source worker node.
      break
    }
    if (
      !workerNode.info.ready ||
      workerNode.info.stealing ||
      workerNode.info.id === workerId ||
      workerNode.usage.tasks.queued >= tasksQueueSize - sizeOffset
    ) {
      continue
    }
    const workerNodeKey = this.workerNodes.indexOf(workerNode)
    if (workerNodeKey === -1) {
      continue
    }
    const task = sourceWorkerNode.popTask()
    if (task == null) {
      break
    }
    workerNode.info.stealing = true
    this.handleTask(workerNodeKey, task)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    workerNode.info.stealing = false
  }
}
1675
/**
 * Message listener registered on each worker.
 *
 * After checking the message worker id, it dispatches the message: a worker
 * ready response (both `ready` and `taskFunctionNames` set), a task execution
 * response (`taskId` set), or a task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1697
/**
 * Handles the ready response of a worker: stores the ready flag and the task
 * function names in the worker information, and emits the pool `ready` event
 * the first time the pool is ready.
 *
 * @param message - The ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reported `ready: false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    throw new Error(`Worker ${workerId!} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1714
/**
 * Handles the task execution response of a worker.
 *
 * The pending promise matching the task id is rejected with the worker error
 * message (after emitting the pool `taskError` event) or resolved with the
 * response data, inside the async resource scope when one is attached. The
 * after task execution hook is then run, the promise entry is removed and the
 * worker node emits `taskFinished`. When the tasks queue is enabled, the pool
 * is not being destroyed and the worker node still exists, the next queued
 * task is executed if the concurrency allows it, and `idleWorkerNode` is
 * emitted if the worker node has nothing left to do.
 *
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  workerNode?.emit('taskFinished', taskId)
  // The worker node may have been removed from the pool in the meantime:
  // only touch its usage and queue when it still exists.
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      workerId: workerId!,
      workerNodeKey
    })
  }
}
1766
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1772
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1778
/**
 * Checks and emits the events related to dynamic worker creation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1783
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * NOTE(review): the lookup is optional-chained, so `undefined` is returned at
 * runtime when no worker node exists at that key, despite the declared type.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1793
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size is the configured tasks queue size, or
 * the default one computed from the maximum (else minimum) number of workers.
 * The worker node is flagged as ready when the pool is starting.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1819
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1835
/**
 * Removes the given worker node from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker node is not part
 * of the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1848
/**
 * Sets the `ready` flag of the worker information to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1852
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1860
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1869
/**
 * Runs the before task execution hook, sends the task (with its transfer
 * list) to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1881
/**
 * Enqueues the task on the worker node, then checks and emits the task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1887
/**
 * Dequeues a task from the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1891
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1895
/**
 * Executes every task queued on the worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of tasks dequeued and executed.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1906
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1912 }