refactor: make worker message listener an arrow function
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43 } from './worker'
44 import {
45 type MeasurementStatisticsRequirements,
46 Measurements,
47 WorkerChoiceStrategies,
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
50 } from './selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
52 import { version } from './version'
53 import { WorkerNode } from './worker-node'
54 import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
59 } from './utils'
60
61 /**
62 * Base class that implements some shared logic for all poolifier pools.
63 *
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
67 */
68 export abstract class AbstractPool<
69 Worker extends IWorker,
70 Data = unknown,
71 Response = unknown
72 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Dynamic pool maximum size property placeholder.
 * When it is not set, the pool maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * The task execution response promise map:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 */
private starting: boolean
/**
 * Whether the pool is destroying or not.
 */
private destroying: boolean
/**
 * Whether the pool ready event has been emitted or not.
 */
private readyEventEmitted: boolean
/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 */
private readonly startTimestamp: number
130
/**
 * Constructs a new poolifier pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; the pool options check also applies the defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before the pool is possibly started.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
183
/**
 * Validates the number of workers given at pool instantiation.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
201
/**
 * Validates the pool options and stores them, with defaults applied, into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
230
/**
 * Validates the worker choice strategy options.
 * Nullish options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per worker node or `measurement` is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    // Nothing to validate.
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  if (workerChoiceStrategyOptions.retries != null) {
    if (!Number.isSafeInteger(workerChoiceStrategyOptions.retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (workerChoiceStrategyOptions.retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
      )
    }
  }
  if (workerChoiceStrategyOptions.weights != null) {
    const weightsCount = Object.keys(workerChoiceStrategyOptions.weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (workerChoiceStrategyOptions.measurement != null) {
    const knownMeasurement = Object.values(Measurements).includes(
      workerChoiceStrategyOptions.measurement
    )
    if (!knownMeasurement) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }
}
277
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
283
/** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of this.workerNodes) {
      total += pick(workerNode)
    }
    return total
  }
  // Concatenates a per worker node measurement history over all the worker nodes.
  const concatHistories = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] => {
    let histories: number[] = []
    for (const workerNode of this.workerNodes) {
      histories = histories.concat(pick(workerNode))
    }
    return histories
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related counters are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(
            average(
              concatHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              concatHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(
            average(
              concatHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              concatHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
439
/**
 * The pool readiness boolean status: `true` once at least `minSize`
 * non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
454
/**
 * The approximate pool utilization: aggregated tasks run time and wait time
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
475
/**
 * The pool type, to be provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
482
/**
 * The worker type, to be provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
487
/**
 * The pool minimum size, i.e. the number of workers given at instantiation.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
494
/**
 * The pool maximum size: the `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
501
/**
 * Checks that the worker id carried by a message received from a worker
 * is set and matches one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
517
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
529
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send each worker a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Remove the stealing listeners and flush the queues before switching the tasks queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Listeners are always removed first so that they are never registered twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
612
/**
 * Builds the tasks queue options by applying the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  // The default queue size is the square of the pool maximum size.
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
626
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
632
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
641
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
650
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
659
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
668
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
677
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
684
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it executes
 * fewer tasks than the tasks queue concurrency; otherwise while it executes none.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
708
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` on success and rejecting with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not a status response from the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${expectedWorkerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The response has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
749
/**
 * Sends a task function operation message to every worker and waits for all their status responses.
 *
 * The same listener is registered on every worker node; once as many status responses
 * as there are worker nodes have been received, the promise is settled and the listener
 * is removed from every worker node.
 *
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` if every worker succeeded, rejecting with an error if one failed.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        // Not a task function operation status response.
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        // Some workers have not answered yet.
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The listener was registered on every worker node, so remove it from every
      // worker node and not only from the one that sent the last response.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
804
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
817
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const nameIsBlank = name.trim().length === 0
  if (nameIsBlank) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const addOperation: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(addOperation)
  this.taskFunctions.set(name, fn)
  return opResult
}
840
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperation: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperation
  )
  // Only clean up the pool side state once the workers have handled the removal.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
856
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node that has a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
869
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
877
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
883
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue is empty and it executes fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
891
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool lifecycle guards.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments validation.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so that they can be retrieved by task id later on.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
949
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Only non dynamic worker nodes count towards the number of workers to create.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
974
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // Emit the destroy event before tearing the emitter down.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
999
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolving on a `'success'` kill answer and rejecting on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1023
/**
 * Terminates the worker node identified by the given worker node key.
 * To be provided by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1030
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1040
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
1045
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics
 * of the worker node usage and, when applicable, of the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage.
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Task function specific usage.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
1075
/**
 * Hook executed after the worker task execution.
 * Updates the task, run time and ELU statistics of the worker node usage and,
 * when applicable, of the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates to the given worker usage.
  const updateWorkerUsage = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Worker node wide usage.
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    updateWorkerUsage(this.workerNodes[workerNodeKey].usage)
  }
  // Task function specific usage.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      updateWorkerUsage(taskFunctionWorkerUsage)
    }
  }
}
1109
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The answer is positive only when the worker node information exists and
 * lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1124
/**
 * Updates the task counters of the given worker usage from a task execution response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is only decremented while strictly positive.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
    return
  }
  ++tasks.executed
}
1142
/**
 * Updates the run time statistics of the given worker usage.
 * Responses carrying a worker error are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is accounted as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1156
/**
 * Updates the wait time statistics of the given worker usage.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp counts for a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp
  )
}
1169
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Responses carrying a worker error are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are accounted as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // First sample is taken as is, later ones are averaged with the current value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1202
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy asks for dynamic worker usage. In every other
 * case the worker choice strategy picks the worker node.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1221
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1230
/**
 * Sends a message to worker given its worker node key.
 *
 * Abstract: the transport is provided by the concrete pool implementation.
 * Within this class it carries task messages as well as control messages
 * (e.g. `checkActive`, `statistics`).
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1243
/**
 * Creates a new worker.
 *
 * Abstract: implemented by the concrete pool. The returned worker is not yet
 * registered in the pool worker nodes; `createAndSetupWorkerNode()` does that.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1250
/**
 * Creates a new, completely set up worker node.
 *
 * Creates the worker, attaches the user supplied and internal event
 * listeners, adds the worker to the pool worker nodes and runs the
 * after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are attached first, falling back to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the node as not ready, report the error,
  // close its channel, optionally restart a worker and redistribute its queue.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(workerNodeKey)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[workerNodeKey].closeChannel()
    // Only restart a worker on a started pool that is neither starting nor being destroyed.
    if (
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      // The replacement worker node is of the same kind (dynamic or not) as the failed one.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Hand over the tasks still queued on the failed worker node.
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool the first time the worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
1295
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener handling kill
 * messages is registered, the worker is asked to check its activity, the
 * registered task functions are sent to it and the node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: a hard kill is always honored,
    // a soft kill only once the worker node has no more work.
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (!shallDestroy && isKillBehavior(KillBehaviors.SOFT, message.kill)) {
      const noTaskExecuting = workerUsage.tasks.executing === 0
      if (this.opts.enableTasksQueue === false) {
        shallDestroy = noTaskExecuting
      } else if (this.opts.enableTasksQueue === true) {
        shallDestroy =
          noTaskExecuting && this.tasksQueueSize(localWorkerNodeKey) === 0
      }
    }
    if (!shallDestroy) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, so no size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const flagAsReady =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (flagAsReady) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1351
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1364
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1377
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1390
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes to the
 * worker node events driving tasks stealing.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1422
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool. Called from the after worker
 * node setup hook, right after the message listener registration.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1429
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1446
/**
 * Redistributes the tasks queued on the given worker node to the other
 * worker nodes.
 *
 * Each queued task is handed to the ready worker node, other than the source
 * one, with the fewest queued tasks: it is executed there if possible,
 * enqueued otherwise. The redistribution stops as soon as no such worker node
 * exists, since moving a task back onto its source worker node would never
 * drain the queue.
 *
 * @param workerNodeKey - The key of the worker node whose queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Unknown worker node (e.g. key -1): nothing to redistribute.
  if (this.workerNodes[workerNodeKey] == null) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      // Never pick the source worker node nor a worker node that is not ready.
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      break
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1467
/**
 * Increments the stolen tasks counter of the worker node and, when task
 * function usage is tracked, of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const nodeUsage = workerNode?.usage
  if (nodeUsage != null) {
    ++nodeUsage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1486
/**
 * Increments the sequentially stolen tasks counter of the worker node, if it has a usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  ++nodeUsage.tasks.sequentiallyStolen
}
1495
/**
 * Increments the sequentially stolen tasks counter of the given task function
 * on the worker node, when task function usage is tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.sequentiallyStolen
  }
}
1511
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node, if it has a usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  nodeUsage.tasks.sequentiallyStolen = 0
}
1520
/**
 * Resets to zero the sequentially stolen tasks counter of the given task
 * function on the worker node, when task function usage is tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.sequentiallyStolen = 0
  }
}
1536
/**
 * Listener of the worker node 'idleWorkerNode' event: tries to steal a task
 * for the idle worker node, maintains the sequentially stolen counters and
 * schedules its own next invocation after a delay.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail `workerNodeKey` is not defined.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Stop condition: a task was stolen previously and the worker node now has
  // work (executing or queued). The sequentially stolen counters are reset
  // and no further invocation is scheduled.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // NOTE(review): the cast assumes a usage exists for the stolen task name
    // on this worker node — confirm, otherwise the access below throws.
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTask.name as string)
      ?.tasks as TaskStatistics
    // The per task function counter grows on its first steal or while the
    // same task function keeps being stolen; otherwise it is reset.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    }
  }
  // Schedule the next invocation; the delay is derived from the worker node
  // sequentially stolen counter. Errors raised by that invocation are
  // swallowed by the trailing catch.
  // NOTE(review): the next invocation is scheduled even when no task was
  // stolen — confirm this polling is intended.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1597
/**
 * Steals one task for the given worker node from the ready worker node,
 * other than itself, with the most queued tasks.
 *
 * The stolen task is executed on the stealing worker node if possible,
 * enqueued on it otherwise, and the stolen tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  if (destinationWorkerNode == null) {
    return undefined
  }
  // Work on a copy sorted by descending queue size. Positions in that copy
  // are not worker node keys, so the stealing worker node is excluded by
  // identity rather than by index.
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate.info.ready &&
        candidate !== destinationWorkerNode &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  if (task == null) {
    return undefined
  }
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  return task
}
1628
/**
 * Listener of the worker node 'backPressure' event: moves tasks queued on the
 * worker node under back pressure to the other ready worker nodes whose queue
 * is below the tasks queue size minus one, starting with the least loaded ones.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker is not (or no longer) part of the pool.
    return
  }
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // Positions in the sorted copy are not worker node keys: resolve the
      // real key of the destination worker node in the pool.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask()
      if (task == null) {
        break
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1666
/**
 * This method is the message listener registered on each worker.
 *
 * After checking the message worker id, the message is dispatched by kind:
 * worker ready response, task execution response or task function names update.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1688
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names in the worker information, then emits the pool ready event
 * the first time the pool is ready.
 *
 * @param message - The worker ready response.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports a failed initialization.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1704
/**
 * Handles a task execution response: settles the promise of the task,
 * updates the usage statistics and the worker choice strategy and, when the
 * tasks queue is enabled, runs the next queued task or signals that the
 * worker node is idle.
 *
 * Responses whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = promiseResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Run the next queued task while below the configured concurrency.
  const canRunQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  // Nothing executing, nothing queued, no stealing sequence in progress.
  const isIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
1744
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1750
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1756
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1764
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information; at runtime `undefined` when no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1774
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, defaulting
 * to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const configuredTasksQueueSize = this.opts.tasksQueueOptions?.size
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    configuredTasksQueueSize ?? Math.pow(this.maxSize, 2)
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1798
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1811
/**
 * Flags the worker node as not ready given its worker node key.
 *
 * Does nothing when no worker node exists at that key (e.g. a key of -1
 * for a worker that is not in the pool), instead of throwing a TypeError.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
1815
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1823
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1832
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list
 * to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1844
/**
 * Enqueues the given task on the worker node, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1850
/**
 * Dequeues a task from the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1854
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1858
/**
 * Flushes the tasks queue of the worker node: every queued task is dequeued
 * and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remaining = this.tasksQueueSize(workerNodeKey);
    remaining > 0;
    remaining = this.tasksQueueSize(workerNodeKey)
  ) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1868
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1874 }