Merge branch 'master' into combined-prs-branch
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 exponentialDelay,
17 isKillBehavior,
18 isPlainObject,
19 max,
20 median,
21 min,
22 round,
23 sleep
24 } from '../utils'
25 import { KillBehaviors } from '../worker/worker-options'
26 import type { TaskFunction } from '../worker/task-functions'
27 import {
28 type IPool,
29 PoolEvents,
30 type PoolInfo,
31 type PoolOptions,
32 type PoolType,
33 PoolTypes,
34 type TasksQueueOptions
35 } from './pool'
36 import type {
37 IWorker,
38 IWorkerNode,
39 TaskStatistics,
40 WorkerInfo,
41 WorkerNodeEventDetail,
42 WorkerType,
43 WorkerUsage
44 } from './worker'
45 import {
46 type MeasurementStatisticsRequirements,
47 Measurements,
48 WorkerChoiceStrategies,
49 type WorkerChoiceStrategy,
50 type WorkerChoiceStrategyOptions
51 } from './selection-strategies/selection-strategies-types'
52 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
53 import { version } from './version'
54 import { WorkerNode } from './worker-node'
55 import {
56 checkFilePath,
57 checkValidTasksQueueOptions,
58 checkValidWorkerChoiceStrategy,
59 updateMeasurementStatistics
60 } from './utils'
61
62 /**
63 * Base class that implements some shared logic for all poolifier pools.
64 *
65 * @typeParam Worker - Type of worker which manages this pool.
66 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
67 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
68 */
69 export abstract class AbstractPool<
70 Worker extends IWorker,
71 Data = unknown,
72 Response = unknown
73 > implements IPool<Worker, Data, Response> {
74 /** @inheritDoc */
75 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
76
77 /** @inheritDoc */
78 public emitter?: EventEmitterAsyncResource
79
80 /**
81 * Dynamic pool maximum size property placeholder.
82 */
83 protected readonly max?: number
84
85 /**
86 * The task execution response promise map:
87 * - `key`: The message id of each submitted task.
88 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
89 *
90 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
91 */
92 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
93 new Map<string, PromiseResponseWrapper<Response>>()
94
95 /**
96 * Worker choice strategy context referencing a worker choice algorithm implementation.
97 */
98 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
99 Worker,
100 Data,
101 Response
102 >
103
104 /**
105 * The task functions added at runtime map:
106 * - `key`: The task function name.
107 * - `value`: The task function itself.
108 */
109 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
110
111 /**
112 * Whether the pool is started or not.
113 */
114 private started: boolean
115 /**
116 * Whether the pool is starting or not.
117 */
118 private starting: boolean
119 /**
120 * Whether the pool is destroying or not.
121 */
122 private destroying: boolean
123 /**
124 * Whether the pool ready event has been emitted or not.
125 */
126 private readyEventEmitted: boolean
127 /**
128 * The start timestamp of the pool.
129 */
130 private readonly startTimestamp
131
132 /**
133 * Constructs a new poolifier pool.
134 *
135 * @param numberOfWorkers - Number of workers that this pool should manage.
136 * @param filePath - Path to the worker file.
137 * @param opts - Options for the pool.
138 */
139 public constructor (
140 protected readonly numberOfWorkers: number,
141 protected readonly filePath: string,
142 protected readonly opts: PoolOptions<Worker>
143 ) {
144 if (!this.isMain()) {
145 throw new Error(
146 'Cannot start a pool from a worker with the same type as the pool'
147 )
148 }
149 checkFilePath(this.filePath)
150 this.checkNumberOfWorkers(this.numberOfWorkers)
151 this.checkPoolOptions(this.opts)
152
153 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
154 this.executeTask = this.executeTask.bind(this)
155 this.enqueueTask = this.enqueueTask.bind(this)
156
157 if (this.opts.enableEvents === true) {
158 this.initializeEventEmitter()
159 }
160 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
161 Worker,
162 Data,
163 Response
164 >(
165 this,
166 this.opts.workerChoiceStrategy,
167 this.opts.workerChoiceStrategyOptions
168 )
169
170 this.setupHook()
171
172 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
173
174 this.started = false
175 this.starting = false
176 this.destroying = false
177 this.readyEventEmitted = false
178 if (this.opts.startWorkers === true) {
179 this.start()
180 }
181
182 this.startTimestamp = performance.now()
183 }
184
185 private checkNumberOfWorkers (numberOfWorkers: number): void {
186 if (numberOfWorkers == null) {
187 throw new Error(
188 'Cannot instantiate a pool without specifying the number of workers'
189 )
190 } else if (!Number.isSafeInteger(numberOfWorkers)) {
191 throw new TypeError(
192 'Cannot instantiate a pool with a non safe integer number of workers'
193 )
194 } else if (numberOfWorkers < 0) {
195 throw new RangeError(
196 'Cannot instantiate a pool with a negative number of workers'
197 )
198 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
199 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
200 }
201 }
202
203 private checkPoolOptions (opts: PoolOptions<Worker>): void {
204 if (isPlainObject(opts)) {
205 this.opts.startWorkers = opts.startWorkers ?? true
206 checkValidWorkerChoiceStrategy(
207 opts.workerChoiceStrategy as WorkerChoiceStrategy
208 )
209 this.opts.workerChoiceStrategy =
210 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
211 this.checkValidWorkerChoiceStrategyOptions(
212 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
213 )
214 this.opts.workerChoiceStrategyOptions = {
215 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
216 ...opts.workerChoiceStrategyOptions
217 }
218 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
219 this.opts.enableEvents = opts.enableEvents ?? true
220 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
221 if (this.opts.enableTasksQueue) {
222 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
223 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
224 opts.tasksQueueOptions as TasksQueueOptions
225 )
226 }
227 } else {
228 throw new TypeError('Invalid pool options: must be a plain object')
229 }
230 }
231
232 private checkValidWorkerChoiceStrategyOptions (
233 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
234 ): void {
235 if (
236 workerChoiceStrategyOptions != null &&
237 !isPlainObject(workerChoiceStrategyOptions)
238 ) {
239 throw new TypeError(
240 'Invalid worker choice strategy options: must be a plain object'
241 )
242 }
243 if (
244 workerChoiceStrategyOptions?.retries != null &&
245 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
246 ) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: retries must be an integer'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions?.retries != null &&
253 workerChoiceStrategyOptions.retries < 0
254 ) {
255 throw new RangeError(
256 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
257 )
258 }
259 if (
260 workerChoiceStrategyOptions?.weights != null &&
261 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
262 ) {
263 throw new Error(
264 'Invalid worker choice strategy options: must have a weight for each worker node'
265 )
266 }
267 if (
268 workerChoiceStrategyOptions?.measurement != null &&
269 !Object.values(Measurements).includes(
270 workerChoiceStrategyOptions.measurement
271 )
272 ) {
273 throw new Error(
274 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
275 )
276 }
277 }
278
279 private initializeEventEmitter (): void {
280 this.emitter = new EventEmitterAsyncResource({
281 name: `poolifier:${this.type}-${this.worker}-pool`
282 })
283 }
284
285 /** @inheritDoc */
286 public get info (): PoolInfo {
287 return {
288 version,
289 type: this.type,
290 worker: this.worker,
291 started: this.started,
292 ready: this.ready,
293 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
294 minSize: this.minSize,
295 maxSize: this.maxSize,
296 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
297 .runTime.aggregate &&
298 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
299 .waitTime.aggregate && { utilization: round(this.utilization) }),
300 workerNodes: this.workerNodes.length,
301 idleWorkerNodes: this.workerNodes.reduce(
302 (accumulator, workerNode) =>
303 workerNode.usage.tasks.executing === 0
304 ? accumulator + 1
305 : accumulator,
306 0
307 ),
308 busyWorkerNodes: this.workerNodes.reduce(
309 (accumulator, _workerNode, workerNodeKey) =>
310 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
311 0
312 ),
313 executedTasks: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 accumulator + workerNode.usage.tasks.executed,
316 0
317 ),
318 executingTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + workerNode.usage.tasks.executing,
321 0
322 ),
323 ...(this.opts.enableTasksQueue === true && {
324 queuedTasks: this.workerNodes.reduce(
325 (accumulator, workerNode) =>
326 accumulator + workerNode.usage.tasks.queued,
327 0
328 )
329 }),
330 ...(this.opts.enableTasksQueue === true && {
331 maxQueuedTasks: this.workerNodes.reduce(
332 (accumulator, workerNode) =>
333 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
334 0
335 )
336 }),
337 ...(this.opts.enableTasksQueue === true && {
338 backPressure: this.hasBackPressure()
339 }),
340 ...(this.opts.enableTasksQueue === true && {
341 stolenTasks: this.workerNodes.reduce(
342 (accumulator, workerNode) =>
343 accumulator + workerNode.usage.tasks.stolen,
344 0
345 )
346 }),
347 failedTasks: this.workerNodes.reduce(
348 (accumulator, workerNode) =>
349 accumulator + workerNode.usage.tasks.failed,
350 0
351 ),
352 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
353 .runTime.aggregate && {
354 runTime: {
355 minimum: round(
356 min(
357 ...this.workerNodes.map(
358 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
359 )
360 )
361 ),
362 maximum: round(
363 max(
364 ...this.workerNodes.map(
365 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
366 )
367 )
368 ),
369 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
370 .runTime.average && {
371 average: round(
372 average(
373 this.workerNodes.reduce<number[]>(
374 (accumulator, workerNode) =>
375 accumulator.concat(workerNode.usage.runTime.history),
376 []
377 )
378 )
379 )
380 }),
381 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
382 .runTime.median && {
383 median: round(
384 median(
385 this.workerNodes.reduce<number[]>(
386 (accumulator, workerNode) =>
387 accumulator.concat(workerNode.usage.runTime.history),
388 []
389 )
390 )
391 )
392 })
393 }
394 }),
395 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
396 .waitTime.aggregate && {
397 waitTime: {
398 minimum: round(
399 min(
400 ...this.workerNodes.map(
401 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
402 )
403 )
404 ),
405 maximum: round(
406 max(
407 ...this.workerNodes.map(
408 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
409 )
410 )
411 ),
412 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
413 .waitTime.average && {
414 average: round(
415 average(
416 this.workerNodes.reduce<number[]>(
417 (accumulator, workerNode) =>
418 accumulator.concat(workerNode.usage.waitTime.history),
419 []
420 )
421 )
422 )
423 }),
424 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
425 .waitTime.median && {
426 median: round(
427 median(
428 this.workerNodes.reduce<number[]>(
429 (accumulator, workerNode) =>
430 accumulator.concat(workerNode.usage.waitTime.history),
431 []
432 )
433 )
434 )
435 })
436 }
437 })
438 }
439 }
440
441 /**
442 * The pool readiness boolean status.
443 */
444 private get ready (): boolean {
445 return (
446 this.workerNodes.reduce(
447 (accumulator, workerNode) =>
448 !workerNode.info.dynamic && workerNode.info.ready
449 ? accumulator + 1
450 : accumulator,
451 0
452 ) >= this.minSize
453 )
454 }
455
456 /**
457 * The approximate pool utilization.
458 *
459 * @returns The pool utilization.
460 */
461 private get utilization (): number {
462 const poolTimeCapacity =
463 (performance.now() - this.startTimestamp) * this.maxSize
464 const totalTasksRunTime = this.workerNodes.reduce(
465 (accumulator, workerNode) =>
466 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
467 0
468 )
469 const totalTasksWaitTime = this.workerNodes.reduce(
470 (accumulator, workerNode) =>
471 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
472 0
473 )
474 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
475 }
476
477 /**
478 * The pool type.
479 *
480 * If it is `'dynamic'`, it provides the `max` property.
481 */
482 protected abstract get type (): PoolType
483
484 /**
485 * The worker type.
486 */
487 protected abstract get worker (): WorkerType
488
489 /**
490 * The pool minimum size.
491 */
492 protected get minSize (): number {
493 return this.numberOfWorkers
494 }
495
496 /**
497 * The pool maximum size.
498 */
499 protected get maxSize (): number {
500 return this.max ?? this.numberOfWorkers
501 }
502
503 /**
504 * Checks if the worker id sent in the received message from a worker is valid.
505 *
506 * @param message - The received message.
507 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
508 */
509 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
510 if (message.workerId == null) {
511 throw new Error('Worker message received without worker id')
512 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
513 throw new Error(
514 `Worker message received from unknown worker '${message.workerId}'`
515 )
516 }
517 }
518
519 /**
520 * Gets the given worker its worker node key.
521 *
522 * @param worker - The worker.
523 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
524 */
525 private getWorkerNodeKeyByWorker (worker: Worker): number {
526 return this.workerNodes.findIndex(
527 workerNode => workerNode.worker === worker
528 )
529 }
530
531 /**
532 * Gets the worker node key given its worker id.
533 *
534 * @param workerId - The worker id.
535 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
536 */
537 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
538 return this.workerNodes.findIndex(
539 workerNode => workerNode.info.id === workerId
540 )
541 }
542
543 /** @inheritDoc */
544 public setWorkerChoiceStrategy (
545 workerChoiceStrategy: WorkerChoiceStrategy,
546 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
547 ): void {
548 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
549 this.opts.workerChoiceStrategy = workerChoiceStrategy
550 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
551 this.opts.workerChoiceStrategy
552 )
553 if (workerChoiceStrategyOptions != null) {
554 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
555 }
556 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
557 workerNode.resetUsage()
558 this.sendStatisticsMessageToWorker(workerNodeKey)
559 }
560 }
561
562 /** @inheritDoc */
563 public setWorkerChoiceStrategyOptions (
564 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
565 ): void {
566 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
567 this.opts.workerChoiceStrategyOptions = {
568 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
569 ...workerChoiceStrategyOptions
570 }
571 this.workerChoiceStrategyContext.setOptions(
572 this.opts.workerChoiceStrategyOptions
573 )
574 }
575
576 /** @inheritDoc */
577 public enableTasksQueue (
578 enable: boolean,
579 tasksQueueOptions?: TasksQueueOptions
580 ): void {
581 if (this.opts.enableTasksQueue === true && !enable) {
582 this.unsetTaskStealing()
583 this.unsetTasksStealingOnBackPressure()
584 this.flushTasksQueues()
585 }
586 this.opts.enableTasksQueue = enable
587 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
588 }
589
590 /** @inheritDoc */
591 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
592 if (this.opts.enableTasksQueue === true) {
593 checkValidTasksQueueOptions(tasksQueueOptions)
594 this.opts.tasksQueueOptions =
595 this.buildTasksQueueOptions(tasksQueueOptions)
596 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
597 if (this.opts.tasksQueueOptions.taskStealing === true) {
598 this.unsetTaskStealing()
599 this.setTaskStealing()
600 } else {
601 this.unsetTaskStealing()
602 }
603 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
604 this.unsetTasksStealingOnBackPressure()
605 this.setTasksStealingOnBackPressure()
606 } else {
607 this.unsetTasksStealingOnBackPressure()
608 }
609 } else if (this.opts.tasksQueueOptions != null) {
610 delete this.opts.tasksQueueOptions
611 }
612 }
613
614 private buildTasksQueueOptions (
615 tasksQueueOptions: TasksQueueOptions
616 ): TasksQueueOptions {
617 return {
618 ...{
619 size: Math.pow(this.maxSize, 2),
620 concurrency: 1,
621 taskStealing: true,
622 tasksStealingOnBackPressure: true
623 },
624 ...tasksQueueOptions
625 }
626 }
627
628 private setTasksQueueSize (size: number): void {
629 for (const workerNode of this.workerNodes) {
630 workerNode.tasksQueueBackPressureSize = size
631 }
632 }
633
634 private setTaskStealing (): void {
635 for (const [workerNodeKey] of this.workerNodes.entries()) {
636 this.workerNodes[workerNodeKey].on(
637 'idleWorkerNode',
638 this.handleIdleWorkerNodeEvent
639 )
640 }
641 }
642
643 private unsetTaskStealing (): void {
644 for (const [workerNodeKey] of this.workerNodes.entries()) {
645 this.workerNodes[workerNodeKey].off(
646 'idleWorkerNode',
647 this.handleIdleWorkerNodeEvent
648 )
649 }
650 }
651
652 private setTasksStealingOnBackPressure (): void {
653 for (const [workerNodeKey] of this.workerNodes.entries()) {
654 this.workerNodes[workerNodeKey].on(
655 'backPressure',
656 this.handleBackPressureEvent
657 )
658 }
659 }
660
661 private unsetTasksStealingOnBackPressure (): void {
662 for (const [workerNodeKey] of this.workerNodes.entries()) {
663 this.workerNodes[workerNodeKey].off(
664 'backPressure',
665 this.handleBackPressureEvent
666 )
667 }
668 }
669
670 /**
671 * Whether the pool is full or not.
672 *
673 * The pool filling boolean status.
674 */
675 protected get full (): boolean {
676 return this.workerNodes.length >= this.maxSize
677 }
678
679 /**
680 * Whether the pool is busy or not.
681 *
682 * The pool busyness boolean status.
683 */
684 protected abstract get busy (): boolean
685
686 /**
687 * Whether worker nodes are executing concurrently their tasks quota or not.
688 *
689 * @returns Worker nodes busyness boolean status.
690 */
691 protected internalBusy (): boolean {
692 if (this.opts.enableTasksQueue === true) {
693 return (
694 this.workerNodes.findIndex(
695 workerNode =>
696 workerNode.info.ready &&
697 workerNode.usage.tasks.executing <
698 (this.opts.tasksQueueOptions?.concurrency as number)
699 ) === -1
700 )
701 }
702 return (
703 this.workerNodes.findIndex(
704 workerNode =>
705 workerNode.info.ready && workerNode.usage.tasks.executing === 0
706 ) === -1
707 )
708 }
709
710 private isWorkerNodeBusy (workerNodeKey: number): boolean {
711 if (this.opts.enableTasksQueue === true) {
712 return (
713 this.workerNodes[workerNodeKey].usage.tasks.executing >=
714 (this.opts.tasksQueueOptions?.concurrency as number)
715 )
716 }
717 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
718 }
719
720 private async sendTaskFunctionOperationToWorker (
721 workerNodeKey: number,
722 message: MessageValue<Data>
723 ): Promise<boolean> {
724 return await new Promise<boolean>((resolve, reject) => {
725 const taskFunctionOperationListener = (
726 message: MessageValue<Response>
727 ): void => {
728 this.checkMessageWorkerId(message)
729 const workerId = this.getWorkerInfo(workerNodeKey).id as number
730 if (
731 message.taskFunctionOperationStatus != null &&
732 message.workerId === workerId
733 ) {
734 if (message.taskFunctionOperationStatus) {
735 resolve(true)
736 } else if (!message.taskFunctionOperationStatus) {
737 reject(
738 new Error(
739 `Task function operation '${
740 message.taskFunctionOperation as string
741 }' failed on worker ${message.workerId} with error: '${
742 message.workerError?.message as string
743 }'`
744 )
745 )
746 }
747 this.deregisterWorkerMessageListener(
748 this.getWorkerNodeKeyByWorkerId(message.workerId),
749 taskFunctionOperationListener
750 )
751 }
752 }
753 this.registerWorkerMessageListener(
754 workerNodeKey,
755 taskFunctionOperationListener
756 )
757 this.sendToWorker(workerNodeKey, message)
758 })
759 }
760
761 private async sendTaskFunctionOperationToWorkers (
762 message: MessageValue<Data>
763 ): Promise<boolean> {
764 return await new Promise<boolean>((resolve, reject) => {
765 const responsesReceived = new Array<MessageValue<Response>>()
766 const taskFunctionOperationsListener = (
767 message: MessageValue<Response>
768 ): void => {
769 this.checkMessageWorkerId(message)
770 if (message.taskFunctionOperationStatus != null) {
771 responsesReceived.push(message)
772 if (responsesReceived.length === this.workerNodes.length) {
773 if (
774 responsesReceived.every(
775 message => message.taskFunctionOperationStatus === true
776 )
777 ) {
778 resolve(true)
779 } else if (
780 responsesReceived.some(
781 message => message.taskFunctionOperationStatus === false
782 )
783 ) {
784 const errorResponse = responsesReceived.find(
785 response => response.taskFunctionOperationStatus === false
786 )
787 reject(
788 new Error(
789 `Task function operation '${
790 message.taskFunctionOperation as string
791 }' failed on worker ${
792 errorResponse?.workerId as number
793 } with error: '${
794 errorResponse?.workerError?.message as string
795 }'`
796 )
797 )
798 }
799 this.deregisterWorkerMessageListener(
800 this.getWorkerNodeKeyByWorkerId(message.workerId),
801 taskFunctionOperationsListener
802 )
803 }
804 }
805 }
806 for (const [workerNodeKey] of this.workerNodes.entries()) {
807 this.registerWorkerMessageListener(
808 workerNodeKey,
809 taskFunctionOperationsListener
810 )
811 this.sendToWorker(workerNodeKey, message)
812 }
813 })
814 }
815
816 /** @inheritDoc */
817 public hasTaskFunction (name: string): boolean {
818 for (const workerNode of this.workerNodes) {
819 if (
820 Array.isArray(workerNode.info.taskFunctionNames) &&
821 workerNode.info.taskFunctionNames.includes(name)
822 ) {
823 return true
824 }
825 }
826 return false
827 }
828
829 /** @inheritDoc */
830 public async addTaskFunction (
831 name: string,
832 fn: TaskFunction<Data, Response>
833 ): Promise<boolean> {
834 if (typeof name !== 'string') {
835 throw new TypeError('name argument must be a string')
836 }
837 if (typeof name === 'string' && name.trim().length === 0) {
838 throw new TypeError('name argument must not be an empty string')
839 }
840 if (typeof fn !== 'function') {
841 throw new TypeError('fn argument must be a function')
842 }
843 const opResult = await this.sendTaskFunctionOperationToWorkers({
844 taskFunctionOperation: 'add',
845 taskFunctionName: name,
846 taskFunction: fn.toString()
847 })
848 this.taskFunctions.set(name, fn)
849 return opResult
850 }
851
852 /** @inheritDoc */
853 public async removeTaskFunction (name: string): Promise<boolean> {
854 if (!this.taskFunctions.has(name)) {
855 throw new Error(
856 'Cannot remove a task function not handled on the pool side'
857 )
858 }
859 const opResult = await this.sendTaskFunctionOperationToWorkers({
860 taskFunctionOperation: 'remove',
861 taskFunctionName: name
862 })
863 this.deleteTaskFunctionWorkerUsages(name)
864 this.taskFunctions.delete(name)
865 return opResult
866 }
867
868 /** @inheritDoc */
869 public listTaskFunctionNames (): string[] {
870 for (const workerNode of this.workerNodes) {
871 if (
872 Array.isArray(workerNode.info.taskFunctionNames) &&
873 workerNode.info.taskFunctionNames.length > 0
874 ) {
875 return workerNode.info.taskFunctionNames
876 }
877 }
878 return []
879 }
880
881 /** @inheritDoc */
882 public async setDefaultTaskFunction (name: string): Promise<boolean> {
883 return await this.sendTaskFunctionOperationToWorkers({
884 taskFunctionOperation: 'default',
885 taskFunctionName: name
886 })
887 }
888
889 private deleteTaskFunctionWorkerUsages (name: string): void {
890 for (const workerNode of this.workerNodes) {
891 workerNode.deleteTaskFunctionWorkerUsage(name)
892 }
893 }
894
895 private shallExecuteTask (workerNodeKey: number): boolean {
896 return (
897 this.tasksQueueSize(workerNodeKey) === 0 &&
898 this.workerNodes[workerNodeKey].usage.tasks.executing <
899 (this.opts.tasksQueueOptions?.concurrency as number)
900 )
901 }
902
903 /** @inheritDoc */
904 public async execute (
905 data?: Data,
906 name?: string,
907 transferList?: TransferListItem[]
908 ): Promise<Response> {
909 return await new Promise<Response>((resolve, reject) => {
910 if (!this.started) {
911 reject(new Error('Cannot execute a task on not started pool'))
912 return
913 }
914 if (this.destroying) {
915 reject(new Error('Cannot execute a task on destroying pool'))
916 return
917 }
918 if (name != null && typeof name !== 'string') {
919 reject(new TypeError('name argument must be a string'))
920 return
921 }
922 if (
923 name != null &&
924 typeof name === 'string' &&
925 name.trim().length === 0
926 ) {
927 reject(new TypeError('name argument must not be an empty string'))
928 return
929 }
930 if (transferList != null && !Array.isArray(transferList)) {
931 reject(new TypeError('transferList argument must be an array'))
932 return
933 }
934 const timestamp = performance.now()
935 const workerNodeKey = this.chooseWorkerNode()
936 const task: Task<Data> = {
937 name: name ?? DEFAULT_TASK_NAME,
938 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
939 data: data ?? ({} as Data),
940 transferList,
941 timestamp,
942 taskId: randomUUID()
943 }
944 this.promiseResponseMap.set(task.taskId as string, {
945 resolve,
946 reject,
947 workerNodeKey,
948 ...(this.emitter != null && {
949 asyncResource: new AsyncResource('poolifier:task', {
950 triggerAsyncId: this.emitter.asyncId,
951 requireManualDestroy: true
952 })
953 })
954 })
955 if (
956 this.opts.enableTasksQueue === false ||
957 (this.opts.enableTasksQueue === true &&
958 this.shallExecuteTask(workerNodeKey))
959 ) {
960 this.executeTask(workerNodeKey, task)
961 } else {
962 this.enqueueTask(workerNodeKey, task)
963 }
964 })
965 }
966
967 /** @inheritdoc */
968 public start (): void {
969 if (this.started) {
970 throw new Error('Cannot start an already started pool')
971 }
972 if (this.starting) {
973 throw new Error('Cannot start an already starting pool')
974 }
975 if (this.destroying) {
976 throw new Error('Cannot start a destroying pool')
977 }
978 this.starting = true
979 while (
980 this.workerNodes.reduce(
981 (accumulator, workerNode) =>
982 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
983 0
984 ) < this.numberOfWorkers
985 ) {
986 this.createAndSetupWorkerNode()
987 }
988 this.starting = false
989 this.started = true
990 }
991
992 /** @inheritDoc */
993 public async destroy (): Promise<void> {
994 if (!this.started) {
995 throw new Error('Cannot destroy an already destroyed pool')
996 }
997 if (this.starting) {
998 throw new Error('Cannot destroy an starting pool')
999 }
1000 if (this.destroying) {
1001 throw new Error('Cannot destroy an already destroying pool')
1002 }
1003 this.destroying = true
1004 await Promise.all(
1005 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
1006 await this.destroyWorkerNode(workerNodeKey)
1007 })
1008 )
1009 this.emitter?.emit(PoolEvents.destroy, this.info)
1010 this.emitter?.emitDestroy()
1011 this.emitter?.removeAllListeners()
1012 this.readyEventEmitted = false
1013 this.destroying = false
1014 this.started = false
1015 }
1016
1017 protected async sendKillMessageToWorker (
1018 workerNodeKey: number
1019 ): Promise<void> {
1020 await new Promise<void>((resolve, reject) => {
1021 const killMessageListener = (message: MessageValue<Response>): void => {
1022 this.checkMessageWorkerId(message)
1023 if (message.kill === 'success') {
1024 resolve()
1025 } else if (message.kill === 'failure') {
1026 reject(
1027 new Error(
1028 `Kill message handling failed on worker ${
1029 message.workerId as number
1030 }`
1031 )
1032 )
1033 }
1034 }
1035 // FIXME: should be registered only once
1036 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1037 this.sendToWorker(workerNodeKey, { kill: true })
1038 })
1039 }
1040
1041 /**
1042 * Terminates the worker node given its worker node key.
1043 *
1044 * @param workerNodeKey - The worker node key.
1045 */
1046 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1047
1048 /**
1049 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1050 * Can be overridden.
1051 *
1052 * @virtual
1053 */
1054 protected setupHook (): void {
1055 /* Intentionally empty */
1056 }
1057
1058 /**
1059 * Should return whether the worker is the main worker or not.
1060 */
1061 protected abstract isMain (): boolean
1062
1063 /**
1064 * Hook executed before the worker task execution.
1065 * Can be overridden.
1066 *
1067 * @param workerNodeKey - The worker node key.
1068 * @param task - The task to execute.
1069 */
1070 protected beforeTaskExecutionHook (
1071 workerNodeKey: number,
1072 task: Task<Data>
1073 ): void {
1074 if (this.workerNodes[workerNodeKey]?.usage != null) {
1075 const workerUsage = this.workerNodes[workerNodeKey].usage
1076 ++workerUsage.tasks.executing
1077 this.updateWaitTimeWorkerUsage(workerUsage, task)
1078 }
1079 if (
1080 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1081 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1082 task.name as string
1083 ) != null
1084 ) {
1085 const taskFunctionWorkerUsage = this.workerNodes[
1086 workerNodeKey
1087 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1088 ++taskFunctionWorkerUsage.tasks.executing
1089 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1090 }
1091 }
1092
1093 /**
1094 * Hook executed after the worker task execution.
1095 * Can be overridden.
1096 *
1097 * @param workerNodeKey - The worker node key.
1098 * @param message - The received message.
1099 */
1100 protected afterTaskExecutionHook (
1101 workerNodeKey: number,
1102 message: MessageValue<Response>
1103 ): void {
1104 if (this.workerNodes[workerNodeKey]?.usage != null) {
1105 const workerUsage = this.workerNodes[workerNodeKey].usage
1106 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1107 this.updateRunTimeWorkerUsage(workerUsage, message)
1108 this.updateEluWorkerUsage(workerUsage, message)
1109 }
1110 if (
1111 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1112 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1113 message.taskPerformance?.name as string
1114 ) != null
1115 ) {
1116 const taskFunctionWorkerUsage = this.workerNodes[
1117 workerNodeKey
1118 ].getTaskFunctionWorkerUsage(
1119 message.taskPerformance?.name as string
1120 ) as WorkerUsage
1121 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1122 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1123 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1124 }
1125 }
1126
1127 /**
1128 * Whether the worker node shall update its task function worker usage or not.
1129 *
1130 * @param workerNodeKey - The worker node key.
1131 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1132 */
1133 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1134 const workerInfo = this.getWorkerInfo(workerNodeKey)
1135 return (
1136 workerInfo != null &&
1137 Array.isArray(workerInfo.taskFunctionNames) &&
1138 workerInfo.taskFunctionNames.length > 2
1139 )
1140 }
1141
1142 private updateTaskStatisticsWorkerUsage (
1143 workerUsage: WorkerUsage,
1144 message: MessageValue<Response>
1145 ): void {
1146 const workerTaskStatistics = workerUsage.tasks
1147 if (
1148 workerTaskStatistics.executing != null &&
1149 workerTaskStatistics.executing > 0
1150 ) {
1151 --workerTaskStatistics.executing
1152 }
1153 if (message.workerError == null) {
1154 ++workerTaskStatistics.executed
1155 } else {
1156 ++workerTaskStatistics.failed
1157 }
1158 }
1159
1160 private updateRunTimeWorkerUsage (
1161 workerUsage: WorkerUsage,
1162 message: MessageValue<Response>
1163 ): void {
1164 if (message.workerError != null) {
1165 return
1166 }
1167 updateMeasurementStatistics(
1168 workerUsage.runTime,
1169 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1170 message.taskPerformance?.runTime ?? 0
1171 )
1172 }
1173
1174 private updateWaitTimeWorkerUsage (
1175 workerUsage: WorkerUsage,
1176 task: Task<Data>
1177 ): void {
1178 const timestamp = performance.now()
1179 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1180 updateMeasurementStatistics(
1181 workerUsage.waitTime,
1182 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1183 taskWaitTime
1184 )
1185 }
1186
1187 private updateEluWorkerUsage (
1188 workerUsage: WorkerUsage,
1189 message: MessageValue<Response>
1190 ): void {
1191 if (message.workerError != null) {
1192 return
1193 }
1194 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1195 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1196 updateMeasurementStatistics(
1197 workerUsage.elu.active,
1198 eluTaskStatisticsRequirements,
1199 message.taskPerformance?.elu?.active ?? 0
1200 )
1201 updateMeasurementStatistics(
1202 workerUsage.elu.idle,
1203 eluTaskStatisticsRequirements,
1204 message.taskPerformance?.elu?.idle ?? 0
1205 )
1206 if (eluTaskStatisticsRequirements.aggregate) {
1207 if (message.taskPerformance?.elu != null) {
1208 if (workerUsage.elu.utilization != null) {
1209 workerUsage.elu.utilization =
1210 (workerUsage.elu.utilization +
1211 message.taskPerformance.elu.utilization) /
1212 2
1213 } else {
1214 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1215 }
1216 }
1217 }
1218 }
1219
1220 /**
1221 * Chooses a worker node for the next task.
1222 *
1223 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1224 *
1225 * @returns The chosen worker node key
1226 */
1227 private chooseWorkerNode (): number {
1228 if (this.shallCreateDynamicWorker()) {
1229 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1230 if (
1231 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1232 ) {
1233 return workerNodeKey
1234 }
1235 }
1236 return this.workerChoiceStrategyContext.execute()
1237 }
1238
1239 /**
1240 * Conditions for dynamic worker creation.
1241 *
1242 * @returns Whether to create a dynamic worker or not.
1243 */
1244 private shallCreateDynamicWorker (): boolean {
1245 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1246 }
1247
1248 /**
1249 * Sends a message to worker given its worker node key.
1250 *
1251 * @param workerNodeKey - The worker node key.
1252 * @param message - The message.
1253 * @param transferList - The optional array of transferable objects.
1254 */
1255 protected abstract sendToWorker (
1256 workerNodeKey: number,
1257 message: MessageValue<Data>,
1258 transferList?: TransferListItem[]
1259 ): void
1260
1261 /**
1262 * Creates a new worker.
1263 *
1264 * @returns Newly created worker.
1265 */
1266 protected abstract createWorker (): Worker
1267
1268 /**
1269 * Creates a new, completely set up worker node.
1270 *
1271 * @returns New, completely set up worker node key.
1272 */
1273 protected createAndSetupWorkerNode (): number {
1274 const worker = this.createWorker()
1275
1276 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1277 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1278 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1279 worker.on('error', error => {
1280 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1281 this.flagWorkerNodeAsNotReady(workerNodeKey)
1282 const workerInfo = this.getWorkerInfo(workerNodeKey)
1283 this.emitter?.emit(PoolEvents.error, error)
1284 this.workerNodes[workerNodeKey].closeChannel()
1285 if (
1286 this.started &&
1287 !this.starting &&
1288 !this.destroying &&
1289 this.opts.restartWorkerOnError === true
1290 ) {
1291 if (workerInfo.dynamic) {
1292 this.createAndSetupDynamicWorkerNode()
1293 } else {
1294 this.createAndSetupWorkerNode()
1295 }
1296 }
1297 if (this.started && this.opts.enableTasksQueue === true) {
1298 this.redistributeQueuedTasks(workerNodeKey)
1299 }
1300 })
1301 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1302 worker.once('exit', () => {
1303 this.removeWorkerNode(worker)
1304 })
1305
1306 const workerNodeKey = this.addWorkerNode(worker)
1307
1308 this.afterWorkerNodeSetup(workerNodeKey)
1309
1310 return workerNodeKey
1311 }
1312
1313 /**
1314 * Creates a new, completely set up dynamic worker node.
1315 *
1316 * @returns New, completely set up dynamic worker node key.
1317 */
1318 protected createAndSetupDynamicWorkerNode (): number {
1319 const workerNodeKey = this.createAndSetupWorkerNode()
1320 this.registerWorkerMessageListener(workerNodeKey, message => {
1321 this.checkMessageWorkerId(message)
1322 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1323 message.workerId
1324 )
1325 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1326 // Kill message received from worker
1327 if (
1328 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1329 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1330 ((this.opts.enableTasksQueue === false &&
1331 workerUsage.tasks.executing === 0) ||
1332 (this.opts.enableTasksQueue === true &&
1333 workerUsage.tasks.executing === 0 &&
1334 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1335 ) {
1336 // Flag the worker node as not ready immediately
1337 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1338 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1339 this.emitter?.emit(PoolEvents.error, error)
1340 })
1341 }
1342 })
1343 const workerInfo = this.getWorkerInfo(workerNodeKey)
1344 this.sendToWorker(workerNodeKey, {
1345 checkActive: true
1346 })
1347 if (this.taskFunctions.size > 0) {
1348 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1349 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1350 taskFunctionOperation: 'add',
1351 taskFunctionName,
1352 taskFunction: taskFunction.toString()
1353 }).catch(error => {
1354 this.emitter?.emit(PoolEvents.error, error)
1355 })
1356 }
1357 }
1358 workerInfo.dynamic = true
1359 if (
1360 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1361 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1362 ) {
1363 workerInfo.ready = true
1364 }
1365 this.checkAndEmitDynamicWorkerCreationEvents()
1366 return workerNodeKey
1367 }
1368
1369 /**
1370 * Registers a listener callback on the worker given its worker node key.
1371 *
1372 * @param workerNodeKey - The worker node key.
1373 * @param listener - The message listener callback.
1374 */
1375 protected abstract registerWorkerMessageListener<
1376 Message extends Data | Response
1377 >(
1378 workerNodeKey: number,
1379 listener: (message: MessageValue<Message>) => void
1380 ): void
1381
1382 /**
1383 * Registers once a listener callback on the worker given its worker node key.
1384 *
1385 * @param workerNodeKey - The worker node key.
1386 * @param listener - The message listener callback.
1387 */
1388 protected abstract registerOnceWorkerMessageListener<
1389 Message extends Data | Response
1390 >(
1391 workerNodeKey: number,
1392 listener: (message: MessageValue<Message>) => void
1393 ): void
1394
1395 /**
1396 * Deregisters a listener callback on the worker given its worker node key.
1397 *
1398 * @param workerNodeKey - The worker node key.
1399 * @param listener - The message listener callback.
1400 */
1401 protected abstract deregisterWorkerMessageListener<
1402 Message extends Data | Response
1403 >(
1404 workerNodeKey: number,
1405 listener: (message: MessageValue<Message>) => void
1406 ): void
1407
1408 /**
1409 * Method hooked up after a worker node has been newly created.
1410 * Can be overridden.
1411 *
1412 * @param workerNodeKey - The newly created worker node key.
1413 */
1414 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1415 // Listen to worker messages.
1416 this.registerWorkerMessageListener(
1417 workerNodeKey,
1418 this.workerMessageListener
1419 )
1420 // Send the startup message to worker.
1421 this.sendStartupMessageToWorker(workerNodeKey)
1422 // Send the statistics message to worker.
1423 this.sendStatisticsMessageToWorker(workerNodeKey)
1424 if (this.opts.enableTasksQueue === true) {
1425 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1426 this.workerNodes[workerNodeKey].on(
1427 'idleWorkerNode',
1428 this.handleIdleWorkerNodeEvent
1429 )
1430 }
1431 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1432 this.workerNodes[workerNodeKey].on(
1433 'backPressure',
1434 this.handleBackPressureEvent
1435 )
1436 }
1437 }
1438 }
1439
1440 /**
1441 * Sends the startup message to worker given its worker node key.
1442 *
1443 * @param workerNodeKey - The worker node key.
1444 */
1445 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1446
1447 /**
1448 * Sends the statistics message to worker given its worker node key.
1449 *
1450 * @param workerNodeKey - The worker node key.
1451 */
1452 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1453 this.sendToWorker(workerNodeKey, {
1454 statistics: {
1455 runTime:
1456 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1457 .runTime.aggregate,
1458 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1459 .elu.aggregate
1460 }
1461 })
1462 }
1463
1464 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1465 if (this.shallExecuteTask(workerNodeKey)) {
1466 this.executeTask(workerNodeKey, task)
1467 } else {
1468 this.enqueueTask(workerNodeKey, task)
1469 }
1470 }
1471
1472 private redistributeQueuedTasks (workerNodeKey: number): void {
1473 if (this.workerNodes.length <= 1) {
1474 return
1475 }
1476 while (this.tasksQueueSize(workerNodeKey) > 0) {
1477 const destinationWorkerNodeKey = this.workerNodes.reduce(
1478 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1479 return workerNode.info.ready &&
1480 workerNode.usage.tasks.queued <
1481 workerNodes[minWorkerNodeKey].usage.tasks.queued
1482 ? workerNodeKey
1483 : minWorkerNodeKey
1484 },
1485 0
1486 )
1487 this.handleTask(
1488 destinationWorkerNodeKey,
1489 this.dequeueTask(workerNodeKey) as Task<Data>
1490 )
1491 }
1492 }
1493
1494 private updateTaskStolenStatisticsWorkerUsage (
1495 workerNodeKey: number,
1496 taskName: string
1497 ): void {
1498 const workerNode = this.workerNodes[workerNodeKey]
1499 if (workerNode?.usage != null) {
1500 ++workerNode.usage.tasks.stolen
1501 }
1502 if (
1503 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1504 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1505 ) {
1506 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1507 taskName
1508 ) as WorkerUsage
1509 ++taskFunctionWorkerUsage.tasks.stolen
1510 }
1511 }
1512
1513 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1514 workerNodeKey: number
1515 ): void {
1516 const workerNode = this.workerNodes[workerNodeKey]
1517 if (workerNode?.usage != null) {
1518 ++workerNode.usage.tasks.sequentiallyStolen
1519 }
1520 }
1521
1522 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1523 workerNodeKey: number,
1524 taskName: string
1525 ): void {
1526 const workerNode = this.workerNodes[workerNodeKey]
1527 if (
1528 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1529 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1530 ) {
1531 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1532 taskName
1533 ) as WorkerUsage
1534 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1535 }
1536 }
1537
1538 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1539 workerNodeKey: number
1540 ): void {
1541 const workerNode = this.workerNodes[workerNodeKey]
1542 if (workerNode?.usage != null) {
1543 workerNode.usage.tasks.sequentiallyStolen = 0
1544 }
1545 }
1546
1547 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1548 workerNodeKey: number,
1549 taskName: string
1550 ): void {
1551 const workerNode = this.workerNodes[workerNodeKey]
1552 if (
1553 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1554 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1555 ) {
1556 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1557 taskName
1558 ) as WorkerUsage
1559 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1560 }
1561 }
1562
1563 private readonly handleIdleWorkerNodeEvent = (
1564 eventDetail: WorkerNodeEventDetail,
1565 previousStolenTask?: Task<Data>
1566 ): void => {
1567 if (this.workerNodes.length <= 1) {
1568 return
1569 }
1570 const { workerNodeKey } = eventDetail
1571 if (workerNodeKey == null) {
1572 throw new Error(
1573 'WorkerNode event detail workerNodeKey attribute must be defined'
1574 )
1575 }
1576 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1577 if (
1578 previousStolenTask != null &&
1579 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1580 (workerNodeTasksUsage.executing > 0 ||
1581 this.tasksQueueSize(workerNodeKey) > 0)
1582 ) {
1583 for (const taskName of this.workerNodes[workerNodeKey].info
1584 .taskFunctionNames as string[]) {
1585 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1586 workerNodeKey,
1587 taskName
1588 )
1589 }
1590 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1591 return
1592 }
1593 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1594 if (
1595 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1596 stolenTask != null
1597 ) {
1598 const taskFunctionTasksWorkerUsage = this.workerNodes[
1599 workerNodeKey
1600 ].getTaskFunctionWorkerUsage(stolenTask.name as string)
1601 ?.tasks as TaskStatistics
1602 if (
1603 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1604 (previousStolenTask != null &&
1605 previousStolenTask.name === stolenTask.name &&
1606 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1607 ) {
1608 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1609 workerNodeKey,
1610 stolenTask.name as string
1611 )
1612 } else {
1613 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1614 workerNodeKey,
1615 stolenTask.name as string
1616 )
1617 }
1618 }
1619 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1620 .then(() => {
1621 this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
1622 return undefined
1623 })
1624 .catch(EMPTY_FUNCTION)
1625 }
1626
1627 private readonly workerNodeStealTask = (
1628 workerNodeKey: number
1629 ): Task<Data> | undefined => {
1630 const workerNodes = this.workerNodes
1631 .slice()
1632 .sort(
1633 (workerNodeA, workerNodeB) =>
1634 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1635 )
1636 const sourceWorkerNode = workerNodes.find(
1637 (sourceWorkerNode, sourceWorkerNodeKey) =>
1638 sourceWorkerNode.info.ready &&
1639 sourceWorkerNodeKey !== workerNodeKey &&
1640 sourceWorkerNode.usage.tasks.queued > 0
1641 )
1642 if (sourceWorkerNode != null) {
1643 const task = sourceWorkerNode.popTask() as Task<Data>
1644 this.handleTask(workerNodeKey, task)
1645 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1646 this.updateTaskStolenStatisticsWorkerUsage(
1647 workerNodeKey,
1648 task.name as string
1649 )
1650 return task
1651 }
1652 }
1653
1654 private readonly handleBackPressureEvent = (
1655 eventDetail: WorkerNodeEventDetail
1656 ): void => {
1657 if (this.workerNodes.length <= 1) {
1658 return
1659 }
1660 const { workerId } = eventDetail
1661 const sizeOffset = 1
1662 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1663 return
1664 }
1665 const sourceWorkerNode =
1666 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1667 const workerNodes = this.workerNodes
1668 .slice()
1669 .sort(
1670 (workerNodeA, workerNodeB) =>
1671 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1672 )
1673 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1674 if (
1675 sourceWorkerNode.usage.tasks.queued > 0 &&
1676 workerNode.info.ready &&
1677 workerNode.info.id !== workerId &&
1678 workerNode.usage.tasks.queued <
1679 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1680 ) {
1681 const task = sourceWorkerNode.popTask() as Task<Data>
1682 this.handleTask(workerNodeKey, task)
1683 this.updateTaskStolenStatisticsWorkerUsage(
1684 workerNodeKey,
1685 task.name as string
1686 )
1687 }
1688 }
1689 }
1690
1691 /**
1692 * This method is the message listener registered on each worker.
1693 */
1694 protected readonly workerMessageListener = (
1695 message: MessageValue<Response>
1696 ): void => {
1697 this.checkMessageWorkerId(message)
1698 const { workerId, ready, taskId, taskFunctionNames } = message
1699 if (ready != null && taskFunctionNames != null) {
1700 // Worker ready response received from worker
1701 this.handleWorkerReadyResponse(message)
1702 } else if (taskId != null) {
1703 // Task execution response received from worker
1704 this.handleTaskExecutionResponse(message)
1705 } else if (taskFunctionNames != null) {
1706 // Task function names message received from worker
1707 this.getWorkerInfo(
1708 this.getWorkerNodeKeyByWorkerId(workerId)
1709 ).taskFunctionNames = taskFunctionNames
1710 }
1711 }
1712
1713 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1714 const { workerId, ready, taskFunctionNames } = message
1715 if (ready === false) {
1716 throw new Error(`Worker ${workerId as number} failed to initialize`)
1717 }
1718 const workerInfo = this.getWorkerInfo(
1719 this.getWorkerNodeKeyByWorkerId(workerId)
1720 )
1721 workerInfo.ready = ready as boolean
1722 workerInfo.taskFunctionNames = taskFunctionNames
1723 if (!this.readyEventEmitted && this.ready) {
1724 this.readyEventEmitted = true
1725 this.emitter?.emit(PoolEvents.ready, this.info)
1726 }
1727 }
1728
1729 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1730 const { workerId, taskId, workerError, data } = message
1731 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1732 if (promiseResponse != null) {
1733 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
1734 if (workerError != null) {
1735 this.emitter?.emit(PoolEvents.taskError, workerError)
1736 asyncResource != null
1737 ? asyncResource.runInAsyncScope(
1738 reject,
1739 this.emitter,
1740 workerError.message
1741 )
1742 : reject(workerError.message)
1743 } else {
1744 asyncResource != null
1745 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1746 : resolve(data as Response)
1747 }
1748 asyncResource?.emitDestroy()
1749 this.afterTaskExecutionHook(workerNodeKey, message)
1750 this.workerChoiceStrategyContext.update(workerNodeKey)
1751 this.promiseResponseMap.delete(taskId as string)
1752 if (this.opts.enableTasksQueue === true) {
1753 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1754 if (
1755 this.tasksQueueSize(workerNodeKey) > 0 &&
1756 workerNodeTasksUsage.executing <
1757 (this.opts.tasksQueueOptions?.concurrency as number)
1758 ) {
1759 this.executeTask(
1760 workerNodeKey,
1761 this.dequeueTask(workerNodeKey) as Task<Data>
1762 )
1763 }
1764 if (
1765 workerNodeTasksUsage.executing === 0 &&
1766 this.tasksQueueSize(workerNodeKey) === 0 &&
1767 workerNodeTasksUsage.sequentiallyStolen === 0
1768 ) {
1769 this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
1770 workerId: workerId as number,
1771 workerNodeKey
1772 })
1773 }
1774 }
1775 }
1776 }
1777
1778 private checkAndEmitTaskExecutionEvents (): void {
1779 if (this.busy) {
1780 this.emitter?.emit(PoolEvents.busy, this.info)
1781 }
1782 }
1783
1784 private checkAndEmitTaskQueuingEvents (): void {
1785 if (this.hasBackPressure()) {
1786 this.emitter?.emit(PoolEvents.backPressure, this.info)
1787 }
1788 }
1789
1790 private checkAndEmitDynamicWorkerCreationEvents (): void {
1791 if (this.type === PoolTypes.dynamic) {
1792 if (this.full) {
1793 this.emitter?.emit(PoolEvents.full, this.info)
1794 }
1795 }
1796 }
1797
1798 /**
1799 * Gets the worker information given its worker node key.
1800 *
1801 * @param workerNodeKey - The worker node key.
1802 * @returns The worker information.
1803 */
1804 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1805 return this.workerNodes[workerNodeKey]?.info
1806 }
1807
1808 /**
1809 * Adds the given worker in the pool worker nodes.
1810 *
1811 * @param worker - The worker.
1812 * @returns The added worker node key.
1813 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1814 */
1815 private addWorkerNode (worker: Worker): number {
1816 const workerNode = new WorkerNode<Worker, Data>(
1817 worker,
1818 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1819 )
1820 // Flag the worker node as ready at pool startup.
1821 if (this.starting) {
1822 workerNode.info.ready = true
1823 }
1824 this.workerNodes.push(workerNode)
1825 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1826 if (workerNodeKey === -1) {
1827 throw new Error('Worker added not found in worker nodes')
1828 }
1829 return workerNodeKey
1830 }
1831
1832 /**
1833 * Removes the given worker from the pool worker nodes.
1834 *
1835 * @param worker - The worker.
1836 */
1837 private removeWorkerNode (worker: Worker): void {
1838 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1839 if (workerNodeKey !== -1) {
1840 this.workerNodes.splice(workerNodeKey, 1)
1841 this.workerChoiceStrategyContext.remove(workerNodeKey)
1842 }
1843 }
1844
1845 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1846 this.getWorkerInfo(workerNodeKey).ready = false
1847 }
1848
1849 /** @inheritDoc */
1850 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1851 return (
1852 this.opts.enableTasksQueue === true &&
1853 this.workerNodes[workerNodeKey].hasBackPressure()
1854 )
1855 }
1856
1857 private hasBackPressure (): boolean {
1858 return (
1859 this.opts.enableTasksQueue === true &&
1860 this.workerNodes.findIndex(
1861 workerNode => !workerNode.hasBackPressure()
1862 ) === -1
1863 )
1864 }
1865
1866 /**
1867 * Executes the given task on the worker given its worker node key.
1868 *
1869 * @param workerNodeKey - The worker node key.
1870 * @param task - The task to execute.
1871 */
1872 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1873 this.beforeTaskExecutionHook(workerNodeKey, task)
1874 this.sendToWorker(workerNodeKey, task, task.transferList)
1875 this.checkAndEmitTaskExecutionEvents()
1876 }
1877
1878 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1879 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1880 this.checkAndEmitTaskQueuingEvents()
1881 return tasksQueueSize
1882 }
1883
1884 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1885 return this.workerNodes[workerNodeKey].dequeueTask()
1886 }
1887
1888 private tasksQueueSize (workerNodeKey: number): number {
1889 return this.workerNodes[workerNodeKey].tasksQueueSize()
1890 }
1891
1892 protected flushTasksQueue (workerNodeKey: number): void {
1893 while (this.tasksQueueSize(workerNodeKey) > 0) {
1894 this.executeTask(
1895 workerNodeKey,
1896 this.dequeueTask(workerNodeKey) as Task<Data>
1897 )
1898 }
1899 this.workerNodes[workerNodeKey].clearTasksQueue()
1900 }
1901
1902 private flushTasksQueues (): void {
1903 for (const [workerNodeKey] of this.workerNodes.entries()) {
1904 this.flushTasksQueue(workerNodeKey)
1905 }
1906 }
1907 }