feat: switch to `EventEmitterAsyncRessource` type from `@types/node`
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import type { TaskFunction } from '../worker/task-functions'
24 import {
25 type IPool,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50 import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
55 } from './utils'
56
57 /**
58 * Base class that implements some shared logic for all poolifier pools.
59 *
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
63 */
64 export abstract class AbstractPool<
65 Worker extends IWorker,
66 Data = unknown,
67 Response = unknown
68 > implements IPool<Worker, Data, Response> {
69 /** @inheritDoc */
70 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
71
72 /** @inheritDoc */
73 public emitter?: EventEmitterAsyncResource
74
75 /**
76 * The task execution response promise map:
77 * - `key`: The message id of each submitted task.
78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
79 *
80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
81 */
82 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
83 new Map<string, PromiseResponseWrapper<Response>>()
84
85 /**
86 * Worker choice strategy context referencing a worker choice algorithm implementation.
87 */
88 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
89 Worker,
90 Data,
91 Response
92 >
93
94 /**
95 * Dynamic pool maximum size property placeholder.
96 */
97 protected readonly max?: number
98
99 /**
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
103 */
104 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
105
106 /**
107 * Whether the pool is started or not.
108 */
109 private started: boolean
110 /**
111 * Whether the pool is starting or not.
112 */
113 private starting: boolean
114 /**
115 * The start timestamp of the pool.
116 */
117 private readonly startTimestamp
118
119 /**
120 * Constructs a new poolifier pool.
121 *
122 * @param numberOfWorkers - Number of workers that this pool should manage.
123 * @param filePath - Path to the worker file.
124 * @param opts - Options for the pool.
125 */
126 public constructor (
127 protected readonly numberOfWorkers: number,
128 protected readonly filePath: string,
129 protected readonly opts: PoolOptions<Worker>
130 ) {
131 if (!this.isMain()) {
132 throw new Error(
133 'Cannot start a pool from a worker with the same type as the pool'
134 )
135 }
136 checkFilePath(this.filePath)
137 this.checkNumberOfWorkers(this.numberOfWorkers)
138 this.checkPoolOptions(this.opts)
139
140 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
141 this.executeTask = this.executeTask.bind(this)
142 this.enqueueTask = this.enqueueTask.bind(this)
143
144 if (this.opts.enableEvents === true) {
145 this.initializeEventEmitter()
146 }
147 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
148 Worker,
149 Data,
150 Response
151 >(
152 this,
153 this.opts.workerChoiceStrategy,
154 this.opts.workerChoiceStrategyOptions
155 )
156
157 this.setupHook()
158
159 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
160
161 this.started = false
162 this.starting = false
163 if (this.opts.startWorkers === true) {
164 this.start()
165 }
166
167 this.startTimestamp = performance.now()
168 }
169
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
176 throw new TypeError(
177 'Cannot instantiate a pool with a non safe integer number of workers'
178 )
179 } else if (numberOfWorkers < 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
182 )
183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 private checkPoolOptions (opts: PoolOptions<Worker>): void {
189 if (isPlainObject(opts)) {
190 this.opts.startWorkers = opts.startWorkers ?? true
191 checkValidWorkerChoiceStrategy(
192 opts.workerChoiceStrategy as WorkerChoiceStrategy
193 )
194 this.opts.workerChoiceStrategy =
195 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategyOptions(
197 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
198 )
199 this.opts.workerChoiceStrategyOptions = {
200 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
201 ...opts.workerChoiceStrategyOptions
202 }
203 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
204 this.opts.enableEvents = opts.enableEvents ?? true
205 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
206 if (this.opts.enableTasksQueue) {
207 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
208 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 }
212 } else {
213 throw new TypeError('Invalid pool options: must be a plain object')
214 }
215 }
216
217 private checkValidWorkerChoiceStrategyOptions (
218 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
219 ): void {
220 if (
221 workerChoiceStrategyOptions != null &&
222 !isPlainObject(workerChoiceStrategyOptions)
223 ) {
224 throw new TypeError(
225 'Invalid worker choice strategy options: must be a plain object'
226 )
227 }
228 if (
229 workerChoiceStrategyOptions?.retries != null &&
230 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
231 ) {
232 throw new TypeError(
233 'Invalid worker choice strategy options: retries must be an integer'
234 )
235 }
236 if (
237 workerChoiceStrategyOptions?.retries != null &&
238 workerChoiceStrategyOptions.retries < 0
239 ) {
240 throw new RangeError(
241 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
242 )
243 }
244 if (
245 workerChoiceStrategyOptions?.weights != null &&
246 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
247 ) {
248 throw new Error(
249 'Invalid worker choice strategy options: must have a weight for each worker node'
250 )
251 }
252 if (
253 workerChoiceStrategyOptions?.measurement != null &&
254 !Object.values(Measurements).includes(
255 workerChoiceStrategyOptions.measurement
256 )
257 ) {
258 throw new Error(
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
260 )
261 }
262 }
263
264 private initializeEventEmitter (): void {
265 this.emitter = new EventEmitterAsyncResource({
266 name: `poolifier:${this.type}-${this.worker}-pool`
267 })
268 }
269
270 /** @inheritDoc */
271 public get info (): PoolInfo {
272 return {
273 version,
274 type: this.type,
275 worker: this.worker,
276 started: this.started,
277 ready: this.ready,
278 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
279 minSize: this.minSize,
280 maxSize: this.maxSize,
281 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
282 .runTime.aggregate &&
283 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284 .waitTime.aggregate && { utilization: round(this.utilization) }),
285 workerNodes: this.workerNodes.length,
286 idleWorkerNodes: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
288 workerNode.usage.tasks.executing === 0
289 ? accumulator + 1
290 : accumulator,
291 0
292 ),
293 busyWorkerNodes: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
295 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
296 0
297 ),
298 executedTasks: this.workerNodes.reduce(
299 (accumulator, workerNode) =>
300 accumulator + workerNode.usage.tasks.executed,
301 0
302 ),
303 executingTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 accumulator + workerNode.usage.tasks.executing,
306 0
307 ),
308 ...(this.opts.enableTasksQueue === true && {
309 queuedTasks: this.workerNodes.reduce(
310 (accumulator, workerNode) =>
311 accumulator + workerNode.usage.tasks.queued,
312 0
313 )
314 }),
315 ...(this.opts.enableTasksQueue === true && {
316 maxQueuedTasks: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
319 0
320 )
321 }),
322 ...(this.opts.enableTasksQueue === true && {
323 backPressure: this.hasBackPressure()
324 }),
325 ...(this.opts.enableTasksQueue === true && {
326 stolenTasks: this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.tasks.stolen,
329 0
330 )
331 }),
332 failedTasks: this.workerNodes.reduce(
333 (accumulator, workerNode) =>
334 accumulator + workerNode.usage.tasks.failed,
335 0
336 ),
337 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
338 .runTime.aggregate && {
339 runTime: {
340 minimum: round(
341 min(
342 ...this.workerNodes.map(
343 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
344 )
345 )
346 ),
347 maximum: round(
348 max(
349 ...this.workerNodes.map(
350 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
351 )
352 )
353 ),
354 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .runTime.average && {
356 average: round(
357 average(
358 this.workerNodes.reduce<number[]>(
359 (accumulator, workerNode) =>
360 accumulator.concat(workerNode.usage.runTime.history),
361 []
362 )
363 )
364 )
365 }),
366 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
367 .runTime.median && {
368 median: round(
369 median(
370 this.workerNodes.reduce<number[]>(
371 (accumulator, workerNode) =>
372 accumulator.concat(workerNode.usage.runTime.history),
373 []
374 )
375 )
376 )
377 })
378 }
379 }),
380 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
381 .waitTime.aggregate && {
382 waitTime: {
383 minimum: round(
384 min(
385 ...this.workerNodes.map(
386 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
387 )
388 )
389 ),
390 maximum: round(
391 max(
392 ...this.workerNodes.map(
393 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
394 )
395 )
396 ),
397 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
398 .waitTime.average && {
399 average: round(
400 average(
401 this.workerNodes.reduce<number[]>(
402 (accumulator, workerNode) =>
403 accumulator.concat(workerNode.usage.waitTime.history),
404 []
405 )
406 )
407 )
408 }),
409 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
410 .waitTime.median && {
411 median: round(
412 median(
413 this.workerNodes.reduce<number[]>(
414 (accumulator, workerNode) =>
415 accumulator.concat(workerNode.usage.waitTime.history),
416 []
417 )
418 )
419 )
420 })
421 }
422 })
423 }
424 }
425
426 /**
427 * The pool readiness boolean status.
428 */
429 private get ready (): boolean {
430 return (
431 this.workerNodes.reduce(
432 (accumulator, workerNode) =>
433 !workerNode.info.dynamic && workerNode.info.ready
434 ? accumulator + 1
435 : accumulator,
436 0
437 ) >= this.minSize
438 )
439 }
440
441 /**
442 * The approximate pool utilization.
443 *
444 * @returns The pool utilization.
445 */
446 private get utilization (): number {
447 const poolTimeCapacity =
448 (performance.now() - this.startTimestamp) * this.maxSize
449 const totalTasksRunTime = this.workerNodes.reduce(
450 (accumulator, workerNode) =>
451 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
452 0
453 )
454 const totalTasksWaitTime = this.workerNodes.reduce(
455 (accumulator, workerNode) =>
456 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
457 0
458 )
459 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
460 }
461
462 /**
463 * The pool type.
464 *
465 * If it is `'dynamic'`, it provides the `max` property.
466 */
467 protected abstract get type (): PoolType
468
469 /**
470 * The worker type.
471 */
472 protected abstract get worker (): WorkerType
473
474 /**
475 * The pool minimum size.
476 */
477 protected get minSize (): number {
478 return this.numberOfWorkers
479 }
480
481 /**
482 * The pool maximum size.
483 */
484 protected get maxSize (): number {
485 return this.max ?? this.numberOfWorkers
486 }
487
488 /**
489 * Checks if the worker id sent in the received message from a worker is valid.
490 *
491 * @param message - The received message.
492 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
493 */
494 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
495 if (message.workerId == null) {
496 throw new Error('Worker message received without worker id')
497 } else if (
498 message.workerId != null &&
499 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
500 ) {
501 throw new Error(
502 `Worker message received from unknown worker '${message.workerId}'`
503 )
504 }
505 }
506
507 /**
508 * Gets the given worker its worker node key.
509 *
510 * @param worker - The worker.
511 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
512 */
513 private getWorkerNodeKeyByWorker (worker: Worker): number {
514 return this.workerNodes.findIndex(
515 workerNode => workerNode.worker === worker
516 )
517 }
518
519 /**
520 * Gets the worker node key given its worker id.
521 *
522 * @param workerId - The worker id.
523 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
524 */
525 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
526 return this.workerNodes.findIndex(
527 workerNode => workerNode.info.id === workerId
528 )
529 }
530
531 /** @inheritDoc */
532 public setWorkerChoiceStrategy (
533 workerChoiceStrategy: WorkerChoiceStrategy,
534 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
535 ): void {
536 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
537 this.opts.workerChoiceStrategy = workerChoiceStrategy
538 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
539 this.opts.workerChoiceStrategy
540 )
541 if (workerChoiceStrategyOptions != null) {
542 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
543 }
544 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
545 workerNode.resetUsage()
546 this.sendStatisticsMessageToWorker(workerNodeKey)
547 }
548 }
549
550 /** @inheritDoc */
551 public setWorkerChoiceStrategyOptions (
552 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
553 ): void {
554 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
555 this.opts.workerChoiceStrategyOptions = {
556 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
557 ...workerChoiceStrategyOptions
558 }
559 this.workerChoiceStrategyContext.setOptions(
560 this.opts.workerChoiceStrategyOptions
561 )
562 }
563
564 /** @inheritDoc */
565 public enableTasksQueue (
566 enable: boolean,
567 tasksQueueOptions?: TasksQueueOptions
568 ): void {
569 if (this.opts.enableTasksQueue === true && !enable) {
570 this.unsetTaskStealing()
571 this.unsetTasksStealingOnBackPressure()
572 this.flushTasksQueues()
573 }
574 this.opts.enableTasksQueue = enable
575 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
576 }
577
578 /** @inheritDoc */
579 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
580 if (this.opts.enableTasksQueue === true) {
581 checkValidTasksQueueOptions(tasksQueueOptions)
582 this.opts.tasksQueueOptions =
583 this.buildTasksQueueOptions(tasksQueueOptions)
584 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
585 if (this.opts.tasksQueueOptions.taskStealing === true) {
586 this.setTaskStealing()
587 } else {
588 this.unsetTaskStealing()
589 }
590 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
591 this.setTasksStealingOnBackPressure()
592 } else {
593 this.unsetTasksStealingOnBackPressure()
594 }
595 } else if (this.opts.tasksQueueOptions != null) {
596 delete this.opts.tasksQueueOptions
597 }
598 }
599
600 private buildTasksQueueOptions (
601 tasksQueueOptions: TasksQueueOptions
602 ): TasksQueueOptions {
603 return {
604 ...{
605 size: Math.pow(this.maxSize, 2),
606 concurrency: 1,
607 taskStealing: true,
608 tasksStealingOnBackPressure: true
609 },
610 ...tasksQueueOptions
611 }
612 }
613
614 private setTasksQueueSize (size: number): void {
615 for (const workerNode of this.workerNodes) {
616 workerNode.tasksQueueBackPressureSize = size
617 }
618 }
619
620 private setTaskStealing (): void {
621 for (const [workerNodeKey] of this.workerNodes.entries()) {
622 this.workerNodes[workerNodeKey].onEmptyQueue =
623 this.taskStealingOnEmptyQueue.bind(this)
624 }
625 }
626
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey] of this.workerNodes.entries()) {
629 delete this.workerNodes[workerNodeKey].onEmptyQueue
630 }
631 }
632
633 private setTasksStealingOnBackPressure (): void {
634 for (const [workerNodeKey] of this.workerNodes.entries()) {
635 this.workerNodes[workerNodeKey].onBackPressure =
636 this.tasksStealingOnBackPressure.bind(this)
637 }
638 }
639
640 private unsetTasksStealingOnBackPressure (): void {
641 for (const [workerNodeKey] of this.workerNodes.entries()) {
642 delete this.workerNodes[workerNodeKey].onBackPressure
643 }
644 }
645
646 /**
647 * Whether the pool is full or not.
648 *
649 * The pool filling boolean status.
650 */
651 protected get full (): boolean {
652 return this.workerNodes.length >= this.maxSize
653 }
654
655 /**
656 * Whether the pool is busy or not.
657 *
658 * The pool busyness boolean status.
659 */
660 protected abstract get busy (): boolean
661
662 /**
663 * Whether worker nodes are executing concurrently their tasks quota or not.
664 *
665 * @returns Worker nodes busyness boolean status.
666 */
667 protected internalBusy (): boolean {
668 if (this.opts.enableTasksQueue === true) {
669 return (
670 this.workerNodes.findIndex(
671 workerNode =>
672 workerNode.info.ready &&
673 workerNode.usage.tasks.executing <
674 (this.opts.tasksQueueOptions?.concurrency as number)
675 ) === -1
676 )
677 }
678 return (
679 this.workerNodes.findIndex(
680 workerNode =>
681 workerNode.info.ready && workerNode.usage.tasks.executing === 0
682 ) === -1
683 )
684 }
685
686 private async sendTaskFunctionOperationToWorker (
687 workerNodeKey: number,
688 message: MessageValue<Data>
689 ): Promise<boolean> {
690 return await new Promise<boolean>((resolve, reject) => {
691 const taskFunctionOperationListener = (
692 message: MessageValue<Response>
693 ): void => {
694 this.checkMessageWorkerId(message)
695 const workerId = this.getWorkerInfo(workerNodeKey).id as number
696 if (
697 message.taskFunctionOperationStatus != null &&
698 message.workerId === workerId
699 ) {
700 if (message.taskFunctionOperationStatus) {
701 resolve(true)
702 } else if (!message.taskFunctionOperationStatus) {
703 reject(
704 new Error(
705 `Task function operation '${
706 message.taskFunctionOperation as string
707 }' failed on worker ${message.workerId} with error: '${
708 message.workerError?.message as string
709 }'`
710 )
711 )
712 }
713 this.deregisterWorkerMessageListener(
714 this.getWorkerNodeKeyByWorkerId(message.workerId),
715 taskFunctionOperationListener
716 )
717 }
718 }
719 this.registerWorkerMessageListener(
720 workerNodeKey,
721 taskFunctionOperationListener
722 )
723 this.sendToWorker(workerNodeKey, message)
724 })
725 }
726
727 private async sendTaskFunctionOperationToWorkers (
728 message: MessageValue<Data>
729 ): Promise<boolean> {
730 return await new Promise<boolean>((resolve, reject) => {
731 const responsesReceived = new Array<MessageValue<Response>>()
732 const taskFunctionOperationsListener = (
733 message: MessageValue<Response>
734 ): void => {
735 this.checkMessageWorkerId(message)
736 if (message.taskFunctionOperationStatus != null) {
737 responsesReceived.push(message)
738 if (responsesReceived.length === this.workerNodes.length) {
739 if (
740 responsesReceived.every(
741 message => message.taskFunctionOperationStatus === true
742 )
743 ) {
744 resolve(true)
745 } else if (
746 responsesReceived.some(
747 message => message.taskFunctionOperationStatus === false
748 )
749 ) {
750 const errorResponse = responsesReceived.find(
751 response => response.taskFunctionOperationStatus === false
752 )
753 reject(
754 new Error(
755 `Task function operation '${
756 message.taskFunctionOperation as string
757 }' failed on worker ${
758 errorResponse?.workerId as number
759 } with error: '${
760 errorResponse?.workerError?.message as string
761 }'`
762 )
763 )
764 }
765 this.deregisterWorkerMessageListener(
766 this.getWorkerNodeKeyByWorkerId(message.workerId),
767 taskFunctionOperationsListener
768 )
769 }
770 }
771 }
772 for (const [workerNodeKey] of this.workerNodes.entries()) {
773 this.registerWorkerMessageListener(
774 workerNodeKey,
775 taskFunctionOperationsListener
776 )
777 this.sendToWorker(workerNodeKey, message)
778 }
779 })
780 }
781
782 /** @inheritDoc */
783 public hasTaskFunction (name: string): boolean {
784 for (const workerNode of this.workerNodes) {
785 if (
786 Array.isArray(workerNode.info.taskFunctionNames) &&
787 workerNode.info.taskFunctionNames.includes(name)
788 ) {
789 return true
790 }
791 }
792 return false
793 }
794
795 /** @inheritDoc */
796 public async addTaskFunction (
797 name: string,
798 fn: TaskFunction<Data, Response>
799 ): Promise<boolean> {
800 if (typeof name !== 'string') {
801 throw new TypeError('name argument must be a string')
802 }
803 if (typeof name === 'string' && name.trim().length === 0) {
804 throw new TypeError('name argument must not be an empty string')
805 }
806 if (typeof fn !== 'function') {
807 throw new TypeError('fn argument must be a function')
808 }
809 const opResult = await this.sendTaskFunctionOperationToWorkers({
810 taskFunctionOperation: 'add',
811 taskFunctionName: name,
812 taskFunction: fn.toString()
813 })
814 this.taskFunctions.set(name, fn)
815 return opResult
816 }
817
818 /** @inheritDoc */
819 public async removeTaskFunction (name: string): Promise<boolean> {
820 if (!this.taskFunctions.has(name)) {
821 throw new Error(
822 'Cannot remove a task function not handled on the pool side'
823 )
824 }
825 const opResult = await this.sendTaskFunctionOperationToWorkers({
826 taskFunctionOperation: 'remove',
827 taskFunctionName: name
828 })
829 this.deleteTaskFunctionWorkerUsages(name)
830 this.taskFunctions.delete(name)
831 return opResult
832 }
833
834 /** @inheritDoc */
835 public listTaskFunctionNames (): string[] {
836 for (const workerNode of this.workerNodes) {
837 if (
838 Array.isArray(workerNode.info.taskFunctionNames) &&
839 workerNode.info.taskFunctionNames.length > 0
840 ) {
841 return workerNode.info.taskFunctionNames
842 }
843 }
844 return []
845 }
846
847 /** @inheritDoc */
848 public async setDefaultTaskFunction (name: string): Promise<boolean> {
849 return await this.sendTaskFunctionOperationToWorkers({
850 taskFunctionOperation: 'default',
851 taskFunctionName: name
852 })
853 }
854
855 private deleteTaskFunctionWorkerUsages (name: string): void {
856 for (const workerNode of this.workerNodes) {
857 workerNode.deleteTaskFunctionWorkerUsage(name)
858 }
859 }
860
861 private shallExecuteTask (workerNodeKey: number): boolean {
862 return (
863 this.tasksQueueSize(workerNodeKey) === 0 &&
864 this.workerNodes[workerNodeKey].usage.tasks.executing <
865 (this.opts.tasksQueueOptions?.concurrency as number)
866 )
867 }
868
869 /** @inheritDoc */
870 public async execute (
871 data?: Data,
872 name?: string,
873 transferList?: TransferListItem[]
874 ): Promise<Response> {
875 return await new Promise<Response>((resolve, reject) => {
876 if (!this.started) {
877 reject(new Error('Cannot execute a task on not started pool'))
878 return
879 }
880 if (name != null && typeof name !== 'string') {
881 reject(new TypeError('name argument must be a string'))
882 return
883 }
884 if (
885 name != null &&
886 typeof name === 'string' &&
887 name.trim().length === 0
888 ) {
889 reject(new TypeError('name argument must not be an empty string'))
890 return
891 }
892 if (transferList != null && !Array.isArray(transferList)) {
893 reject(new TypeError('transferList argument must be an array'))
894 return
895 }
896 const timestamp = performance.now()
897 const workerNodeKey = this.chooseWorkerNode()
898 const task: Task<Data> = {
899 name: name ?? DEFAULT_TASK_NAME,
900 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
901 data: data ?? ({} as Data),
902 transferList,
903 timestamp,
904 taskId: randomUUID()
905 }
906 this.promiseResponseMap.set(task.taskId as string, {
907 resolve,
908 reject,
909 workerNodeKey
910 })
911 if (
912 this.opts.enableTasksQueue === false ||
913 (this.opts.enableTasksQueue === true &&
914 this.shallExecuteTask(workerNodeKey))
915 ) {
916 this.executeTask(workerNodeKey, task)
917 } else {
918 this.enqueueTask(workerNodeKey, task)
919 }
920 })
921 }
922
923 /** @inheritdoc */
924 public start (): void {
925 this.starting = true
926 while (
927 this.workerNodes.reduce(
928 (accumulator, workerNode) =>
929 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
930 0
931 ) < this.numberOfWorkers
932 ) {
933 this.createAndSetupWorkerNode()
934 }
935 this.starting = false
936 this.started = true
937 }
938
939 /** @inheritDoc */
940 public async destroy (): Promise<void> {
941 await Promise.all(
942 this.workerNodes.map(async (_, workerNodeKey) => {
943 await this.destroyWorkerNode(workerNodeKey)
944 })
945 )
946 this.emitter?.emit(PoolEvents.destroy, this.info)
947 this.emitter?.emitDestroy()
948 this.started = false
949 }
950
951 protected async sendKillMessageToWorker (
952 workerNodeKey: number
953 ): Promise<void> {
954 await new Promise<void>((resolve, reject) => {
955 const killMessageListener = (message: MessageValue<Response>): void => {
956 this.checkMessageWorkerId(message)
957 if (message.kill === 'success') {
958 resolve()
959 } else if (message.kill === 'failure') {
960 reject(
961 new Error(
962 `Kill message handling failed on worker ${
963 message.workerId as number
964 }`
965 )
966 )
967 }
968 }
969 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
970 this.sendToWorker(workerNodeKey, { kill: true })
971 })
972 }
973
974 /**
975 * Terminates the worker node given its worker node key.
976 *
977 * @param workerNodeKey - The worker node key.
978 */
979 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
980
981 /**
982 * Setup hook to execute code before worker nodes are created in the abstract constructor.
983 * Can be overridden.
984 *
985 * @virtual
986 */
987 protected setupHook (): void {
988 /* Intentionally empty */
989 }
990
991 /**
992 * Should return whether the worker is the main worker or not.
993 */
994 protected abstract isMain (): boolean
995
996 /**
997 * Hook executed before the worker task execution.
998 * Can be overridden.
999 *
1000 * @param workerNodeKey - The worker node key.
1001 * @param task - The task to execute.
1002 */
1003 protected beforeTaskExecutionHook (
1004 workerNodeKey: number,
1005 task: Task<Data>
1006 ): void {
1007 if (this.workerNodes[workerNodeKey]?.usage != null) {
1008 const workerUsage = this.workerNodes[workerNodeKey].usage
1009 ++workerUsage.tasks.executing
1010 this.updateWaitTimeWorkerUsage(workerUsage, task)
1011 }
1012 if (
1013 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1014 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1015 task.name as string
1016 ) != null
1017 ) {
1018 const taskFunctionWorkerUsage = this.workerNodes[
1019 workerNodeKey
1020 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1021 ++taskFunctionWorkerUsage.tasks.executing
1022 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1023 }
1024 }
1025
1026 /**
1027 * Hook executed after the worker task execution.
1028 * Can be overridden.
1029 *
1030 * @param workerNodeKey - The worker node key.
1031 * @param message - The received message.
1032 */
1033 protected afterTaskExecutionHook (
1034 workerNodeKey: number,
1035 message: MessageValue<Response>
1036 ): void {
1037 if (this.workerNodes[workerNodeKey]?.usage != null) {
1038 const workerUsage = this.workerNodes[workerNodeKey].usage
1039 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1040 this.updateRunTimeWorkerUsage(workerUsage, message)
1041 this.updateEluWorkerUsage(workerUsage, message)
1042 }
1043 if (
1044 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1045 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1046 message.taskPerformance?.name as string
1047 ) != null
1048 ) {
1049 const taskFunctionWorkerUsage = this.workerNodes[
1050 workerNodeKey
1051 ].getTaskFunctionWorkerUsage(
1052 message.taskPerformance?.name as string
1053 ) as WorkerUsage
1054 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1055 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1056 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1057 }
1058 }
1059
1060 /**
1061 * Whether the worker node shall update its task function worker usage or not.
1062 *
1063 * @param workerNodeKey - The worker node key.
1064 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1065 */
1066 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1067 const workerInfo = this.getWorkerInfo(workerNodeKey)
1068 return (
1069 workerInfo != null &&
1070 Array.isArray(workerInfo.taskFunctionNames) &&
1071 workerInfo.taskFunctionNames.length > 2
1072 )
1073 }
1074
1075 private updateTaskStatisticsWorkerUsage (
1076 workerUsage: WorkerUsage,
1077 message: MessageValue<Response>
1078 ): void {
1079 const workerTaskStatistics = workerUsage.tasks
1080 if (
1081 workerTaskStatistics.executing != null &&
1082 workerTaskStatistics.executing > 0
1083 ) {
1084 --workerTaskStatistics.executing
1085 }
1086 if (message.workerError == null) {
1087 ++workerTaskStatistics.executed
1088 } else {
1089 ++workerTaskStatistics.failed
1090 }
1091 }
1092
1093 private updateRunTimeWorkerUsage (
1094 workerUsage: WorkerUsage,
1095 message: MessageValue<Response>
1096 ): void {
1097 if (message.workerError != null) {
1098 return
1099 }
1100 updateMeasurementStatistics(
1101 workerUsage.runTime,
1102 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1103 message.taskPerformance?.runTime ?? 0
1104 )
1105 }
1106
1107 private updateWaitTimeWorkerUsage (
1108 workerUsage: WorkerUsage,
1109 task: Task<Data>
1110 ): void {
1111 const timestamp = performance.now()
1112 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1113 updateMeasurementStatistics(
1114 workerUsage.waitTime,
1115 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1116 taskWaitTime
1117 )
1118 }
1119
1120 private updateEluWorkerUsage (
1121 workerUsage: WorkerUsage,
1122 message: MessageValue<Response>
1123 ): void {
1124 if (message.workerError != null) {
1125 return
1126 }
1127 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1128 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1129 updateMeasurementStatistics(
1130 workerUsage.elu.active,
1131 eluTaskStatisticsRequirements,
1132 message.taskPerformance?.elu?.active ?? 0
1133 )
1134 updateMeasurementStatistics(
1135 workerUsage.elu.idle,
1136 eluTaskStatisticsRequirements,
1137 message.taskPerformance?.elu?.idle ?? 0
1138 )
1139 if (eluTaskStatisticsRequirements.aggregate) {
1140 if (message.taskPerformance?.elu != null) {
1141 if (workerUsage.elu.utilization != null) {
1142 workerUsage.elu.utilization =
1143 (workerUsage.elu.utilization +
1144 message.taskPerformance.elu.utilization) /
1145 2
1146 } else {
1147 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1148 }
1149 }
1150 }
1151 }
1152
1153 /**
1154 * Chooses a worker node for the next task.
1155 *
1156 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1157 *
1158 * @returns The chosen worker node key
1159 */
1160 private chooseWorkerNode (): number {
1161 if (this.shallCreateDynamicWorker()) {
1162 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1163 if (
1164 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1165 ) {
1166 return workerNodeKey
1167 }
1168 }
1169 return this.workerChoiceStrategyContext.execute()
1170 }
1171
1172 /**
1173 * Conditions for dynamic worker creation.
1174 *
1175 * @returns Whether to create a dynamic worker or not.
1176 */
1177 private shallCreateDynamicWorker (): boolean {
1178 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1179 }
1180
1181 /**
1182 * Sends a message to worker given its worker node key.
1183 *
1184 * @param workerNodeKey - The worker node key.
1185 * @param message - The message.
1186 * @param transferList - The optional array of transferable objects.
1187 */
1188 protected abstract sendToWorker (
1189 workerNodeKey: number,
1190 message: MessageValue<Data>,
1191 transferList?: TransferListItem[]
1192 ): void
1193
1194 /**
1195 * Creates a new worker.
1196 *
1197 * @returns Newly created worker.
1198 */
1199 protected abstract createWorker (): Worker
1200
1201 /**
1202 * Creates a new, completely set up worker node.
1203 *
1204 * @returns New, completely set up worker node key.
1205 */
1206 protected createAndSetupWorkerNode (): number {
1207 const worker = this.createWorker()
1208
1209 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1210 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1211 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1212 worker.on('error', error => {
1213 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1214 const workerInfo = this.getWorkerInfo(workerNodeKey)
1215 workerInfo.ready = false
1216 this.workerNodes[workerNodeKey].closeChannel()
1217 this.emitter?.emit(PoolEvents.error, error)
1218 if (
1219 this.started &&
1220 !this.starting &&
1221 this.opts.restartWorkerOnError === true
1222 ) {
1223 if (workerInfo.dynamic) {
1224 this.createAndSetupDynamicWorkerNode()
1225 } else {
1226 this.createAndSetupWorkerNode()
1227 }
1228 }
1229 if (this.started && this.opts.enableTasksQueue === true) {
1230 this.redistributeQueuedTasks(workerNodeKey)
1231 }
1232 })
1233 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1234 worker.once('exit', () => {
1235 this.removeWorkerNode(worker)
1236 })
1237
1238 const workerNodeKey = this.addWorkerNode(worker)
1239
1240 this.afterWorkerNodeSetup(workerNodeKey)
1241
1242 return workerNodeKey
1243 }
1244
1245 /**
1246 * Creates a new, completely set up dynamic worker node.
1247 *
1248 * @returns New, completely set up dynamic worker node key.
1249 */
1250 protected createAndSetupDynamicWorkerNode (): number {
1251 const workerNodeKey = this.createAndSetupWorkerNode()
1252 this.registerWorkerMessageListener(workerNodeKey, message => {
1253 this.checkMessageWorkerId(message)
1254 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1255 message.workerId
1256 )
1257 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1258 // Kill message received from worker
1259 if (
1260 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1261 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1262 ((this.opts.enableTasksQueue === false &&
1263 workerUsage.tasks.executing === 0) ||
1264 (this.opts.enableTasksQueue === true &&
1265 workerUsage.tasks.executing === 0 &&
1266 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1267 ) {
1268 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1269 this.emitter?.emit(PoolEvents.error, error)
1270 })
1271 }
1272 })
1273 const workerInfo = this.getWorkerInfo(workerNodeKey)
1274 this.sendToWorker(workerNodeKey, {
1275 checkActive: true
1276 })
1277 if (this.taskFunctions.size > 0) {
1278 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1279 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1280 taskFunctionOperation: 'add',
1281 taskFunctionName,
1282 taskFunction: taskFunction.toString()
1283 }).catch(error => {
1284 this.emitter?.emit(PoolEvents.error, error)
1285 })
1286 }
1287 }
1288 workerInfo.dynamic = true
1289 if (
1290 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1291 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1292 ) {
1293 workerInfo.ready = true
1294 }
1295 this.checkAndEmitDynamicWorkerCreationEvents()
1296 return workerNodeKey
1297 }
1298
1299 /**
1300 * Registers a listener callback on the worker given its worker node key.
1301 *
1302 * @param workerNodeKey - The worker node key.
1303 * @param listener - The message listener callback.
1304 */
1305 protected abstract registerWorkerMessageListener<
1306 Message extends Data | Response
1307 >(
1308 workerNodeKey: number,
1309 listener: (message: MessageValue<Message>) => void
1310 ): void
1311
1312 /**
1313 * Registers once a listener callback on the worker given its worker node key.
1314 *
1315 * @param workerNodeKey - The worker node key.
1316 * @param listener - The message listener callback.
1317 */
1318 protected abstract registerOnceWorkerMessageListener<
1319 Message extends Data | Response
1320 >(
1321 workerNodeKey: number,
1322 listener: (message: MessageValue<Message>) => void
1323 ): void
1324
1325 /**
1326 * Deregisters a listener callback on the worker given its worker node key.
1327 *
1328 * @param workerNodeKey - The worker node key.
1329 * @param listener - The message listener callback.
1330 */
1331 protected abstract deregisterWorkerMessageListener<
1332 Message extends Data | Response
1333 >(
1334 workerNodeKey: number,
1335 listener: (message: MessageValue<Message>) => void
1336 ): void
1337
1338 /**
1339 * Method hooked up after a worker node has been newly created.
1340 * Can be overridden.
1341 *
1342 * @param workerNodeKey - The newly created worker node key.
1343 */
1344 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1345 // Listen to worker messages.
1346 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1347 // Send the startup message to worker.
1348 this.sendStartupMessageToWorker(workerNodeKey)
1349 // Send the statistics message to worker.
1350 this.sendStatisticsMessageToWorker(workerNodeKey)
1351 if (this.opts.enableTasksQueue === true) {
1352 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1353 this.workerNodes[workerNodeKey].onEmptyQueue =
1354 this.taskStealingOnEmptyQueue.bind(this)
1355 }
1356 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1357 this.workerNodes[workerNodeKey].onBackPressure =
1358 this.tasksStealingOnBackPressure.bind(this)
1359 }
1360 }
1361 }
1362
1363 /**
1364 * Sends the startup message to worker given its worker node key.
1365 *
1366 * @param workerNodeKey - The worker node key.
1367 */
1368 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1369
1370 /**
1371 * Sends the statistics message to worker given its worker node key.
1372 *
1373 * @param workerNodeKey - The worker node key.
1374 */
1375 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1376 this.sendToWorker(workerNodeKey, {
1377 statistics: {
1378 runTime:
1379 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1380 .runTime.aggregate,
1381 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1382 .elu.aggregate
1383 }
1384 })
1385 }
1386
1387 private redistributeQueuedTasks (workerNodeKey: number): void {
1388 while (this.tasksQueueSize(workerNodeKey) > 0) {
1389 const destinationWorkerNodeKey = this.workerNodes.reduce(
1390 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1391 return workerNode.info.ready &&
1392 workerNode.usage.tasks.queued <
1393 workerNodes[minWorkerNodeKey].usage.tasks.queued
1394 ? workerNodeKey
1395 : minWorkerNodeKey
1396 },
1397 0
1398 )
1399 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1400 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1401 this.executeTask(destinationWorkerNodeKey, task)
1402 } else {
1403 this.enqueueTask(destinationWorkerNodeKey, task)
1404 }
1405 }
1406 }
1407
1408 private updateTaskStolenStatisticsWorkerUsage (
1409 workerNodeKey: number,
1410 taskName: string
1411 ): void {
1412 const workerNode = this.workerNodes[workerNodeKey]
1413 if (workerNode?.usage != null) {
1414 ++workerNode.usage.tasks.stolen
1415 }
1416 if (
1417 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1418 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1419 ) {
1420 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1421 taskName
1422 ) as WorkerUsage
1423 ++taskFunctionWorkerUsage.tasks.stolen
1424 }
1425 }
1426
1427 private taskStealingOnEmptyQueue (workerId: number): void {
1428 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1429 const workerNodes = this.workerNodes
1430 .slice()
1431 .sort(
1432 (workerNodeA, workerNodeB) =>
1433 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1434 )
1435 const sourceWorkerNode = workerNodes.find(
1436 workerNode =>
1437 workerNode.info.ready &&
1438 workerNode.info.id !== workerId &&
1439 workerNode.usage.tasks.queued > 0
1440 )
1441 if (sourceWorkerNode != null) {
1442 const task = sourceWorkerNode.popTask() as Task<Data>
1443 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1444 this.executeTask(destinationWorkerNodeKey, task)
1445 } else {
1446 this.enqueueTask(destinationWorkerNodeKey, task)
1447 }
1448 this.updateTaskStolenStatisticsWorkerUsage(
1449 destinationWorkerNodeKey,
1450 task.name as string
1451 )
1452 }
1453 }
1454
1455 private tasksStealingOnBackPressure (workerId: number): void {
1456 const sizeOffset = 1
1457 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1458 return
1459 }
1460 const sourceWorkerNode =
1461 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1462 const workerNodes = this.workerNodes
1463 .slice()
1464 .sort(
1465 (workerNodeA, workerNodeB) =>
1466 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1467 )
1468 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1469 if (
1470 sourceWorkerNode.usage.tasks.queued > 0 &&
1471 workerNode.info.ready &&
1472 workerNode.info.id !== workerId &&
1473 workerNode.usage.tasks.queued <
1474 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1475 ) {
1476 const task = sourceWorkerNode.popTask() as Task<Data>
1477 if (this.shallExecuteTask(workerNodeKey)) {
1478 this.executeTask(workerNodeKey, task)
1479 } else {
1480 this.enqueueTask(workerNodeKey, task)
1481 }
1482 this.updateTaskStolenStatisticsWorkerUsage(
1483 workerNodeKey,
1484 task.name as string
1485 )
1486 }
1487 }
1488 }
1489
1490 /**
1491 * This method is the listener registered for each worker message.
1492 *
1493 * @returns The listener function to execute when a message is received from a worker.
1494 */
1495 protected workerListener (): (message: MessageValue<Response>) => void {
1496 return message => {
1497 this.checkMessageWorkerId(message)
1498 if (message.ready != null && message.taskFunctionNames != null) {
1499 // Worker ready response received from worker
1500 this.handleWorkerReadyResponse(message)
1501 } else if (message.taskId != null) {
1502 // Task execution response received from worker
1503 this.handleTaskExecutionResponse(message)
1504 } else if (message.taskFunctionNames != null) {
1505 // Task function names message received from worker
1506 this.getWorkerInfo(
1507 this.getWorkerNodeKeyByWorkerId(message.workerId)
1508 ).taskFunctionNames = message.taskFunctionNames
1509 }
1510 }
1511 }
1512
1513 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1514 if (message.ready === false) {
1515 throw new Error(
1516 `Worker ${message.workerId as number} failed to initialize`
1517 )
1518 }
1519 const workerInfo = this.getWorkerInfo(
1520 this.getWorkerNodeKeyByWorkerId(message.workerId)
1521 )
1522 workerInfo.ready = message.ready as boolean
1523 workerInfo.taskFunctionNames = message.taskFunctionNames
1524 if (this.ready) {
1525 this.emitter?.emit(PoolEvents.ready, this.info)
1526 }
1527 }
1528
1529 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1530 const { taskId, workerError, data } = message
1531 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1532 if (promiseResponse != null) {
1533 if (workerError != null) {
1534 this.emitter?.emit(PoolEvents.taskError, workerError)
1535 promiseResponse.reject(workerError.message)
1536 } else {
1537 promiseResponse.resolve(data as Response)
1538 }
1539 const workerNodeKey = promiseResponse.workerNodeKey
1540 this.afterTaskExecutionHook(workerNodeKey, message)
1541 this.workerChoiceStrategyContext.update(workerNodeKey)
1542 this.promiseResponseMap.delete(taskId as string)
1543 if (
1544 this.opts.enableTasksQueue === true &&
1545 this.tasksQueueSize(workerNodeKey) > 0 &&
1546 this.workerNodes[workerNodeKey].usage.tasks.executing <
1547 (this.opts.tasksQueueOptions?.concurrency as number)
1548 ) {
1549 this.executeTask(
1550 workerNodeKey,
1551 this.dequeueTask(workerNodeKey) as Task<Data>
1552 )
1553 }
1554 }
1555 }
1556
1557 private checkAndEmitTaskExecutionEvents (): void {
1558 if (this.busy) {
1559 this.emitter?.emit(PoolEvents.busy, this.info)
1560 }
1561 }
1562
1563 private checkAndEmitTaskQueuingEvents (): void {
1564 if (this.hasBackPressure()) {
1565 this.emitter?.emit(PoolEvents.backPressure, this.info)
1566 }
1567 }
1568
1569 private checkAndEmitDynamicWorkerCreationEvents (): void {
1570 if (this.type === PoolTypes.dynamic) {
1571 if (this.full) {
1572 this.emitter?.emit(PoolEvents.full, this.info)
1573 }
1574 }
1575 }
1576
1577 /**
1578 * Gets the worker information given its worker node key.
1579 *
1580 * @param workerNodeKey - The worker node key.
1581 * @returns The worker information.
1582 */
1583 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1584 return this.workerNodes[workerNodeKey].info
1585 }
1586
1587 /**
1588 * Adds the given worker in the pool worker nodes.
1589 *
1590 * @param worker - The worker.
1591 * @returns The added worker node key.
1592 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1593 */
1594 private addWorkerNode (worker: Worker): number {
1595 const workerNode = new WorkerNode<Worker, Data>(
1596 worker,
1597 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1598 )
1599 // Flag the worker node as ready at pool startup.
1600 if (this.starting) {
1601 workerNode.info.ready = true
1602 }
1603 this.workerNodes.push(workerNode)
1604 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1605 if (workerNodeKey === -1) {
1606 throw new Error('Worker added not found in worker nodes')
1607 }
1608 return workerNodeKey
1609 }
1610
1611 /**
1612 * Removes the given worker from the pool worker nodes.
1613 *
1614 * @param worker - The worker.
1615 */
1616 private removeWorkerNode (worker: Worker): void {
1617 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1618 if (workerNodeKey !== -1) {
1619 this.workerNodes.splice(workerNodeKey, 1)
1620 this.workerChoiceStrategyContext.remove(workerNodeKey)
1621 }
1622 }
1623
1624 /** @inheritDoc */
1625 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1626 return (
1627 this.opts.enableTasksQueue === true &&
1628 this.workerNodes[workerNodeKey].hasBackPressure()
1629 )
1630 }
1631
1632 private hasBackPressure (): boolean {
1633 return (
1634 this.opts.enableTasksQueue === true &&
1635 this.workerNodes.findIndex(
1636 workerNode => !workerNode.hasBackPressure()
1637 ) === -1
1638 )
1639 }
1640
1641 /**
1642 * Executes the given task on the worker given its worker node key.
1643 *
1644 * @param workerNodeKey - The worker node key.
1645 * @param task - The task to execute.
1646 */
1647 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1648 this.beforeTaskExecutionHook(workerNodeKey, task)
1649 this.sendToWorker(workerNodeKey, task, task.transferList)
1650 this.checkAndEmitTaskExecutionEvents()
1651 }
1652
1653 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1654 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1655 this.checkAndEmitTaskQueuingEvents()
1656 return tasksQueueSize
1657 }
1658
1659 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1660 return this.workerNodes[workerNodeKey].dequeueTask()
1661 }
1662
1663 private tasksQueueSize (workerNodeKey: number): number {
1664 return this.workerNodes[workerNodeKey].tasksQueueSize()
1665 }
1666
1667 protected flushTasksQueue (workerNodeKey: number): void {
1668 while (this.tasksQueueSize(workerNodeKey) > 0) {
1669 this.executeTask(
1670 workerNodeKey,
1671 this.dequeueTask(workerNodeKey) as Task<Data>
1672 )
1673 }
1674 this.workerNodes[workerNodeKey].clearTasksQueue()
1675 }
1676
1677 private flushTasksQueues (): void {
1678 for (const [workerNodeKey] of this.workerNodes.entries()) {
1679 this.flushTasksQueue(workerNodeKey)
1680 }
1681 }
1682 }