refactor: improve error reporting at task functions handling
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
75 *
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
77 */
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
80
81 /**
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
85 Worker,
86 Data,
87 Response
88 >
89
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker
 * choice strategy context and starts the workers when `opts.startWorkers`
 * is `true` (its default value once the options have been checked).
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, the pool options defaults are filled in on the way
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // These methods are handed over as callbacks, keep them bound to the pool
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclasses customization point, runs before any worker node exists
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
165
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused an empty set of workers
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Validates the size boundaries of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, lower than or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Validates the pool options and fills in, on `this.opts`, the default value
 * of each option left unspecified.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only checked and built when the queue is enabled
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
253
/**
 * Validates a worker choice strategy. An unspecified strategy is accepted.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is not one of `WorkerChoiceStrategies`.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
266
/**
 * Validates the worker choice strategy options. Unspecified options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or if `measurement` is not one of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
313
/**
 * Validates the tasks queue options. Unspecified options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
350
/** @inheritDoc */
public get info (): PoolInfo {
  // The task statistics requirements are looked up once: every optional
  // section below is gated by that same snapshot instead of querying the
  // worker choice strategy context again for each of them.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Sums a per worker node figure over all the worker nodes
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Flattens the given measurement history of all the worker nodes
  const collectHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (accumulator, workerNode) =>
        accumulator.concat(workerNode.usage[measurement].history),
      []
    )

  // Builds the pool wide statistics of the given measurement, average and
  // median being included only when the strategy requires them
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime',
    requirements: MeasurementStatisticsRequirements
  ): { minimum: number, maximum: number, average?: number, median?: number } => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(requirements.average && {
      average: round(average(collectHistory(measurement)))
    }),
    ...(requirements.median && {
      median: round(median(collectHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue figures are only reported when the tasks queue is enabled
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: buildMeasurementInfo('runTime', runTimeRequirements)
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: buildMeasurementInfo('waitTime', waitTimeRequirements)
    })
  }
}
506
/**
 * The pool readiness boolean status: `true` once the number of non dynamic
 * worker nodes flagged as ready reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
521
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time of all the worker nodes divided by the time elapsed since the start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
542
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Must be implemented by subclasses. It is read by the pool size checks and
 * reported as `type` in the pool information.
 */
protected abstract get type (): PoolType
549
/**
 * The worker type.
 *
 * Must be implemented by subclasses. It is reported as `worker` in the pool
 * information.
 */
protected abstract get worker (): WorkerType
554
/**
 * The pool minimum size, i.e. the number of workers given to the constructor.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
561
/**
 * The pool maximum size: the `max` property when it is defined, the number
 * of workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  return this.max ?? this.numberOfWorkers
}
568
/**
 * Ensures that a message received from a worker carries the id of a worker
 * known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
587
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
599
/**
 * Gets the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
611
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node restarts from a blank usage and is sent a statistics message
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
630
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
644
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Tasks stealing is turned off and the queued tasks are flushed
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // No tasks queue: previously stored tasks queue options are dropped
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  if (builtTasksQueueOptions.taskStealing !== true) {
    this.unsetTaskStealing()
  } else {
    this.setTaskStealing()
  }
  if (builtTasksQueueOptions.tasksStealingOnBackPressure !== true) {
    this.unsetTasksStealingOnBackPressure()
  } else {
    this.setTasksStealingOnBackPressure()
  }
}
680
/**
 * Builds the tasks queue options: the given options on top of the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The tasks queue options with each unspecified option set to its default value.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    // The default queue size is the square of the pool maximum size
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
694
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
700
/**
 * Installs the task stealing handler as the empty queue callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
707
/**
 * Removes the empty queue callback of every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
713
/**
 * Installs the tasks stealing handler as the back pressure callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
720
/**
 * Removes the back pressure callback of every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
726
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
735
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status. Must be implemented by subclasses.
 */
protected abstract get busy (): boolean
742
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node still has room when it executes fewer tasks than the
 * tasks queue concurrency (tasks queue enabled) or no task at all (tasks
 * queue disabled).
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const someWorkerNodeHasRoom = this.workerNodes.some(workerNode => {
    if (!workerNode.info.ready) {
      return false
    }
    if (tasksQueueEnabled) {
      return (
        workerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
      )
    }
    return workerNode.usage.tasks.executing === 0
  })
  return !someWorkerNodeHasRoom
}
766
/**
 * Sends a task function operation message to a worker node and waits for
 * the operation status sent back by that worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports a successful operation, rejected with an error when it reports a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Only the messages of the targeted worker are considered
      if (response.workerId !== workerId) {
        return
      }
      const operationStatus = response.taskFunctionOperationStatus
      if (operationStatus === true) {
        resolve(true)
      } else if (operationStatus === false) {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
797
/**
 * Sends a task function operation message to every worker node and waits
 * until as many operation statuses as worker nodes have been received.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports a successful operation, rejected with the error of the first collected failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived: Array<MessageValue<Data | Response>> = []
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(workerNodeKey, response => {
        // Messages not carrying an operation status are ignored
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        // Nothing is settled before each worker node has answered
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        const allSucceeded = responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
        if (allSucceeded) {
          resolve(true)
          return
        }
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                response.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
841
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
854
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text
  const addMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(addMessage)
  // Recorded on the pool side only once the workers operation has succeeded
  this.taskFunctions.set(name, fn)
  return opResult
}
877
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(removeMessage)
  // Pool side cleanup happens only once the workers operation has succeeded
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
893
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty names list provides the answer
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
906
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultMessage)
}
914
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
920
/**
 * Whether a task shall be executed right away on the given worker node
 * rather than enqueued: its tasks queue must be empty and it must execute
 * fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
928
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: a failure rejects the promise
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The promise callbacks are stored under the task id until the worker answers
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })

    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
982
/** @inheritDoc */
public start (): void {
  // Dynamic worker nodes do not count towards the number of workers to create
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  this.starting = true
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
998
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
1009
/**
 * Sends a kill message to a worker node and waits for the kill status sent
 * back by the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker reports a `'success'` kill status, rejected with an error when it reports a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, message => {
      const killStatus = message.kill
      if (killStatus === 'success') {
        resolve()
        return
      }
      if (killStatus === 'failure') {
        reject(
          new Error(
            `Worker ${
              message.workerId as number
            } kill message handling failed`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1030
/**
 * Terminates the worker node given its worker node key.
 *
 * Must be implemented by subclasses. It is called for each worker node when
 * the pool is destroyed.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1037
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1047
/**
 * Should return whether the worker is the main worker or not.
 *
 * Must be implemented by subclasses. The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
1052
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the
 * worker node usage and, when task function usages shall be updated, of the
 * usage of the task function about to run.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const taskFunctionName = task.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskFunctionName) != null) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskFunctionName
    ) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1082
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task statistics, the run time and the ELU of the worker node
 * usage and, when task function usages shall be updated, of the usage of
 * the task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the task outcome carried by the message to a worker usage
  const applyTaskOutcome = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }

  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    applyTaskOutcome(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const taskFunctionName = message.taskPerformance?.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskFunctionName) != null) {
    applyTaskOutcome(
      workerNode.getTaskFunctionWorkerUsage(taskFunctionName) as WorkerUsage
    )
  }
}
1116
/**
 * Tells whether the per task function usage of a worker node shall be updated.
 * This is the case when the worker information lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return false
  }
  const names = info.taskFunctionNames
  if (!Array.isArray(names)) {
    return false
  }
  return names.length > 2
}
1131
/**
 * Updates the tasks counters of a worker usage after a task execution response:
 * one task less executing (never going below zero), and one more task either
 * executed or failed depending on whether the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskCounters = workerUsage.tasks
  if ((taskCounters.executing ?? 0) > 0) {
    --taskCounters.executing
  }
  if (message.workerError != null) {
    ++taskCounters.failed
  } else {
    ++taskCounters.executed
  }
}
1149
/**
 * Updates the run time statistics of a worker usage from a task execution message.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is accounted as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1163
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 * The wait time is the delay between the task timestamp and now; a task without
 * timestamp is accounted with a zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp
  )
}
1176
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task
 * execution message. Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are accounted as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the new one,
  // or the new one alone when no previous value exists.
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1209
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first and directly chosen
 * if the strategy policy has `dynamicWorkerUsage` set. Otherwise the choice is
 * delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1228
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1237
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1250
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
1257
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes and
 * runs the post setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers are registered first, so they run before the pool ones.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the failing worker as not ready and close its channel before notifying listeners.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the failing worker with one of the same kind.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1301
/**
 * Creates a worker node, then configures it as a dynamic one: registers the kill
 * message listener, asks the worker to check its activity, forwards the registered
 * task functions and flags the worker information as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  const emitPoolError = (error: unknown): void => {
    this.emitter?.emit(PoolEvents.error, error)
  }
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honored once the worker has nothing left to do.
    const hasNoPendingWork = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return senderUsage.tasks.executing === 0
      }
      return (
        this.opts.enableTasksQueue === true &&
        senderUsage.tasks.executing === 0 &&
        this.tasksQueueSize(senderWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!shallDestroy) {
      return
    }
    this.destroyWorkerNode(senderWorkerNodeKey).catch(emitPoolError)
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Forward every task function registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(emitPoolError)
  }
  dynamicWorkerInfo.dynamic = true
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1354
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool data or response type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1367
/**
 * Hook run after a worker node has been newly created: registers the pool message
 * listener, sends the startup and statistics messages, and installs the task
 * stealing callbacks enabled in the tasks queue options.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const queueOptions = this.opts.tasksQueueOptions
  if (queueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (queueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1392
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1399
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1416
/**
 * Moves the tasks queued on a worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, with the
 * fewest queued tasks. It is executed right away when that node can take it,
 * enqueued on it otherwise.
 *
 * The source worker node is never a destination, and neither is a not ready one:
 * re-enqueuing a task on the queue being drained would keep the loop running forever.
 * When no eligible destination exists, the redistribution stops and the remaining
 * tasks are left in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Re-evaluated for every task since each move changes the queues sizes.
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node can take over the queued tasks.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1437
/**
 * Increments the stolen tasks counter of a worker node, and of its per task
 * function usage when applicable.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1456
/**
 * Steals one task for the worker identified by the given id: the task is taken from
 * the ready worker node, other than that worker, with the most queued tasks, then
 * executed or enqueued on that worker's node.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Pick the eligible worker node with the most queued tasks; the first one wins on ties.
  let sourceWorkerNode: IWorkerNode<Worker, Data> | undefined
  for (const candidate of this.workerNodes) {
    const eligible =
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
    if (!eligible) {
      continue
    }
    if (
      sourceWorkerNode == null ||
      candidate.usage.tasks.queued > sourceWorkerNode.usage.tasks.queued
    ) {
      sourceWorkerNode = candidate
    }
  }
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1484
/**
 * Offloads queued tasks of the worker identified by the given id to the other
 * ready worker nodes, visiting them from the least to the most loaded one.
 * A worker node only receives a task while its queue holds fewer tasks than the
 * configured tasks queue size minus one. Does nothing when that configured size
 * is lower than or equal to one.
 *
 * @param workerId - The id of the worker whose tasks are offloaded.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodesByQueuedTasks = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodesByQueuedTasks) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // real key from the pool worker nodes before dispatching the task.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1519
/**
 * Builds the listener registered for each worker message. The listener checks the
 * message worker id, then dispatches on the message content: worker ready response,
 * task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const hasTaskFunctionNames = message.taskFunctionNames != null
    if (message.ready != null && hasTaskFunctionNames) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (hasTaskFunctionNames) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctionNames = message.taskFunctionNames
    }
  }
}
1542
/**
 * Handles a worker ready response: stores the ready flag and the task function
 * names in the worker information, then emits the pool ready event if the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1558
/**
 * Handles a task execution response: settles the promise registered for the task,
 * updates the usage statistics and the worker choice strategy, then runs the next
 * queued task of the worker node if the tasks queue is enabled and the worker node
 * is below the configured concurrency.
 * A response whose task id has no registered promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.workerError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    // The promise is rejected with the error message, after the task error event is emitted.
    this.emitter?.emit(PoolEvents.taskError, message.workerError)
    promiseResponse.reject(message.workerError.message)
  }
  const respondingWorkerNodeKey = promiseResponse.workerNodeKey
  this.afterTaskExecutionHook(respondingWorkerNodeKey, message)
  this.workerChoiceStrategyContext.update(respondingWorkerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(respondingWorkerNodeKey) > 0 &&
    this.workerNodes[respondingWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    const nextTask = this.dequeueTask(respondingWorkerNodeKey) as Task<Data>
    this.executeTask(respondingWorkerNodeKey, nextTask)
  }
}
1586
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1592
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1598
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1606
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1616
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Second worker node constructor argument: the configured tasks queue size,
  // or the squared pool maximum size when none is configured.
  const workerNodeSizeArgument =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    workerNodeSizeArgument
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1640
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1653
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1661
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and every
 * worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1670
/**
 * Runs the given task on the worker identified by its worker node key: runs the
 * before execution hook, sends the task to the worker with its transfer list,
 * then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1682
/**
 * Enqueues the given task on the worker node identified by its key, then checks
 * and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1688
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1692
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1696
/**
 * Flushes the tasks queue of the worker node identified by its key: executes every
 * queued task on that worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1706
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1712 }