fix: fix events listener cleanup
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43 } from './worker'
44 import {
45 type MeasurementStatisticsRequirements,
46 Measurements,
47 WorkerChoiceStrategies,
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
50 } from './selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
52 import { version } from './version'
53 import { WorkerNode } from './worker-node'
54 import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
59 } from './utils'
60
61 /**
62 * Base class that implements some shared logic for all poolifier pools.
63 *
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
67 */
68 export abstract class AbstractPool<
69 Worker extends IWorker,
70 Data = unknown,
71 Response = unknown
72 > implements IPool<Worker, Data, Response> {
73 /** @inheritDoc */
74 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
75
76 /** @inheritDoc */
77 public emitter?: EventEmitterAsyncResource
78
79 /**
80 * Dynamic pool maximum size property placeholder.
81 */
82 protected readonly max?: number
83
84 /**
85 * The task execution response promise map:
86 * - `key`: The message id of each submitted task.
87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
88 *
89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
90 */
91 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
92 new Map<string, PromiseResponseWrapper<Response>>()
93
94 /**
95 * Worker choice strategy context referencing a worker choice algorithm implementation.
96 */
97 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
98 Worker,
99 Data,
100 Response
101 >
102
103 /**
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
107 */
108 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
109
110 /**
111 * Whether the pool is started or not.
112 */
113 private started: boolean
114 /**
115 * Whether the pool is starting or not.
116 */
117 private starting: boolean
118 /**
119 * Whether the pool is destroying or not.
120 */
121 private destroying: boolean
122 /**
123 * Whether the pool ready event has been emitted or not.
124 */
125 private readyEventEmitted: boolean
126 /**
127 * The start timestamp of the pool.
128 */
129 private readonly startTimestamp
130
131 /**
132 * Constructs a new poolifier pool.
133 *
134 * @param numberOfWorkers - Number of workers that this pool should manage.
135 * @param filePath - Path to the worker file.
136 * @param opts - Options for the pool.
137 */
138 public constructor (
139 protected readonly numberOfWorkers: number,
140 protected readonly filePath: string,
141 protected readonly opts: PoolOptions<Worker>
142 ) {
143 if (!this.isMain()) {
144 throw new Error(
145 'Cannot start a pool from a worker with the same type as the pool'
146 )
147 }
148 checkFilePath(this.filePath)
149 this.checkNumberOfWorkers(this.numberOfWorkers)
150 this.checkPoolOptions(this.opts)
151
152 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
153 this.executeTask = this.executeTask.bind(this)
154 this.enqueueTask = this.enqueueTask.bind(this)
155
156 if (this.opts.enableEvents === true) {
157 this.initializeEventEmitter()
158 }
159 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
160 Worker,
161 Data,
162 Response
163 >(
164 this,
165 this.opts.workerChoiceStrategy,
166 this.opts.workerChoiceStrategyOptions
167 )
168
169 this.setupHook()
170
171 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
172
173 this.started = false
174 this.starting = false
175 this.destroying = false
176 this.readyEventEmitted = false
177 if (this.opts.startWorkers === true) {
178 this.start()
179 }
180
181 this.startTimestamp = performance.now()
182 }
183
184 private checkNumberOfWorkers (numberOfWorkers: number): void {
185 if (numberOfWorkers == null) {
186 throw new Error(
187 'Cannot instantiate a pool without specifying the number of workers'
188 )
189 } else if (!Number.isSafeInteger(numberOfWorkers)) {
190 throw new TypeError(
191 'Cannot instantiate a pool with a non safe integer number of workers'
192 )
193 } else if (numberOfWorkers < 0) {
194 throw new RangeError(
195 'Cannot instantiate a pool with a negative number of workers'
196 )
197 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
198 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
199 }
200 }
201
202 private checkPoolOptions (opts: PoolOptions<Worker>): void {
203 if (isPlainObject(opts)) {
204 this.opts.startWorkers = opts.startWorkers ?? true
205 checkValidWorkerChoiceStrategy(
206 opts.workerChoiceStrategy as WorkerChoiceStrategy
207 )
208 this.opts.workerChoiceStrategy =
209 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategyOptions(
211 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
212 )
213 this.opts.workerChoiceStrategyOptions = {
214 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
215 ...opts.workerChoiceStrategyOptions
216 }
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
222 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
223 opts.tasksQueueOptions as TasksQueueOptions
224 )
225 }
226 } else {
227 throw new TypeError('Invalid pool options: must be a plain object')
228 }
229 }
230
231 private checkValidWorkerChoiceStrategyOptions (
232 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
233 ): void {
234 if (
235 workerChoiceStrategyOptions != null &&
236 !isPlainObject(workerChoiceStrategyOptions)
237 ) {
238 throw new TypeError(
239 'Invalid worker choice strategy options: must be a plain object'
240 )
241 }
242 if (
243 workerChoiceStrategyOptions?.retries != null &&
244 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
245 ) {
246 throw new TypeError(
247 'Invalid worker choice strategy options: retries must be an integer'
248 )
249 }
250 if (
251 workerChoiceStrategyOptions?.retries != null &&
252 workerChoiceStrategyOptions.retries < 0
253 ) {
254 throw new RangeError(
255 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
256 )
257 }
258 if (
259 workerChoiceStrategyOptions?.weights != null &&
260 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
261 ) {
262 throw new Error(
263 'Invalid worker choice strategy options: must have a weight for each worker node'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions?.measurement != null &&
268 !Object.values(Measurements).includes(
269 workerChoiceStrategyOptions.measurement
270 )
271 ) {
272 throw new Error(
273 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
274 )
275 }
276 }
277
278 private initializeEventEmitter (): void {
279 this.emitter = new EventEmitterAsyncResource({
280 name: `poolifier:${this.type}-${this.worker}-pool`
281 })
282 }
283
284 /** @inheritDoc */
285 public get info (): PoolInfo {
286 return {
287 version,
288 type: this.type,
289 worker: this.worker,
290 started: this.started,
291 ready: this.ready,
292 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
293 minSize: this.minSize,
294 maxSize: this.maxSize,
295 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
296 .runTime.aggregate &&
297 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
298 .waitTime.aggregate && { utilization: round(this.utilization) }),
299 workerNodes: this.workerNodes.length,
300 idleWorkerNodes: this.workerNodes.reduce(
301 (accumulator, workerNode) =>
302 workerNode.usage.tasks.executing === 0
303 ? accumulator + 1
304 : accumulator,
305 0
306 ),
307 busyWorkerNodes: this.workerNodes.reduce(
308 (accumulator, workerNode) =>
309 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
310 0
311 ),
312 executedTasks: this.workerNodes.reduce(
313 (accumulator, workerNode) =>
314 accumulator + workerNode.usage.tasks.executed,
315 0
316 ),
317 executingTasks: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.tasks.executing,
320 0
321 ),
322 ...(this.opts.enableTasksQueue === true && {
323 queuedTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 accumulator + workerNode.usage.tasks.queued,
326 0
327 )
328 }),
329 ...(this.opts.enableTasksQueue === true && {
330 maxQueuedTasks: this.workerNodes.reduce(
331 (accumulator, workerNode) =>
332 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
333 0
334 )
335 }),
336 ...(this.opts.enableTasksQueue === true && {
337 backPressure: this.hasBackPressure()
338 }),
339 ...(this.opts.enableTasksQueue === true && {
340 stolenTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + workerNode.usage.tasks.stolen,
343 0
344 )
345 }),
346 failedTasks: this.workerNodes.reduce(
347 (accumulator, workerNode) =>
348 accumulator + workerNode.usage.tasks.failed,
349 0
350 ),
351 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
352 .runTime.aggregate && {
353 runTime: {
354 minimum: round(
355 min(
356 ...this.workerNodes.map(
357 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
358 )
359 )
360 ),
361 maximum: round(
362 max(
363 ...this.workerNodes.map(
364 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
365 )
366 )
367 ),
368 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
369 .runTime.average && {
370 average: round(
371 average(
372 this.workerNodes.reduce<number[]>(
373 (accumulator, workerNode) =>
374 accumulator.concat(workerNode.usage.runTime.history),
375 []
376 )
377 )
378 )
379 }),
380 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
381 .runTime.median && {
382 median: round(
383 median(
384 this.workerNodes.reduce<number[]>(
385 (accumulator, workerNode) =>
386 accumulator.concat(workerNode.usage.runTime.history),
387 []
388 )
389 )
390 )
391 })
392 }
393 }),
394 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
395 .waitTime.aggregate && {
396 waitTime: {
397 minimum: round(
398 min(
399 ...this.workerNodes.map(
400 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
401 )
402 )
403 ),
404 maximum: round(
405 max(
406 ...this.workerNodes.map(
407 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
408 )
409 )
410 ),
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .waitTime.average && {
413 average: round(
414 average(
415 this.workerNodes.reduce<number[]>(
416 (accumulator, workerNode) =>
417 accumulator.concat(workerNode.usage.waitTime.history),
418 []
419 )
420 )
421 )
422 }),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.median && {
425 median: round(
426 median(
427 this.workerNodes.reduce<number[]>(
428 (accumulator, workerNode) =>
429 accumulator.concat(workerNode.usage.waitTime.history),
430 []
431 )
432 )
433 )
434 })
435 }
436 })
437 }
438 }
439
440 /**
441 * The pool readiness boolean status.
442 */
443 private get ready (): boolean {
444 return (
445 this.workerNodes.reduce(
446 (accumulator, workerNode) =>
447 !workerNode.info.dynamic && workerNode.info.ready
448 ? accumulator + 1
449 : accumulator,
450 0
451 ) >= this.minSize
452 )
453 }
454
455 /**
456 * The approximate pool utilization.
457 *
458 * @returns The pool utilization.
459 */
460 private get utilization (): number {
461 const poolTimeCapacity =
462 (performance.now() - this.startTimestamp) * this.maxSize
463 const totalTasksRunTime = this.workerNodes.reduce(
464 (accumulator, workerNode) =>
465 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
466 0
467 )
468 const totalTasksWaitTime = this.workerNodes.reduce(
469 (accumulator, workerNode) =>
470 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
471 0
472 )
473 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
474 }
475
476 /**
477 * The pool type.
478 *
479 * If it is `'dynamic'`, it provides the `max` property.
480 */
481 protected abstract get type (): PoolType
482
483 /**
484 * The worker type.
485 */
486 protected abstract get worker (): WorkerType
487
488 /**
489 * The pool minimum size.
490 */
491 protected get minSize (): number {
492 return this.numberOfWorkers
493 }
494
495 /**
496 * The pool maximum size.
497 */
498 protected get maxSize (): number {
499 return this.max ?? this.numberOfWorkers
500 }
501
502 /**
503 * Checks if the worker id sent in the received message from a worker is valid.
504 *
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 */
508 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
509 if (message.workerId == null) {
510 throw new Error('Worker message received without worker id')
511 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
512 throw new Error(
513 `Worker message received from unknown worker '${message.workerId}'`
514 )
515 }
516 }
517
518 /**
519 * Gets the given worker its worker node key.
520 *
521 * @param worker - The worker.
522 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
523 */
524 private getWorkerNodeKeyByWorker (worker: Worker): number {
525 return this.workerNodes.findIndex(
526 workerNode => workerNode.worker === worker
527 )
528 }
529
530 /**
531 * Gets the worker node key given its worker id.
532 *
533 * @param workerId - The worker id.
534 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
535 */
536 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
537 return this.workerNodes.findIndex(
538 workerNode => workerNode.info.id === workerId
539 )
540 }
541
542 /** @inheritDoc */
543 public setWorkerChoiceStrategy (
544 workerChoiceStrategy: WorkerChoiceStrategy,
545 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
546 ): void {
547 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
548 this.opts.workerChoiceStrategy = workerChoiceStrategy
549 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
550 this.opts.workerChoiceStrategy
551 )
552 if (workerChoiceStrategyOptions != null) {
553 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
554 }
555 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
556 workerNode.resetUsage()
557 this.sendStatisticsMessageToWorker(workerNodeKey)
558 }
559 }
560
561 /** @inheritDoc */
562 public setWorkerChoiceStrategyOptions (
563 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
564 ): void {
565 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
566 this.opts.workerChoiceStrategyOptions = {
567 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
568 ...workerChoiceStrategyOptions
569 }
570 this.workerChoiceStrategyContext.setOptions(
571 this.opts.workerChoiceStrategyOptions
572 )
573 }
574
575 /** @inheritDoc */
576 public enableTasksQueue (
577 enable: boolean,
578 tasksQueueOptions?: TasksQueueOptions
579 ): void {
580 if (this.opts.enableTasksQueue === true && !enable) {
581 this.unsetTaskStealing()
582 this.unsetTasksStealingOnBackPressure()
583 this.flushTasksQueues()
584 }
585 this.opts.enableTasksQueue = enable
586 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
587 }
588
589 /** @inheritDoc */
590 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
591 if (this.opts.enableTasksQueue === true) {
592 checkValidTasksQueueOptions(tasksQueueOptions)
593 this.opts.tasksQueueOptions =
594 this.buildTasksQueueOptions(tasksQueueOptions)
595 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
596 if (this.opts.tasksQueueOptions.taskStealing === true) {
597 this.setTaskStealing()
598 } else {
599 this.unsetTaskStealing()
600 }
601 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
602 this.setTasksStealingOnBackPressure()
603 } else {
604 this.unsetTasksStealingOnBackPressure()
605 }
606 } else if (this.opts.tasksQueueOptions != null) {
607 delete this.opts.tasksQueueOptions
608 }
609 }
610
611 private buildTasksQueueOptions (
612 tasksQueueOptions: TasksQueueOptions
613 ): TasksQueueOptions {
614 return {
615 ...{
616 size: Math.pow(this.maxSize, 2),
617 concurrency: 1,
618 taskStealing: true,
619 tasksStealingOnBackPressure: true
620 },
621 ...tasksQueueOptions
622 }
623 }
624
625 private setTasksQueueSize (size: number): void {
626 for (const workerNode of this.workerNodes) {
627 workerNode.tasksQueueBackPressureSize = size
628 }
629 }
630
631 private setTaskStealing (): void {
632 for (const [workerNodeKey] of this.workerNodes.entries()) {
633 this.workerNodes[workerNodeKey].on(
634 'idleWorkerNode',
635 this.handleIdleWorkerNodeEvent
636 )
637 }
638 }
639
640 private unsetTaskStealing (): void {
641 for (const [workerNodeKey] of this.workerNodes.entries()) {
642 this.workerNodes[workerNodeKey].off(
643 'idleWorkerNode',
644 this.handleIdleWorkerNodeEvent
645 )
646 }
647 }
648
649 private setTasksStealingOnBackPressure (): void {
650 for (const [workerNodeKey] of this.workerNodes.entries()) {
651 this.workerNodes[workerNodeKey].on(
652 'backPressure',
653 this.handleBackPressureEvent
654 )
655 }
656 }
657
658 private unsetTasksStealingOnBackPressure (): void {
659 for (const [workerNodeKey] of this.workerNodes.entries()) {
660 this.workerNodes[workerNodeKey].off(
661 'backPressure',
662 this.handleBackPressureEvent
663 )
664 }
665 }
666
667 /**
668 * Whether the pool is full or not.
669 *
670 * The pool filling boolean status.
671 */
672 protected get full (): boolean {
673 return this.workerNodes.length >= this.maxSize
674 }
675
676 /**
677 * Whether the pool is busy or not.
678 *
679 * The pool busyness boolean status.
680 */
681 protected abstract get busy (): boolean
682
683 /**
684 * Whether worker nodes are executing concurrently their tasks quota or not.
685 *
686 * @returns Worker nodes busyness boolean status.
687 */
688 protected internalBusy (): boolean {
689 if (this.opts.enableTasksQueue === true) {
690 return (
691 this.workerNodes.findIndex(
692 workerNode =>
693 workerNode.info.ready &&
694 workerNode.usage.tasks.executing <
695 (this.opts.tasksQueueOptions?.concurrency as number)
696 ) === -1
697 )
698 }
699 return (
700 this.workerNodes.findIndex(
701 workerNode =>
702 workerNode.info.ready && workerNode.usage.tasks.executing === 0
703 ) === -1
704 )
705 }
706
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey: number,
709 message: MessageValue<Data>
710 ): Promise<boolean> {
711 return await new Promise<boolean>((resolve, reject) => {
712 const taskFunctionOperationListener = (
713 message: MessageValue<Response>
714 ): void => {
715 this.checkMessageWorkerId(message)
716 const workerId = this.getWorkerInfo(workerNodeKey).id as number
717 if (
718 message.taskFunctionOperationStatus != null &&
719 message.workerId === workerId
720 ) {
721 if (message.taskFunctionOperationStatus) {
722 resolve(true)
723 } else if (!message.taskFunctionOperationStatus) {
724 reject(
725 new Error(
726 `Task function operation '${
727 message.taskFunctionOperation as string
728 }' failed on worker ${message.workerId} with error: '${
729 message.workerError?.message as string
730 }'`
731 )
732 )
733 }
734 this.deregisterWorkerMessageListener(
735 this.getWorkerNodeKeyByWorkerId(message.workerId),
736 taskFunctionOperationListener
737 )
738 }
739 }
740 this.registerWorkerMessageListener(
741 workerNodeKey,
742 taskFunctionOperationListener
743 )
744 this.sendToWorker(workerNodeKey, message)
745 })
746 }
747
748 private async sendTaskFunctionOperationToWorkers (
749 message: MessageValue<Data>
750 ): Promise<boolean> {
751 return await new Promise<boolean>((resolve, reject) => {
752 const responsesReceived = new Array<MessageValue<Response>>()
753 const taskFunctionOperationsListener = (
754 message: MessageValue<Response>
755 ): void => {
756 this.checkMessageWorkerId(message)
757 if (message.taskFunctionOperationStatus != null) {
758 responsesReceived.push(message)
759 if (responsesReceived.length === this.workerNodes.length) {
760 if (
761 responsesReceived.every(
762 message => message.taskFunctionOperationStatus === true
763 )
764 ) {
765 resolve(true)
766 } else if (
767 responsesReceived.some(
768 message => message.taskFunctionOperationStatus === false
769 )
770 ) {
771 const errorResponse = responsesReceived.find(
772 response => response.taskFunctionOperationStatus === false
773 )
774 reject(
775 new Error(
776 `Task function operation '${
777 message.taskFunctionOperation as string
778 }' failed on worker ${
779 errorResponse?.workerId as number
780 } with error: '${
781 errorResponse?.workerError?.message as string
782 }'`
783 )
784 )
785 }
786 this.deregisterWorkerMessageListener(
787 this.getWorkerNodeKeyByWorkerId(message.workerId),
788 taskFunctionOperationsListener
789 )
790 }
791 }
792 }
793 for (const [workerNodeKey] of this.workerNodes.entries()) {
794 this.registerWorkerMessageListener(
795 workerNodeKey,
796 taskFunctionOperationsListener
797 )
798 this.sendToWorker(workerNodeKey, message)
799 }
800 })
801 }
802
803 /** @inheritDoc */
804 public hasTaskFunction (name: string): boolean {
805 for (const workerNode of this.workerNodes) {
806 if (
807 Array.isArray(workerNode.info.taskFunctionNames) &&
808 workerNode.info.taskFunctionNames.includes(name)
809 ) {
810 return true
811 }
812 }
813 return false
814 }
815
816 /** @inheritDoc */
817 public async addTaskFunction (
818 name: string,
819 fn: TaskFunction<Data, Response>
820 ): Promise<boolean> {
821 if (typeof name !== 'string') {
822 throw new TypeError('name argument must be a string')
823 }
824 if (typeof name === 'string' && name.trim().length === 0) {
825 throw new TypeError('name argument must not be an empty string')
826 }
827 if (typeof fn !== 'function') {
828 throw new TypeError('fn argument must be a function')
829 }
830 const opResult = await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation: 'add',
832 taskFunctionName: name,
833 taskFunction: fn.toString()
834 })
835 this.taskFunctions.set(name, fn)
836 return opResult
837 }
838
839 /** @inheritDoc */
840 public async removeTaskFunction (name: string): Promise<boolean> {
841 if (!this.taskFunctions.has(name)) {
842 throw new Error(
843 'Cannot remove a task function not handled on the pool side'
844 )
845 }
846 const opResult = await this.sendTaskFunctionOperationToWorkers({
847 taskFunctionOperation: 'remove',
848 taskFunctionName: name
849 })
850 this.deleteTaskFunctionWorkerUsages(name)
851 this.taskFunctions.delete(name)
852 return opResult
853 }
854
855 /** @inheritDoc */
856 public listTaskFunctionNames (): string[] {
857 for (const workerNode of this.workerNodes) {
858 if (
859 Array.isArray(workerNode.info.taskFunctionNames) &&
860 workerNode.info.taskFunctionNames.length > 0
861 ) {
862 return workerNode.info.taskFunctionNames
863 }
864 }
865 return []
866 }
867
868 /** @inheritDoc */
869 public async setDefaultTaskFunction (name: string): Promise<boolean> {
870 return await this.sendTaskFunctionOperationToWorkers({
871 taskFunctionOperation: 'default',
872 taskFunctionName: name
873 })
874 }
875
876 private deleteTaskFunctionWorkerUsages (name: string): void {
877 for (const workerNode of this.workerNodes) {
878 workerNode.deleteTaskFunctionWorkerUsage(name)
879 }
880 }
881
882 private shallExecuteTask (workerNodeKey: number): boolean {
883 return (
884 this.tasksQueueSize(workerNodeKey) === 0 &&
885 this.workerNodes[workerNodeKey].usage.tasks.executing <
886 (this.opts.tasksQueueOptions?.concurrency as number)
887 )
888 }
889
890 /** @inheritDoc */
891 public async execute (
892 data?: Data,
893 name?: string,
894 transferList?: TransferListItem[]
895 ): Promise<Response> {
896 return await new Promise<Response>((resolve, reject) => {
897 if (!this.started) {
898 reject(new Error('Cannot execute a task on not started pool'))
899 return
900 }
901 if (this.destroying) {
902 reject(new Error('Cannot execute a task on destroying pool'))
903 return
904 }
905 if (name != null && typeof name !== 'string') {
906 reject(new TypeError('name argument must be a string'))
907 return
908 }
909 if (
910 name != null &&
911 typeof name === 'string' &&
912 name.trim().length === 0
913 ) {
914 reject(new TypeError('name argument must not be an empty string'))
915 return
916 }
917 if (transferList != null && !Array.isArray(transferList)) {
918 reject(new TypeError('transferList argument must be an array'))
919 return
920 }
921 const timestamp = performance.now()
922 const workerNodeKey = this.chooseWorkerNode()
923 const task: Task<Data> = {
924 name: name ?? DEFAULT_TASK_NAME,
925 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
926 data: data ?? ({} as Data),
927 transferList,
928 timestamp,
929 taskId: randomUUID()
930 }
931 this.promiseResponseMap.set(task.taskId as string, {
932 resolve,
933 reject,
934 workerNodeKey
935 })
936 if (
937 this.opts.enableTasksQueue === false ||
938 (this.opts.enableTasksQueue === true &&
939 this.shallExecuteTask(workerNodeKey))
940 ) {
941 this.executeTask(workerNodeKey, task)
942 } else {
943 this.enqueueTask(workerNodeKey, task)
944 }
945 })
946 }
947
948 /** @inheritdoc */
949 public start (): void {
950 if (this.started) {
951 throw new Error('Cannot start an already started pool')
952 }
953 if (this.starting) {
954 throw new Error('Cannot start an already starting pool')
955 }
956 if (this.destroying) {
957 throw new Error('Cannot start a destroying pool')
958 }
959 this.starting = true
960 while (
961 this.workerNodes.reduce(
962 (accumulator, workerNode) =>
963 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
964 0
965 ) < this.numberOfWorkers
966 ) {
967 this.createAndSetupWorkerNode()
968 }
969 this.starting = false
970 this.started = true
971 }
972
973 /** @inheritDoc */
974 public async destroy (): Promise<void> {
975 if (!this.started) {
976 throw new Error('Cannot destroy an already destroyed pool')
977 }
978 if (this.starting) {
979 throw new Error('Cannot destroy an starting pool')
980 }
981 if (this.destroying) {
982 throw new Error('Cannot destroy an already destroying pool')
983 }
984 this.destroying = true
985 await Promise.all(
986 this.workerNodes.map(async (_, workerNodeKey) => {
987 await this.destroyWorkerNode(workerNodeKey)
988 })
989 )
990 this.emitter?.emit(PoolEvents.destroy, this.info)
991 this.emitter?.emitDestroy()
992 this.emitter?.removeAllListeners()
993 this.readyEventEmitted = false
994 this.destroying = false
995 this.started = false
996 }
997
998 protected async sendKillMessageToWorker (
999 workerNodeKey: number
1000 ): Promise<void> {
1001 await new Promise<void>((resolve, reject) => {
1002 const killMessageListener = (message: MessageValue<Response>): void => {
1003 this.checkMessageWorkerId(message)
1004 if (message.kill === 'success') {
1005 resolve()
1006 } else if (message.kill === 'failure') {
1007 reject(
1008 new Error(
1009 `Kill message handling failed on worker ${
1010 message.workerId as number
1011 }`
1012 )
1013 )
1014 }
1015 }
1016 // FIXME: should be registered only once
1017 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1018 this.sendToWorker(workerNodeKey, { kill: true })
1019 })
1020 }
1021
1022 /**
1023 * Terminates the worker node given its worker node key.
1024 *
1025 * @param workerNodeKey - The worker node key.
1026 */
1027 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1028
1029 /**
1030 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1031 * Can be overridden.
1032 *
1033 * @virtual
1034 */
1035 protected setupHook (): void {
1036 /* Intentionally empty */
1037 }
1038
1039 /**
1040 * Should return whether the worker is the main worker or not.
1041 */
1042 protected abstract isMain (): boolean
1043
1044 /**
1045 * Hook executed before the worker task execution.
1046 * Can be overridden.
1047 *
1048 * @param workerNodeKey - The worker node key.
1049 * @param task - The task to execute.
1050 */
1051 protected beforeTaskExecutionHook (
1052 workerNodeKey: number,
1053 task: Task<Data>
1054 ): void {
1055 if (this.workerNodes[workerNodeKey]?.usage != null) {
1056 const workerUsage = this.workerNodes[workerNodeKey].usage
1057 ++workerUsage.tasks.executing
1058 this.updateWaitTimeWorkerUsage(workerUsage, task)
1059 }
1060 if (
1061 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1062 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1063 task.name as string
1064 ) != null
1065 ) {
1066 const taskFunctionWorkerUsage = this.workerNodes[
1067 workerNodeKey
1068 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1069 ++taskFunctionWorkerUsage.tasks.executing
1070 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1071 }
1072 }
1073
1074 /**
1075 * Hook executed after the worker task execution.
1076 * Can be overridden.
1077 *
1078 * @param workerNodeKey - The worker node key.
1079 * @param message - The received message.
1080 */
1081 protected afterTaskExecutionHook (
1082 workerNodeKey: number,
1083 message: MessageValue<Response>
1084 ): void {
1085 if (this.workerNodes[workerNodeKey]?.usage != null) {
1086 const workerUsage = this.workerNodes[workerNodeKey].usage
1087 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1088 this.updateRunTimeWorkerUsage(workerUsage, message)
1089 this.updateEluWorkerUsage(workerUsage, message)
1090 }
1091 if (
1092 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1093 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1094 message.taskPerformance?.name as string
1095 ) != null
1096 ) {
1097 const taskFunctionWorkerUsage = this.workerNodes[
1098 workerNodeKey
1099 ].getTaskFunctionWorkerUsage(
1100 message.taskPerformance?.name as string
1101 ) as WorkerUsage
1102 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1103 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1104 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1105 }
1106 }
1107
1108 /**
1109 * Whether the worker node shall update its task function worker usage or not.
1110 *
1111 * @param workerNodeKey - The worker node key.
1112 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1113 */
1114 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1115 const workerInfo = this.getWorkerInfo(workerNodeKey)
1116 return (
1117 workerInfo != null &&
1118 Array.isArray(workerInfo.taskFunctionNames) &&
1119 workerInfo.taskFunctionNames.length > 2
1120 )
1121 }
1122
1123 private updateTaskStatisticsWorkerUsage (
1124 workerUsage: WorkerUsage,
1125 message: MessageValue<Response>
1126 ): void {
1127 const workerTaskStatistics = workerUsage.tasks
1128 if (
1129 workerTaskStatistics.executing != null &&
1130 workerTaskStatistics.executing > 0
1131 ) {
1132 --workerTaskStatistics.executing
1133 }
1134 if (message.workerError == null) {
1135 ++workerTaskStatistics.executed
1136 } else {
1137 ++workerTaskStatistics.failed
1138 }
1139 }
1140
1141 private updateRunTimeWorkerUsage (
1142 workerUsage: WorkerUsage,
1143 message: MessageValue<Response>
1144 ): void {
1145 if (message.workerError != null) {
1146 return
1147 }
1148 updateMeasurementStatistics(
1149 workerUsage.runTime,
1150 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1151 message.taskPerformance?.runTime ?? 0
1152 )
1153 }
1154
1155 private updateWaitTimeWorkerUsage (
1156 workerUsage: WorkerUsage,
1157 task: Task<Data>
1158 ): void {
1159 const timestamp = performance.now()
1160 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1161 updateMeasurementStatistics(
1162 workerUsage.waitTime,
1163 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1164 taskWaitTime
1165 )
1166 }
1167
1168 private updateEluWorkerUsage (
1169 workerUsage: WorkerUsage,
1170 message: MessageValue<Response>
1171 ): void {
1172 if (message.workerError != null) {
1173 return
1174 }
1175 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1176 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1177 updateMeasurementStatistics(
1178 workerUsage.elu.active,
1179 eluTaskStatisticsRequirements,
1180 message.taskPerformance?.elu?.active ?? 0
1181 )
1182 updateMeasurementStatistics(
1183 workerUsage.elu.idle,
1184 eluTaskStatisticsRequirements,
1185 message.taskPerformance?.elu?.idle ?? 0
1186 )
1187 if (eluTaskStatisticsRequirements.aggregate) {
1188 if (message.taskPerformance?.elu != null) {
1189 if (workerUsage.elu.utilization != null) {
1190 workerUsage.elu.utilization =
1191 (workerUsage.elu.utilization +
1192 message.taskPerformance.elu.utilization) /
1193 2
1194 } else {
1195 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1196 }
1197 }
1198 }
1199 }
1200
1201 /**
1202 * Chooses a worker node for the next task.
1203 *
1204 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1205 *
1206 * @returns The chosen worker node key
1207 */
1208 private chooseWorkerNode (): number {
1209 if (this.shallCreateDynamicWorker()) {
1210 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1211 if (
1212 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1213 ) {
1214 return workerNodeKey
1215 }
1216 }
1217 return this.workerChoiceStrategyContext.execute()
1218 }
1219
1220 /**
1221 * Conditions for dynamic worker creation.
1222 *
1223 * @returns Whether to create a dynamic worker or not.
1224 */
1225 private shallCreateDynamicWorker (): boolean {
1226 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1227 }
1228
1229 /**
1230 * Sends a message to worker given its worker node key.
1231 *
1232 * @param workerNodeKey - The worker node key.
1233 * @param message - The message.
1234 * @param transferList - The optional array of transferable objects.
1235 */
1236 protected abstract sendToWorker (
1237 workerNodeKey: number,
1238 message: MessageValue<Data>,
1239 transferList?: TransferListItem[]
1240 ): void
1241
1242 /**
1243 * Creates a new worker.
1244 *
1245 * @returns Newly created worker.
1246 */
1247 protected abstract createWorker (): Worker
1248
1249 /**
1250 * Creates a new, completely set up worker node.
1251 *
1252 * @returns New, completely set up worker node key.
1253 */
1254 protected createAndSetupWorkerNode (): number {
1255 const worker = this.createWorker()
1256
1257 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1258 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1259 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1260 worker.on('error', error => {
1261 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1262 this.flagWorkerNodeAsNotReady(workerNodeKey)
1263 const workerInfo = this.getWorkerInfo(workerNodeKey)
1264 this.emitter?.emit(PoolEvents.error, error)
1265 this.workerNodes[workerNodeKey].closeChannel()
1266 if (
1267 this.started &&
1268 !this.starting &&
1269 !this.destroying &&
1270 this.opts.restartWorkerOnError === true
1271 ) {
1272 if (workerInfo.dynamic) {
1273 this.createAndSetupDynamicWorkerNode()
1274 } else {
1275 this.createAndSetupWorkerNode()
1276 }
1277 }
1278 if (this.started && this.opts.enableTasksQueue === true) {
1279 this.redistributeQueuedTasks(workerNodeKey)
1280 }
1281 })
1282 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1283 worker.once('exit', () => {
1284 this.removeWorkerNode(worker)
1285 })
1286
1287 const workerNodeKey = this.addWorkerNode(worker)
1288
1289 this.afterWorkerNodeSetup(workerNodeKey)
1290
1291 return workerNodeKey
1292 }
1293
1294 /**
1295 * Creates a new, completely set up dynamic worker node.
1296 *
1297 * @returns New, completely set up dynamic worker node key.
1298 */
1299 protected createAndSetupDynamicWorkerNode (): number {
1300 const workerNodeKey = this.createAndSetupWorkerNode()
1301 this.registerWorkerMessageListener(workerNodeKey, message => {
1302 this.checkMessageWorkerId(message)
1303 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1304 message.workerId
1305 )
1306 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1307 // Kill message received from worker
1308 if (
1309 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1310 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1311 ((this.opts.enableTasksQueue === false &&
1312 workerUsage.tasks.executing === 0) ||
1313 (this.opts.enableTasksQueue === true &&
1314 workerUsage.tasks.executing === 0 &&
1315 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1316 ) {
1317 // Flag the worker node as not ready immediately
1318 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1319 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1320 this.emitter?.emit(PoolEvents.error, error)
1321 })
1322 }
1323 })
1324 const workerInfo = this.getWorkerInfo(workerNodeKey)
1325 this.sendToWorker(workerNodeKey, {
1326 checkActive: true
1327 })
1328 if (this.taskFunctions.size > 0) {
1329 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1330 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1331 taskFunctionOperation: 'add',
1332 taskFunctionName,
1333 taskFunction: taskFunction.toString()
1334 }).catch(error => {
1335 this.emitter?.emit(PoolEvents.error, error)
1336 })
1337 }
1338 }
1339 workerInfo.dynamic = true
1340 if (
1341 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1342 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1343 ) {
1344 workerInfo.ready = true
1345 }
1346 this.checkAndEmitDynamicWorkerCreationEvents()
1347 return workerNodeKey
1348 }
1349
1350 /**
1351 * Registers a listener callback on the worker given its worker node key.
1352 *
1353 * @param workerNodeKey - The worker node key.
1354 * @param listener - The message listener callback.
1355 */
1356 protected abstract registerWorkerMessageListener<
1357 Message extends Data | Response
1358 >(
1359 workerNodeKey: number,
1360 listener: (message: MessageValue<Message>) => void
1361 ): void
1362
1363 /**
1364 * Registers once a listener callback on the worker given its worker node key.
1365 *
1366 * @param workerNodeKey - The worker node key.
1367 * @param listener - The message listener callback.
1368 */
1369 protected abstract registerOnceWorkerMessageListener<
1370 Message extends Data | Response
1371 >(
1372 workerNodeKey: number,
1373 listener: (message: MessageValue<Message>) => void
1374 ): void
1375
1376 /**
1377 * Deregisters a listener callback on the worker given its worker node key.
1378 *
1379 * @param workerNodeKey - The worker node key.
1380 * @param listener - The message listener callback.
1381 */
1382 protected abstract deregisterWorkerMessageListener<
1383 Message extends Data | Response
1384 >(
1385 workerNodeKey: number,
1386 listener: (message: MessageValue<Message>) => void
1387 ): void
1388
1389 /**
1390 * Method hooked up after a worker node has been newly created.
1391 * Can be overridden.
1392 *
1393 * @param workerNodeKey - The newly created worker node key.
1394 */
1395 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1396 // Listen to worker messages.
1397 this.registerWorkerMessageListener(
1398 workerNodeKey,
1399 this.workerMessageListener.bind(this)
1400 )
1401 // Send the startup message to worker.
1402 this.sendStartupMessageToWorker(workerNodeKey)
1403 // Send the statistics message to worker.
1404 this.sendStatisticsMessageToWorker(workerNodeKey)
1405 if (this.opts.enableTasksQueue === true) {
1406 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1407 this.workerNodes[workerNodeKey].on(
1408 'idleWorkerNode',
1409 this.handleIdleWorkerNodeEvent
1410 )
1411 }
1412 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1413 this.workerNodes[workerNodeKey].on(
1414 'backPressure',
1415 this.handleBackPressureEvent
1416 )
1417 }
1418 }
1419 }
1420
1421 /**
1422 * Sends the startup message to worker given its worker node key.
1423 *
1424 * @param workerNodeKey - The worker node key.
1425 */
1426 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1427
1428 /**
1429 * Sends the statistics message to worker given its worker node key.
1430 *
1431 * @param workerNodeKey - The worker node key.
1432 */
1433 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1434 this.sendToWorker(workerNodeKey, {
1435 statistics: {
1436 runTime:
1437 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1438 .runTime.aggregate,
1439 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1440 .elu.aggregate
1441 }
1442 })
1443 }
1444
1445 private redistributeQueuedTasks (workerNodeKey: number): void {
1446 while (this.tasksQueueSize(workerNodeKey) > 0) {
1447 const destinationWorkerNodeKey = this.workerNodes.reduce(
1448 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1449 return workerNode.info.ready &&
1450 workerNode.usage.tasks.queued <
1451 workerNodes[minWorkerNodeKey].usage.tasks.queued
1452 ? workerNodeKey
1453 : minWorkerNodeKey
1454 },
1455 0
1456 )
1457 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1458 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1459 this.executeTask(destinationWorkerNodeKey, task)
1460 } else {
1461 this.enqueueTask(destinationWorkerNodeKey, task)
1462 }
1463 }
1464 }
1465
1466 private updateTaskStolenStatisticsWorkerUsage (
1467 workerNodeKey: number,
1468 taskName: string
1469 ): void {
1470 const workerNode = this.workerNodes[workerNodeKey]
1471 if (workerNode?.usage != null) {
1472 ++workerNode.usage.tasks.stolen
1473 }
1474 if (
1475 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1476 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1477 ) {
1478 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1479 taskName
1480 ) as WorkerUsage
1481 ++taskFunctionWorkerUsage.tasks.stolen
1482 }
1483 }
1484
1485 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1486 workerNodeKey: number
1487 ): void {
1488 const workerNode = this.workerNodes[workerNodeKey]
1489 if (workerNode?.usage != null) {
1490 ++workerNode.usage.tasks.sequentiallyStolen
1491 }
1492 }
1493
1494 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1495 workerNodeKey: number,
1496 taskName: string
1497 ): void {
1498 const workerNode = this.workerNodes[workerNodeKey]
1499 if (
1500 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1501 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1502 ) {
1503 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1504 taskName
1505 ) as WorkerUsage
1506 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1507 }
1508 }
1509
1510 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1511 workerNodeKey: number
1512 ): void {
1513 const workerNode = this.workerNodes[workerNodeKey]
1514 if (workerNode?.usage != null) {
1515 workerNode.usage.tasks.sequentiallyStolen = 0
1516 }
1517 }
1518
1519 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1520 workerNodeKey: number,
1521 taskName: string
1522 ): void {
1523 const workerNode = this.workerNodes[workerNodeKey]
1524 if (
1525 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1526 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1527 ) {
1528 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1529 taskName
1530 ) as WorkerUsage
1531 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1532 }
1533 }
1534
1535 private readonly handleIdleWorkerNodeEvent = (
1536 eventDetail: WorkerNodeEventDetail,
1537 previousStolenTask?: Task<Data>
1538 ): void => {
1539 const { workerNodeKey } = eventDetail
1540 if (workerNodeKey == null) {
1541 throw new Error(
1542 'WorkerNode event detail workerNodeKey attribute must be defined'
1543 )
1544 }
1545 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1546 if (
1547 previousStolenTask != null &&
1548 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1549 (workerNodeTasksUsage.executing > 0 ||
1550 this.tasksQueueSize(workerNodeKey) > 0)
1551 ) {
1552 for (const taskName of this.workerNodes[workerNodeKey].info
1553 .taskFunctionNames as string[]) {
1554 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1555 workerNodeKey,
1556 taskName
1557 )
1558 }
1559 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1560 return
1561 }
1562 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1563 if (
1564 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1565 stolenTask != null
1566 ) {
1567 const taskFunctionTasksWorkerUsage = this.workerNodes[
1568 workerNodeKey
1569 ].getTaskFunctionWorkerUsage(stolenTask.name as string)
1570 ?.tasks as TaskStatistics
1571 if (
1572 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1573 (previousStolenTask != null &&
1574 previousStolenTask.name === stolenTask.name &&
1575 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1576 ) {
1577 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1578 workerNodeKey,
1579 stolenTask.name as string
1580 )
1581 } else {
1582 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1583 workerNodeKey,
1584 stolenTask.name as string
1585 )
1586 }
1587 }
1588 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1589 .then(() => {
1590 this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
1591 return undefined
1592 })
1593 .catch(EMPTY_FUNCTION)
1594 }
1595
1596 private readonly workerNodeStealTask = (
1597 workerNodeKey: number
1598 ): Task<Data> | undefined => {
1599 const workerNodes = this.workerNodes
1600 .slice()
1601 .sort(
1602 (workerNodeA, workerNodeB) =>
1603 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1604 )
1605 const sourceWorkerNode = workerNodes.find(
1606 (sourceWorkerNode, sourceWorkerNodeKey) =>
1607 sourceWorkerNode.info.ready &&
1608 sourceWorkerNodeKey !== workerNodeKey &&
1609 sourceWorkerNode.usage.tasks.queued > 0
1610 )
1611 if (sourceWorkerNode != null) {
1612 const task = sourceWorkerNode.popTask() as Task<Data>
1613 if (this.shallExecuteTask(workerNodeKey)) {
1614 this.executeTask(workerNodeKey, task)
1615 } else {
1616 this.enqueueTask(workerNodeKey, task)
1617 }
1618 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1619 this.updateTaskStolenStatisticsWorkerUsage(
1620 workerNodeKey,
1621 task.name as string
1622 )
1623 return task
1624 }
1625 }
1626
1627 private readonly handleBackPressureEvent = (
1628 eventDetail: WorkerNodeEventDetail
1629 ): void => {
1630 const { workerId } = eventDetail
1631 const sizeOffset = 1
1632 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1633 return
1634 }
1635 const sourceWorkerNode =
1636 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1637 const workerNodes = this.workerNodes
1638 .slice()
1639 .sort(
1640 (workerNodeA, workerNodeB) =>
1641 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1642 )
1643 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1644 if (
1645 sourceWorkerNode.usage.tasks.queued > 0 &&
1646 workerNode.info.ready &&
1647 workerNode.info.id !== workerId &&
1648 workerNode.usage.tasks.queued <
1649 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1650 ) {
1651 const task = sourceWorkerNode.popTask() as Task<Data>
1652 if (this.shallExecuteTask(workerNodeKey)) {
1653 this.executeTask(workerNodeKey, task)
1654 } else {
1655 this.enqueueTask(workerNodeKey, task)
1656 }
1657 this.updateTaskStolenStatisticsWorkerUsage(
1658 workerNodeKey,
1659 task.name as string
1660 )
1661 }
1662 }
1663 }
1664
1665 /**
1666 * This method is the message listener registered on each worker.
1667 */
1668 protected workerMessageListener (message: MessageValue<Response>): void {
1669 this.checkMessageWorkerId(message)
1670 const { workerId, ready, taskId, taskFunctionNames } = message
1671 if (ready != null && taskFunctionNames != null) {
1672 // Worker ready response received from worker
1673 this.handleWorkerReadyResponse(message)
1674 } else if (taskId != null) {
1675 // Task execution response received from worker
1676 this.handleTaskExecutionResponse(message)
1677 } else if (taskFunctionNames != null) {
1678 // Task function names message received from worker
1679 this.getWorkerInfo(
1680 this.getWorkerNodeKeyByWorkerId(workerId)
1681 ).taskFunctionNames = taskFunctionNames
1682 }
1683 }
1684
1685 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1686 const { workerId, ready, taskFunctionNames } = message
1687 if (ready === false) {
1688 throw new Error(`Worker ${workerId as number} failed to initialize`)
1689 }
1690 const workerInfo = this.getWorkerInfo(
1691 this.getWorkerNodeKeyByWorkerId(workerId)
1692 )
1693 workerInfo.ready = ready as boolean
1694 workerInfo.taskFunctionNames = taskFunctionNames
1695 if (!this.readyEventEmitted && this.ready) {
1696 this.readyEventEmitted = true
1697 this.emitter?.emit(PoolEvents.ready, this.info)
1698 }
1699 }
1700
1701 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1702 const { workerId, taskId, workerError, data } = message
1703 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1704 if (promiseResponse != null) {
1705 const { resolve, reject, workerNodeKey } = promiseResponse
1706 if (workerError != null) {
1707 this.emitter?.emit(PoolEvents.taskError, workerError)
1708 reject(workerError.message)
1709 } else {
1710 resolve(data as Response)
1711 }
1712 this.afterTaskExecutionHook(workerNodeKey, message)
1713 this.workerChoiceStrategyContext.update(workerNodeKey)
1714 this.promiseResponseMap.delete(taskId as string)
1715 if (this.opts.enableTasksQueue === true) {
1716 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1717 if (
1718 this.tasksQueueSize(workerNodeKey) > 0 &&
1719 workerNodeTasksUsage.executing <
1720 (this.opts.tasksQueueOptions?.concurrency as number)
1721 ) {
1722 this.executeTask(
1723 workerNodeKey,
1724 this.dequeueTask(workerNodeKey) as Task<Data>
1725 )
1726 }
1727 if (
1728 workerNodeTasksUsage.executing === 0 &&
1729 this.tasksQueueSize(workerNodeKey) === 0 &&
1730 workerNodeTasksUsage.sequentiallyStolen === 0
1731 ) {
1732 this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
1733 workerId: workerId as number,
1734 workerNodeKey
1735 })
1736 }
1737 }
1738 }
1739 }
1740
1741 private checkAndEmitTaskExecutionEvents (): void {
1742 if (this.busy) {
1743 this.emitter?.emit(PoolEvents.busy, this.info)
1744 }
1745 }
1746
1747 private checkAndEmitTaskQueuingEvents (): void {
1748 if (this.hasBackPressure()) {
1749 this.emitter?.emit(PoolEvents.backPressure, this.info)
1750 }
1751 }
1752
1753 private checkAndEmitDynamicWorkerCreationEvents (): void {
1754 if (this.type === PoolTypes.dynamic) {
1755 if (this.full) {
1756 this.emitter?.emit(PoolEvents.full, this.info)
1757 }
1758 }
1759 }
1760
1761 /**
1762 * Gets the worker information given its worker node key.
1763 *
1764 * @param workerNodeKey - The worker node key.
1765 * @returns The worker information.
1766 */
1767 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1768 return this.workerNodes[workerNodeKey]?.info
1769 }
1770
1771 /**
1772 * Adds the given worker in the pool worker nodes.
1773 *
1774 * @param worker - The worker.
1775 * @returns The added worker node key.
1776 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1777 */
1778 private addWorkerNode (worker: Worker): number {
1779 const workerNode = new WorkerNode<Worker, Data>(
1780 worker,
1781 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1782 )
1783 // Flag the worker node as ready at pool startup.
1784 if (this.starting) {
1785 workerNode.info.ready = true
1786 }
1787 this.workerNodes.push(workerNode)
1788 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1789 if (workerNodeKey === -1) {
1790 throw new Error('Worker added not found in worker nodes')
1791 }
1792 return workerNodeKey
1793 }
1794
1795 /**
1796 * Removes the given worker from the pool worker nodes.
1797 *
1798 * @param worker - The worker.
1799 */
1800 private removeWorkerNode (worker: Worker): void {
1801 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1802 if (workerNodeKey !== -1) {
1803 this.workerNodes.splice(workerNodeKey, 1)
1804 this.workerChoiceStrategyContext.remove(workerNodeKey)
1805 }
1806 }
1807
1808 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1809 this.getWorkerInfo(workerNodeKey).ready = false
1810 }
1811
1812 /** @inheritDoc */
1813 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1814 return (
1815 this.opts.enableTasksQueue === true &&
1816 this.workerNodes[workerNodeKey].hasBackPressure()
1817 )
1818 }
1819
1820 private hasBackPressure (): boolean {
1821 return (
1822 this.opts.enableTasksQueue === true &&
1823 this.workerNodes.findIndex(
1824 workerNode => !workerNode.hasBackPressure()
1825 ) === -1
1826 )
1827 }
1828
1829 /**
1830 * Executes the given task on the worker given its worker node key.
1831 *
1832 * @param workerNodeKey - The worker node key.
1833 * @param task - The task to execute.
1834 */
1835 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1836 this.beforeTaskExecutionHook(workerNodeKey, task)
1837 this.sendToWorker(workerNodeKey, task, task.transferList)
1838 this.checkAndEmitTaskExecutionEvents()
1839 }
1840
1841 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1842 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1843 this.checkAndEmitTaskQueuingEvents()
1844 return tasksQueueSize
1845 }
1846
1847 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1848 return this.workerNodes[workerNodeKey].dequeueTask()
1849 }
1850
1851 private tasksQueueSize (workerNodeKey: number): number {
1852 return this.workerNodes[workerNodeKey].tasksQueueSize()
1853 }
1854
1855 protected flushTasksQueue (workerNodeKey: number): void {
1856 while (this.tasksQueueSize(workerNodeKey) > 0) {
1857 this.executeTask(
1858 workerNodeKey,
1859 this.dequeueTask(workerNodeKey) as Task<Data>
1860 )
1861 }
1862 this.workerNodes[workerNodeKey].clearTasksQueue()
1863 }
1864
1865 private flushTasksQueues (): void {
1866 for (const [workerNodeKey] of this.workerNodes.entries()) {
1867 this.flushTasksQueue(workerNodeKey)
1868 }
1869 }
1870 }