build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types.js'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils.js'
24 import { KillBehaviors } from '../worker/worker-options.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool.js'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerNodeEventDetail,
40 WorkerType
41 } from './worker.js'
42 import {
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
49 import { version } from './version.js'
50 import { WorkerNode } from './worker-node.js'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 getDefaultTasksQueueOptions,
56 updateEluWorkerUsage,
57 updateRunTimeWorkerUsage,
58 updateTaskStatisticsWorkerUsage,
59 updateWaitTimeWorkerUsage,
60 waitWorkerNodeEvents
61 } from './utils.js'
62
63 /**
64 * Base class that implements some shared logic for all poolifier pools.
65 *
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
69 */
70 export abstract class AbstractPool<
71 Worker extends IWorker,
72 Data = unknown,
73 Response = unknown
74 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * The task execution response promise map:
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks and, when the event emitter is set, the task async resource.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the task id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 */
private starting: boolean
/**
 * Whether the pool is destroying or not.
 */
private destroying: boolean
/**
 * Whether the pool ready event has been emitted or not.
 */
private readyEventEmitted: boolean
/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 */
private readonly startTimestamp: number
127
/**
 * Constructs a new poolifier pool.
 *
 * Validates the pool type, the worker file path, the minimum number of workers and the pool options,
 * initializes the event emitter when events are enabled, creates the worker choice strategy context
 * and starts the pool when `opts.startWorkers` is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false

  // Record the start timestamp before starting the workers so that it is
  // already defined if the pool utilization is read while workers are created.
  this.startTimestamp = performance.now()
  if (this.opts.startWorkers === true) {
    this.start()
  }
}
183
/**
 * Checks that a fixed pool is not instantiated with a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
191
/**
 * Checks the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not defined.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is required to have at least one worker.
  if (minimumNumberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
211
/**
 * Validates the pool options and fills in the default value of each unset option on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
237
/**
 * Validates the worker choice strategy options. Undefined options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights does not match the pool size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const weights = workerChoiceStrategyOptions.weights
  if (weights != null) {
    // One weight is expected per worker node the pool can hold.
    const expectedWeights =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeights) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  const measurement = workerChoiceStrategyOptions.measurement
  if (measurement != null && !Object.values(Measurements).includes(measurement)) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
269
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
275
/** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )

  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Builds the pool wide statistics of a worker usage measurement.
  const measurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    withAverage: boolean,
    withMedian: boolean
  ) => {
    const usages = this.workerNodes.map(
      workerNode => workerNode.usage[measurement]
    )
    // Concatenation of every worker node measurement history.
    const history = (): number[] =>
      usages.reduce<number[]>(
        (values, usage) => values.concat(usage.history),
        []
      )
    return {
      minimum: round(min(...usages.map(usage => usage.minimum ?? Infinity))),
      maximum: round(max(...usages.map(usage => usage.maximum ?? -Infinity))),
      ...(withAverage && { average: round(average(history())) }),
      ...(withMedian && { median: round(median(history())) })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: measurementStatistics(
        'runTime',
        taskStatisticsRequirements.runTime.average,
        taskStatisticsRequirements.runTime.median
      )
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: measurementStatistics(
        'waitTime',
        taskStatisticsRequirements.waitTime.average,
        taskStatisticsRequirements.waitTime.median
      )
    })
  }
}
442
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number of
 * ready non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
460
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
467
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time and wait time of all worker nodes
 * divided by the time elapsed since the pool start timestamp multiplied by the pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity = elapsedTime * poolSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
489
490 /**
491 * The pool type.
492 *
493 * If it is `'dynamic'`, it provides the `max` property.
494 */
495 protected abstract get type (): PoolType
496
497 /**
498 * The worker type.
499 */
500 protected abstract get worker (): WorkerType
501
/**
 * Checks that the worker id carried by a message received from a worker is defined
 * and belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
529
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
548
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Undefined options leave the currently stored options untouched.
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const currentOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext?.setOptions(currentOptions)
}
561
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    // Stop tasks stealing and flush the queues before the tasks queue is turned off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
575
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Listeners are always removed first so that they are never registered twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
602
/**
 * Builds the tasks queue options by overriding the default tasks queue options
 * for the pool size with the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaxSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaxSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
613
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
619
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
625
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
634
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
643
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
652
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum
 * number of workers, or its minimum number of workers when no maximum is defined.
 */
protected get full (): boolean {
  const poolMaxSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaxSize
}
664
665 /**
666 * Whether the pool is busy or not.
667 *
668 * The pool busyness boolean status.
669 */
670 protected abstract get busy (): boolean
671
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a ready worker node executing no task is considered available.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.opts.tasksQueueOptions!.concurrency!
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
696
/**
 * Whether the worker node is busy or not.
 *
 * With the tasks queue enabled a worker node is busy once it executes at least
 * as many tasks as the tasks queue concurrency, otherwise as soon as it executes one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return this.opts.enableTasksQueue === true
    ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    executing >= this.opts.tasksQueueOptions!.concurrency!
    : executing > 0
}
707
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const targetWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not the operation status sent by the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== targetWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      // The operation is settled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
744
/**
 * Sends a task function operation message to every worker and waits for all status responses.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once every worker reported success and rejected
 * with an error if at least one worker reported a failure. It resolves with `true`
 * immediately when the pool has no worker node.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without worker node no response will ever be received: settle right away
    // instead of returning a promise that never settles.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      if (message.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(message)
      // Wait until one response per worker node has been collected.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          response => response.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          response => response.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          response => response.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${errorResponse?.workerId} with error: '${
              errorResponse?.workerError?.message
            }'`
          )
        )
      }
      // The listener was registered on every worker node, so it must be removed
      // from every worker node, not only from the one that responded last.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
797
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name.
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
810
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source code string.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  // Only remembered on the pool side once the workers operation did not fail.
  this.taskFunctions.set(name, fn)
  return opResult
}
833
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  // Clean up the pool side state tied to the removed task function.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
849
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names are taken from the first worker node exposing a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
862
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
870
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
876
/**
 * Whether a task shall be executed right away on the worker node instead of being enqueued.
 *
 * That is the case when the worker node tasks queue is empty and the worker node
 * executes less tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
885
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects the task.
    let refusal: Error | undefined
    if (!this.started) {
      refusal = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      refusal = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      refusal = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      refusal = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      refusal = new TypeError('transferList argument must be an array')
    }
    if (refusal != null) {
      reject(refusal)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so that the task can be settled once the
    // worker response bound to this task id is received.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
950
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    // Create worker nodes until the number of non dynamic worker nodes
    // reaches the minimum number of workers.
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.minimumNumberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
    this.started = true
  } finally {
    // Always leave the starting state, even if a worker node creation throws,
    // otherwise the pool could neither be started again nor destroyed.
    this.starting = false
  }
}
975
/** @inheritDoc */
public async destroy (): Promise<void> {
  // The pool can only be destroyed when started and neither starting nor destroying.
  let refusalMessage: string | undefined
  if (!this.started) {
    refusalMessage = 'Cannot destroy an already destroyed pool'
  } else if (this.starting) {
    refusalMessage = 'Cannot destroy an starting pool'
  } else if (this.destroying) {
    refusalMessage = 'Cannot destroy an already destroying pool'
  }
  if (refusalMessage != null) {
    throw new Error(refusalMessage)
  }
  this.destroying = true
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  if (this.emitter != null) {
    // Emit the destroy event before releasing the emitter resources.
    this.emitter.emit(PoolEvents.destroy, this.info)
    this.emitter.emitDestroy()
    this.emitter.removeAllListeners()
  }
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1000
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * Resolves immediately when there is no worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker reports a kill success and rejected when it reports a kill failure.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const onKillResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${response.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1025
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed, then the
 * flushed tasks are waited for (up to the tasks finished timeout) before the kill
 * message is sent and the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1047
/**
 * Hook called by the abstract constructor before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No setup is needed by default.
}
1057
1058 /**
1059 * Should return whether the worker is the main worker or not.
1060 */
1061 protected abstract isMain (): boolean
1062
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the worker
 * node usage and, when applicable, of the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  ].getTaskFunctionWorkerUsage(task.name!)
  if (taskFunctionWorkerUsage == null) {
    return
  }
  ++taskFunctionWorkerUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionWorkerUsage,
    task
  )
}
1103
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics on the worker node usage and,
 * when applicable, on the task function usage, then notifies the worker choice
 * strategy context if any usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let needWorkerChoiceStrategyUpdate = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    needWorkerChoiceStrategyUpdate = true
  }
  // A response may come without task performance data: skip the task
  // function usage update instead of dereferencing an undefined value.
  const taskFunctionName = message.taskPerformance?.name
  if (
    taskFunctionName != null &&
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    const taskFunctionWorkerUsage =
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      )
    if (taskFunctionWorkerUsage != null) {
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext?.update(workerNodeKey)
  }
}
1161
/**
 * Tells whether the task function worker usage of the given worker node shall be updated.
 *
 * That is the case when the worker info exists and its task function names
 * list is an array holding more than two entries.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  if (!Array.isArray(taskFunctionNames)) {
    return false
  }
  return taskFunctionNames.length > 2
}
1176
/**
 * Picks the worker node that will handle the next task.
 *
 * When a dynamic worker shall be created, it is created first and chosen
 * directly if the strategy policy has `dynamicWorkerUsage` set to `true`.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  let dynamicWorkerNodeKey: number | undefined
  if (this.shallCreateDynamicWorker()) {
    const createdWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const strategyPolicy = this.workerChoiceStrategyContext?.getStrategyPolicy()
    if (strategyPolicy?.dynamicWorkerUsage === true) {
      dynamicWorkerNodeKey = createdWorkerNodeKey
    }
  }
  return (
    dynamicWorkerNodeKey ??
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.workerChoiceStrategyContext!.execute()
  )
}
1197
/**
 * Conditions for dynamic worker creation.
 * Must be implemented by the concrete pool.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
1204
/**
 * Sends a message to the worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1217
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers from the pool options (or an empty function) are
 * registered for the 'online', 'message', 'error' and 'exit' worker events,
 * together with the pool's own 'error' and 'exit' handlers. The worker node is
 * then added to the pool and the `afterWorkerNodeSetup` hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  // Pool level error handling: the faulty worker node is flagged as not ready,
  // the error is forwarded on the pool emitter, then the node is terminated.
  workerNode.registerWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the faulty worker node by one of the same kind (dynamic or not)
    // when the pool is running and configured to restart workers on error.
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Hand the tasks still queued on the faulty worker node over to other nodes.
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // A termination failure is reported on the pool emitter instead of being thrown.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  // Registered once: the worker node is removed from the pool when its worker exits.
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1274
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener handling the
 * kill messages sent by the worker is registered, the worker is asked to check
 * its activity, the pool task functions are sent to it and the node is flagged
 * as dynamic (and as ready when the strategy policy asks for it).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderWorkerUsage = this.workerNodes[senderWorkerNodeKey]?.usage
    // A soft kill is only honored once the worker node has nothing left to do.
    const isSoftKillOnIdleWorkerNode = (): boolean => {
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      switch (this.opts.enableTasksQueue) {
        case false:
          return senderWorkerUsage.tasks.executing === 0
        case true:
          return (
            senderWorkerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0
          )
        default:
          return false
      }
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isSoftKillOnIdleWorkerNode()
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating over an empty map is a no-op, no size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext?.getStrategyPolicy().dynamicWorkerReady === true ||
    strategyContext?.getStrategyPolicy().dynamicWorkerUsage === true
  ) {
    dynamicWorkerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1332
/**
 * Registers a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1345
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1358
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1371
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes the
 * task stealing handlers according to the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Tasks stealing only makes sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  const createdWorkerNode = this.workerNodes[workerNodeKey]
  if (tasksQueueOptions?.taskStealing === true) {
    createdWorkerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    createdWorkerNode.on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1403
/**
 * Sends the startup message to the worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1410
/**
 * Sends the statistics message to the worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements, both defaulting to `false` when unavailable.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const aggregateRunTime =
    strategyContext?.getTaskStatisticsRequirements().runTime.aggregate ?? false
  const aggregateElu =
    strategyContext?.getTaskStatisticsRequirements().elu.aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    }
  })
}
1428
/**
 * Tells whether task stealing is pointless: the pool has at most one worker
 * node or has no queued tasks.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1432
/**
 * Executes the task on the given worker node when `shallExecuteTask` allows it,
 * enqueues it on that worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1440
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, having
 * the fewest queued tasks at that moment. The source worker node is never
 * picked as a destination: handing a task back to it could re-enqueue the task
 * on the very queue being drained and loop forever. When no other worker node
 * is ready, the redistribution stops and the remaining tasks stay queued.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node can take the tasks over.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1463
/**
 * Increments the stolen tasks counter of the given worker node and, when task
 * function usage tracking applies, of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single narrowed lookup instead of a second lookup with a non-null assertion.
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1483
/**
 * Increments the sequentially stolen tasks counter of the given worker node, if it has usage data.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1493
/**
 * Increments the sequentially stolen tasks counter of the given task function
 * on the given worker node, when task function usage tracking applies.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single narrowed lookup instead of a second lookup with a non-null assertion.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1509
/**
 * Resets to zero the sequentially stolen tasks counter of the given worker node, if it has usage data.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1519
/**
 * Resets to zero the sequentially stolen tasks counter of the given task
 * function on the given worker node, when task function usage tracking applies.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single narrowed lookup instead of a second lookup with a non-null assertion.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1535
/**
 * Handler of the worker node 'idle' event: makes the idle worker node steal a
 * task from another worker node, then re-invokes itself after a delay computed
 * from the number of sequentially stolen tasks.
 *
 * Stealing is not attempted when no task can be stolen or when more than half
 * of the worker nodes are already stealing. The stealing sequence ends once the
 * worker node has stolen tasks and is executing or queueing tasks again.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker node key is not defined or the worker node is not found.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  // Checked before any access to the worker node, so that a missing worker
  // node yields this explicit error rather than a TypeError.
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node is busy again: end the stealing sequence.
    workerInfo.stealing = false
    for (const taskName of workerInfo.taskFunctionNames ?? []) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionTasksWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      stolenTask.name!
    )?.tasks
    if (taskFunctionTasksWorkerUsage != null) {
      if (
        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
        (previousStolenTask != null &&
          previousStolenTask.name === stolenTask.name &&
          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
      ) {
        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          stolenTask.name!
        )
      } else {
        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          stolenTask.name!
        )
      }
    }
  }
  // Errors raised by the delayed re-invocation are reported on the pool emitter.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1621
/**
 * Makes the given worker node steal one task from another worker node.
 *
 * The source is the ready, non stealing worker node, other than the given one,
 * having the most queued tasks. The stolen task is handled by the given worker
 * node and the stolen tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if there was no task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Positions in the sorted copy are not worker node keys: the stealing
  // worker node is excluded by identity, not by index.
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      !candidateWorkerNode.info.stealing &&
      candidateWorkerNode !== destinationWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const task = sourceWorkerNode.popTask()!
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1648
/**
 * Handler of the worker node 'backPressure' event: moves tasks queued on the
 * worker node that emitted the event to the other worker nodes.
 *
 * Nothing is done when no task can be stolen, when more than half of the
 * worker nodes are already stealing or when the configured tasks queue size is
 * not greater than one. A worker node receives a task only if it is ready, not
 * stealing, is not the source and has fewer queued tasks than the configured
 * tasks queue size minus one.
 *
 * @param eventDetail - The worker node event detail.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a destination worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Least loaded worker nodes first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Positions in the sorted copy are not worker node keys: resolve the
      // real key of the destination worker node in the pool.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1699
/**
 * Message listener registered on each worker.
 *
 * Dispatches the message according to its content: worker ready response,
 * task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const senderWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (senderWorkerInfo != null) {
    senderWorkerInfo.taskFunctionNames = taskFunctionNames
  }
}
1724
/**
 * Emits the pool ready event, at most once until the flag is reset, when the pool is ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1731
/**
 * Handles the ready response sent by a worker: stores its ready flag and task
 * function names on the matching worker node, then checks whether the pool
 * ready event shall be emitted.
 *
 * @param message - The ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready !== true) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerNode = this.workerNodes[readyWorkerNodeKey]
  readyWorkerNode.info.ready = ready
  readyWorkerNode.info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1743
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task (within its async resource scope
 * when there is one), runs the after task execution hook, forgets the promise
 * and emits 'taskFinished' on the worker node. When the tasks queue is enabled
 * and the pool is not being destroyed, the next queued task is executed if the
 * concurrency allows it, and 'idle' is emitted on the worker node when it has
 * nothing executing, nothing queued and no sequentially stolen task.
 *
 * Responses whose task identifier has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    asyncResource != null
      ? asyncResource.runInAsyncScope(
        reject,
        this.emitter,
        workerError.message
      )
      : reject(workerError.message)
  } else {
    asyncResource != null
      ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
      : resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // The worker node may be missing from the pool at this point: skip the
    // queue handling rather than dereferencing an undefined worker node.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
1795
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1801
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1807
/**
 * Emits dynamic worker creation events.
 * Must be implemented by the concrete pool.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1812
/**
 * Gets the worker information of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if there is no worker node at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1822
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size comes from the tasks queue options, or
 * from the default tasks queue options when it is not configured.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1848
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1864
/**
 * Emits the pool empty event when the pool is empty and re-arms the ready event emission.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1871
/**
 * Removes the given worker node from the pool worker nodes and from the worker
 * choice strategy context, then checks whether the pool empty event shall be emitted.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isInPool = removedWorkerNodeKey !== -1
  if (isInPool) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  // The empty check runs whether or not the worker node was found.
  this.checkAndEmitEmptyEvent()
}
1885
/**
 * Flags the worker node with the given key as not ready, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1892
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1901
/**
 * Runs the before task execution hook, sends the task (with its transfer list)
 * to the worker of the given worker node and checks the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1913
/**
 * Enqueues the task on the given worker node and checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const sizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return sizeAfterEnqueue
}
1919
/**
 * Dequeues a task from the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1923
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1927
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1938
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1944 }