Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map.
73 *
74 * - `key`: The message id of each submitted task.
75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 *
77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 */
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
81
82 /**
83 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
86 Worker,
87 Data,
88 Response
89 >
90
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
96 /**
97 * The task functions added at runtime map:
98 * - `key`: The task function name.
99 * - `value`: The task function itself.
100 */
101 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
102
103 /**
104 * Whether the pool is starting or not.
105 */
106 private readonly starting: boolean
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * The start timestamp of the pool.
113 */
114 private readonly startTimestamp
115
116 /**
117 * Constructs a new poolifier pool.
118 *
119 * @param numberOfWorkers - Number of workers that this pool should manage.
120 * @param filePath - Path to the worker file.
121 * @param opts - Options for the pool.
122 */
123 public constructor (
124 protected readonly numberOfWorkers: number,
125 protected readonly filePath: string,
126 protected readonly opts: PoolOptions<Worker>
127 ) {
128 if (!this.isMain()) {
129 throw new Error(
130 'Cannot start a pool from a worker with the same type as the pool'
131 )
132 }
133 this.checkNumberOfWorkers(this.numberOfWorkers)
134 this.checkFilePath(this.filePath)
135 this.checkPoolOptions(this.opts)
136
137 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
138 this.executeTask = this.executeTask.bind(this)
139 this.enqueueTask = this.enqueueTask.bind(this)
140
141 if (this.opts.enableEvents === true) {
142 this.emitter = new PoolEmitter()
143 }
144 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
145 Worker,
146 Data,
147 Response
148 >(
149 this,
150 this.opts.workerChoiceStrategy,
151 this.opts.workerChoiceStrategyOptions
152 )
153
154 this.setupHook()
155
156 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
157
158 this.starting = true
159 this.startPool()
160 this.starting = false
161 this.started = true
162
163 this.startTimestamp = performance.now()
164 }
165
166 private checkFilePath (filePath: string): void {
167 if (
168 filePath == null ||
169 typeof filePath !== 'string' ||
170 (typeof filePath === 'string' && filePath.trim().length === 0)
171 ) {
172 throw new Error('Please specify a file with a worker implementation')
173 }
174 if (!existsSync(filePath)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
176 }
177 }
178
179 private checkNumberOfWorkers (numberOfWorkers: number): void {
180 if (numberOfWorkers == null) {
181 throw new Error(
182 'Cannot instantiate a pool without specifying the number of workers'
183 )
184 } else if (!Number.isSafeInteger(numberOfWorkers)) {
185 throw new TypeError(
186 'Cannot instantiate a pool with a non safe integer number of workers'
187 )
188 } else if (numberOfWorkers < 0) {
189 throw new RangeError(
190 'Cannot instantiate a pool with a negative number of workers'
191 )
192 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
194 }
195 }
196
197 protected checkDynamicPoolSize (min: number, max: number): void {
198 if (this.type === PoolTypes.dynamic) {
199 if (max == null) {
200 throw new TypeError(
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 )
203 } else if (!Number.isSafeInteger(max)) {
204 throw new TypeError(
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 )
207 } else if (min > max) {
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 )
211 } else if (max === 0) {
212 throw new RangeError(
213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
214 )
215 } else if (min === max) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 )
219 }
220 }
221 }
222
223 private checkPoolOptions (opts: PoolOptions<Worker>): void {
224 if (isPlainObject(opts)) {
225 this.opts.workerChoiceStrategy =
226 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
227 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
228 this.opts.workerChoiceStrategyOptions = {
229 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
230 ...opts.workerChoiceStrategyOptions
231 }
232 this.checkValidWorkerChoiceStrategyOptions(
233 this.opts.workerChoiceStrategyOptions
234 )
235 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
236 this.opts.enableEvents = opts.enableEvents ?? true
237 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
238 if (this.opts.enableTasksQueue) {
239 this.checkValidTasksQueueOptions(
240 opts.tasksQueueOptions as TasksQueueOptions
241 )
242 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
243 opts.tasksQueueOptions as TasksQueueOptions
244 )
245 }
246 } else {
247 throw new TypeError('Invalid pool options: must be a plain object')
248 }
249 }
250
251 private checkValidWorkerChoiceStrategy (
252 workerChoiceStrategy: WorkerChoiceStrategy
253 ): void {
254 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
255 throw new Error(
256 `Invalid worker choice strategy '${workerChoiceStrategy}'`
257 )
258 }
259 }
260
261 private checkValidWorkerChoiceStrategyOptions (
262 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
263 ): void {
264 if (!isPlainObject(workerChoiceStrategyOptions)) {
265 throw new TypeError(
266 'Invalid worker choice strategy options: must be a plain object'
267 )
268 }
269 if (
270 workerChoiceStrategyOptions.retries != null &&
271 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
272 ) {
273 throw new TypeError(
274 'Invalid worker choice strategy options: retries must be an integer'
275 )
276 }
277 if (
278 workerChoiceStrategyOptions.retries != null &&
279 workerChoiceStrategyOptions.retries < 0
280 ) {
281 throw new RangeError(
282 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
283 )
284 }
285 if (
286 workerChoiceStrategyOptions.weights != null &&
287 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
288 ) {
289 throw new Error(
290 'Invalid worker choice strategy options: must have a weight for each worker node'
291 )
292 }
293 if (
294 workerChoiceStrategyOptions.measurement != null &&
295 !Object.values(Measurements).includes(
296 workerChoiceStrategyOptions.measurement
297 )
298 ) {
299 throw new Error(
300 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
301 )
302 }
303 }
304
305 private checkValidTasksQueueOptions (
306 tasksQueueOptions: TasksQueueOptions
307 ): void {
308 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
309 throw new TypeError('Invalid tasks queue options: must be a plain object')
310 }
311 if (
312 tasksQueueOptions?.concurrency != null &&
313 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
314 ) {
315 throw new TypeError(
316 'Invalid worker node tasks concurrency: must be an integer'
317 )
318 }
319 if (
320 tasksQueueOptions?.concurrency != null &&
321 tasksQueueOptions?.concurrency <= 0
322 ) {
323 throw new RangeError(
324 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
325 )
326 }
327 if (tasksQueueOptions?.queueMaxSize != null) {
328 throw new Error(
329 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
330 )
331 }
332 if (
333 tasksQueueOptions?.size != null &&
334 !Number.isSafeInteger(tasksQueueOptions?.size)
335 ) {
336 throw new TypeError(
337 'Invalid worker node tasks queue size: must be an integer'
338 )
339 }
340 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
341 throw new RangeError(
342 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
343 )
344 }
345 }
346
347 private startPool (): void {
348 while (
349 this.workerNodes.reduce(
350 (accumulator, workerNode) =>
351 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
352 0
353 ) < this.numberOfWorkers
354 ) {
355 this.createAndSetupWorkerNode()
356 }
357 }
358
359 /** @inheritDoc */
360 public get info (): PoolInfo {
361 return {
362 version,
363 type: this.type,
364 worker: this.worker,
365 ready: this.ready,
366 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
367 minSize: this.minSize,
368 maxSize: this.maxSize,
369 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
370 .runTime.aggregate &&
371 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
372 .waitTime.aggregate && { utilization: round(this.utilization) }),
373 workerNodes: this.workerNodes.length,
374 idleWorkerNodes: this.workerNodes.reduce(
375 (accumulator, workerNode) =>
376 workerNode.usage.tasks.executing === 0
377 ? accumulator + 1
378 : accumulator,
379 0
380 ),
381 busyWorkerNodes: this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
384 0
385 ),
386 executedTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + workerNode.usage.tasks.executed,
389 0
390 ),
391 executingTasks: this.workerNodes.reduce(
392 (accumulator, workerNode) =>
393 accumulator + workerNode.usage.tasks.executing,
394 0
395 ),
396 ...(this.opts.enableTasksQueue === true && {
397 queuedTasks: this.workerNodes.reduce(
398 (accumulator, workerNode) =>
399 accumulator + workerNode.usage.tasks.queued,
400 0
401 )
402 }),
403 ...(this.opts.enableTasksQueue === true && {
404 maxQueuedTasks: this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
407 0
408 )
409 }),
410 ...(this.opts.enableTasksQueue === true && {
411 backPressure: this.hasBackPressure()
412 }),
413 ...(this.opts.enableTasksQueue === true && {
414 stolenTasks: this.workerNodes.reduce(
415 (accumulator, workerNode) =>
416 accumulator + workerNode.usage.tasks.stolen,
417 0
418 )
419 }),
420 failedTasks: this.workerNodes.reduce(
421 (accumulator, workerNode) =>
422 accumulator + workerNode.usage.tasks.failed,
423 0
424 ),
425 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 .runTime.aggregate && {
427 runTime: {
428 minimum: round(
429 min(
430 ...this.workerNodes.map(
431 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
432 )
433 )
434 ),
435 maximum: round(
436 max(
437 ...this.workerNodes.map(
438 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
439 )
440 )
441 ),
442 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
443 .runTime.average && {
444 average: round(
445 average(
446 this.workerNodes.reduce<number[]>(
447 (accumulator, workerNode) =>
448 accumulator.concat(workerNode.usage.runTime.history),
449 []
450 )
451 )
452 )
453 }),
454 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
455 .runTime.median && {
456 median: round(
457 median(
458 this.workerNodes.reduce<number[]>(
459 (accumulator, workerNode) =>
460 accumulator.concat(workerNode.usage.runTime.history),
461 []
462 )
463 )
464 )
465 })
466 }
467 }),
468 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
469 .waitTime.aggregate && {
470 waitTime: {
471 minimum: round(
472 min(
473 ...this.workerNodes.map(
474 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
475 )
476 )
477 ),
478 maximum: round(
479 max(
480 ...this.workerNodes.map(
481 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
482 )
483 )
484 ),
485 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
486 .waitTime.average && {
487 average: round(
488 average(
489 this.workerNodes.reduce<number[]>(
490 (accumulator, workerNode) =>
491 accumulator.concat(workerNode.usage.waitTime.history),
492 []
493 )
494 )
495 )
496 }),
497 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
498 .waitTime.median && {
499 median: round(
500 median(
501 this.workerNodes.reduce<number[]>(
502 (accumulator, workerNode) =>
503 accumulator.concat(workerNode.usage.waitTime.history),
504 []
505 )
506 )
507 )
508 })
509 }
510 })
511 }
512 }
513
514 /**
515 * The pool readiness boolean status.
516 */
517 private get ready (): boolean {
518 return (
519 this.workerNodes.reduce(
520 (accumulator, workerNode) =>
521 !workerNode.info.dynamic && workerNode.info.ready
522 ? accumulator + 1
523 : accumulator,
524 0
525 ) >= this.minSize
526 )
527 }
528
529 /**
530 * The approximate pool utilization.
531 *
532 * @returns The pool utilization.
533 */
534 private get utilization (): number {
535 const poolTimeCapacity =
536 (performance.now() - this.startTimestamp) * this.maxSize
537 const totalTasksRunTime = this.workerNodes.reduce(
538 (accumulator, workerNode) =>
539 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
540 0
541 )
542 const totalTasksWaitTime = this.workerNodes.reduce(
543 (accumulator, workerNode) =>
544 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
545 0
546 )
547 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
548 }
549
550 /**
551 * The pool type.
552 *
553 * If it is `'dynamic'`, it provides the `max` property.
554 */
555 protected abstract get type (): PoolType
556
557 /**
558 * The worker type.
559 */
560 protected abstract get worker (): WorkerType
561
562 /**
563 * The pool minimum size.
564 */
565 protected get minSize (): number {
566 return this.numberOfWorkers
567 }
568
569 /**
570 * The pool maximum size.
571 */
572 protected get maxSize (): number {
573 return this.max ?? this.numberOfWorkers
574 }
575
576 /**
577 * Checks if the worker id sent in the received message from a worker is valid.
578 *
579 * @param message - The received message.
580 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
581 */
582 private checkMessageWorkerId (message: MessageValue<Response>): void {
583 if (message.workerId == null) {
584 throw new Error('Worker message received without worker id')
585 } else if (
586 message.workerId != null &&
587 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
588 ) {
589 throw new Error(
590 `Worker message received from unknown worker '${message.workerId}'`
591 )
592 }
593 }
594
595 /**
596 * Gets the given worker its worker node key.
597 *
598 * @param worker - The worker.
599 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
600 */
601 private getWorkerNodeKeyByWorker (worker: Worker): number {
602 return this.workerNodes.findIndex(
603 workerNode => workerNode.worker === worker
604 )
605 }
606
607 /**
608 * Gets the worker node key given its worker id.
609 *
610 * @param workerId - The worker id.
611 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
612 */
613 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
614 return this.workerNodes.findIndex(
615 workerNode => workerNode.info.id === workerId
616 )
617 }
618
619 /** @inheritDoc */
620 public setWorkerChoiceStrategy (
621 workerChoiceStrategy: WorkerChoiceStrategy,
622 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
623 ): void {
624 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
625 this.opts.workerChoiceStrategy = workerChoiceStrategy
626 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
627 this.opts.workerChoiceStrategy
628 )
629 if (workerChoiceStrategyOptions != null) {
630 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
631 }
632 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
633 workerNode.resetUsage()
634 this.sendStatisticsMessageToWorker(workerNodeKey)
635 }
636 }
637
638 /** @inheritDoc */
639 public setWorkerChoiceStrategyOptions (
640 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
641 ): void {
642 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
643 this.opts.workerChoiceStrategyOptions = {
644 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
645 ...workerChoiceStrategyOptions
646 }
647 this.workerChoiceStrategyContext.setOptions(
648 this.opts.workerChoiceStrategyOptions
649 )
650 }
651
652 /** @inheritDoc */
653 public enableTasksQueue (
654 enable: boolean,
655 tasksQueueOptions?: TasksQueueOptions
656 ): void {
657 if (this.opts.enableTasksQueue === true && !enable) {
658 this.flushTasksQueues()
659 }
660 this.opts.enableTasksQueue = enable
661 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
662 }
663
664 /** @inheritDoc */
665 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
666 if (this.opts.enableTasksQueue === true) {
667 this.checkValidTasksQueueOptions(tasksQueueOptions)
668 this.opts.tasksQueueOptions =
669 this.buildTasksQueueOptions(tasksQueueOptions)
670 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
671 } else if (this.opts.tasksQueueOptions != null) {
672 delete this.opts.tasksQueueOptions
673 }
674 }
675
676 private setTasksQueueSize (size: number): void {
677 for (const workerNode of this.workerNodes) {
678 workerNode.tasksQueueBackPressureSize = size
679 }
680 }
681
682 private buildTasksQueueOptions (
683 tasksQueueOptions: TasksQueueOptions
684 ): TasksQueueOptions {
685 return {
686 ...{
687 size: Math.pow(this.maxSize, 2),
688 concurrency: 1
689 },
690 ...tasksQueueOptions
691 }
692 }
693
694 /**
695 * Whether the pool is full or not.
696 *
697 * The pool filling boolean status.
698 */
699 protected get full (): boolean {
700 return this.workerNodes.length >= this.maxSize
701 }
702
703 /**
704 * Whether the pool is busy or not.
705 *
706 * The pool busyness boolean status.
707 */
708 protected abstract get busy (): boolean
709
710 /**
711 * Whether worker nodes are executing concurrently their tasks quota or not.
712 *
713 * @returns Worker nodes busyness boolean status.
714 */
715 protected internalBusy (): boolean {
716 if (this.opts.enableTasksQueue === true) {
717 return (
718 this.workerNodes.findIndex(
719 workerNode =>
720 workerNode.info.ready &&
721 workerNode.usage.tasks.executing <
722 (this.opts.tasksQueueOptions?.concurrency as number)
723 ) === -1
724 )
725 } else {
726 return (
727 this.workerNodes.findIndex(
728 workerNode =>
729 workerNode.info.ready && workerNode.usage.tasks.executing === 0
730 ) === -1
731 )
732 }
733 }
734
735 private async sendTaskFunctionOperationToWorker (
736 workerNodeKey: number,
737 message: MessageValue<Data>
738 ): Promise<boolean> {
739 const workerId = this.getWorkerInfo(workerNodeKey).id as number
740 return await new Promise<boolean>((resolve, reject) => {
741 this.registerWorkerMessageListener(workerNodeKey, message => {
742 if (
743 message.workerId === workerId &&
744 message.taskFunctionOperationStatus === true
745 ) {
746 resolve(true)
747 } else if (
748 message.workerId === workerId &&
749 message.taskFunctionOperationStatus === false
750 ) {
751 reject(
752 new Error(
753 `Task function operation ${
754 message.taskFunctionOperation as string
755 } failed on worker ${message.workerId}`
756 )
757 )
758 }
759 })
760 this.sendToWorker(workerNodeKey, message)
761 })
762 }
763
764 private async sendTaskFunctionOperationToWorkers (
765 message: Omit<MessageValue<Data>, 'workerId'>
766 ): Promise<boolean> {
767 return await new Promise<boolean>((resolve, reject) => {
768 const responsesReceived = new Array<MessageValue<Data | Response>>()
769 for (const [workerNodeKey] of this.workerNodes.entries()) {
770 this.registerWorkerMessageListener(workerNodeKey, message => {
771 if (message.taskFunctionOperationStatus != null) {
772 responsesReceived.push(message)
773 if (
774 responsesReceived.length === this.workerNodes.length &&
775 responsesReceived.every(
776 message => message.taskFunctionOperationStatus === true
777 )
778 ) {
779 resolve(true)
780 } else if (
781 responsesReceived.length === this.workerNodes.length &&
782 responsesReceived.some(
783 message => message.taskFunctionOperationStatus === false
784 )
785 ) {
786 reject(
787 new Error(
788 `Task function operation ${
789 message.taskFunctionOperation as string
790 } failed on worker ${message.workerId as number}`
791 )
792 )
793 }
794 }
795 })
796 this.sendToWorker(workerNodeKey, message)
797 }
798 })
799 }
800
801 /** @inheritDoc */
802 public hasTaskFunction (name: string): boolean {
803 for (const workerNode of this.workerNodes) {
804 if (
805 Array.isArray(workerNode.info.taskFunctionNames) &&
806 workerNode.info.taskFunctionNames.includes(name)
807 ) {
808 return true
809 }
810 }
811 return false
812 }
813
814 /** @inheritDoc */
815 public async addTaskFunction (
816 name: string,
817 taskFunction: TaskFunction<Data, Response>
818 ): Promise<boolean> {
819 this.taskFunctions.set(name, taskFunction)
820 return await this.sendTaskFunctionOperationToWorkers({
821 taskFunctionOperation: 'add',
822 taskFunctionName: name,
823 taskFunction: taskFunction.toString()
824 })
825 }
826
827 /** @inheritDoc */
828 public async removeTaskFunction (name: string): Promise<boolean> {
829 this.taskFunctions.delete(name)
830 return await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation: 'remove',
832 taskFunctionName: name
833 })
834 }
835
836 /** @inheritDoc */
837 public listTaskFunctionNames (): string[] {
838 for (const workerNode of this.workerNodes) {
839 if (
840 Array.isArray(workerNode.info.taskFunctionNames) &&
841 workerNode.info.taskFunctionNames.length > 0
842 ) {
843 return workerNode.info.taskFunctionNames
844 }
845 }
846 return []
847 }
848
849 /** @inheritDoc */
850 public async setDefaultTaskFunction (name: string): Promise<boolean> {
851 return await this.sendTaskFunctionOperationToWorkers({
852 taskFunctionOperation: 'default',
853 taskFunctionName: name
854 })
855 }
856
857 private shallExecuteTask (workerNodeKey: number): boolean {
858 return (
859 this.tasksQueueSize(workerNodeKey) === 0 &&
860 this.workerNodes[workerNodeKey].usage.tasks.executing <
861 (this.opts.tasksQueueOptions?.concurrency as number)
862 )
863 }
864
865 /** @inheritDoc */
866 public async execute (
867 data?: Data,
868 name?: string,
869 transferList?: TransferListItem[]
870 ): Promise<Response> {
871 return await new Promise<Response>((resolve, reject) => {
872 if (!this.started) {
873 reject(new Error('Cannot execute a task on destroyed pool'))
874 return
875 }
876 if (name != null && typeof name !== 'string') {
877 reject(new TypeError('name argument must be a string'))
878 return
879 }
880 if (
881 name != null &&
882 typeof name === 'string' &&
883 name.trim().length === 0
884 ) {
885 reject(new TypeError('name argument must not be an empty string'))
886 return
887 }
888 if (transferList != null && !Array.isArray(transferList)) {
889 reject(new TypeError('transferList argument must be an array'))
890 return
891 }
892 const timestamp = performance.now()
893 const workerNodeKey = this.chooseWorkerNode()
894 const task: Task<Data> = {
895 name: name ?? DEFAULT_TASK_NAME,
896 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
897 data: data ?? ({} as Data),
898 transferList,
899 timestamp,
900 taskId: randomUUID()
901 }
902 this.promiseResponseMap.set(task.taskId as string, {
903 resolve,
904 reject,
905 workerNodeKey
906 })
907 if (
908 this.opts.enableTasksQueue === false ||
909 (this.opts.enableTasksQueue === true &&
910 this.shallExecuteTask(workerNodeKey))
911 ) {
912 this.executeTask(workerNodeKey, task)
913 } else {
914 this.enqueueTask(workerNodeKey, task)
915 }
916 })
917 }
918
919 /** @inheritDoc */
920 public async destroy (): Promise<void> {
921 await Promise.all(
922 this.workerNodes.map(async (_, workerNodeKey) => {
923 await this.destroyWorkerNode(workerNodeKey)
924 })
925 )
926 this.emitter?.emit(PoolEvents.destroy, this.info)
927 this.started = false
928 }
929
930 protected async sendKillMessageToWorker (
931 workerNodeKey: number
932 ): Promise<void> {
933 await new Promise<void>((resolve, reject) => {
934 this.registerWorkerMessageListener(workerNodeKey, message => {
935 if (message.kill === 'success') {
936 resolve()
937 } else if (message.kill === 'failure') {
938 reject(
939 new Error(
940 `Worker ${
941 message.workerId as number
942 } kill message handling failed`
943 )
944 )
945 }
946 })
947 this.sendToWorker(workerNodeKey, { kill: true })
948 })
949 }
950
951 /**
952 * Terminates the worker node given its worker node key.
953 *
954 * @param workerNodeKey - The worker node key.
955 */
956 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
957
958 /**
959 * Setup hook to execute code before worker nodes are created in the abstract constructor.
960 * Can be overridden.
961 *
962 * @virtual
963 */
964 protected setupHook (): void {
965 /* Intentionally empty */
966 }
967
968 /**
969 * Should return whether the worker is the main worker or not.
970 */
971 protected abstract isMain (): boolean
972
973 /**
974 * Hook executed before the worker task execution.
975 * Can be overridden.
976 *
977 * @param workerNodeKey - The worker node key.
978 * @param task - The task to execute.
979 */
980 protected beforeTaskExecutionHook (
981 workerNodeKey: number,
982 task: Task<Data>
983 ): void {
984 if (this.workerNodes[workerNodeKey]?.usage != null) {
985 const workerUsage = this.workerNodes[workerNodeKey].usage
986 ++workerUsage.tasks.executing
987 this.updateWaitTimeWorkerUsage(workerUsage, task)
988 }
989 if (
990 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
991 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
992 task.name as string
993 ) != null
994 ) {
995 const taskFunctionWorkerUsage = this.workerNodes[
996 workerNodeKey
997 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
998 ++taskFunctionWorkerUsage.tasks.executing
999 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1000 }
1001 }
1002
1003 /**
1004 * Hook executed after the worker task execution.
1005 * Can be overridden.
1006 *
1007 * @param workerNodeKey - The worker node key.
1008 * @param message - The received message.
1009 */
1010 protected afterTaskExecutionHook (
1011 workerNodeKey: number,
1012 message: MessageValue<Response>
1013 ): void {
1014 if (this.workerNodes[workerNodeKey]?.usage != null) {
1015 const workerUsage = this.workerNodes[workerNodeKey].usage
1016 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1017 this.updateRunTimeWorkerUsage(workerUsage, message)
1018 this.updateEluWorkerUsage(workerUsage, message)
1019 }
1020 if (
1021 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1022 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1023 message.taskPerformance?.name as string
1024 ) != null
1025 ) {
1026 const taskFunctionWorkerUsage = this.workerNodes[
1027 workerNodeKey
1028 ].getTaskFunctionWorkerUsage(
1029 message.taskPerformance?.name as string
1030 ) as WorkerUsage
1031 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1032 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1033 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1034 }
1035 }
1036
1037 /**
1038 * Whether the worker node shall update its task function worker usage or not.
1039 *
1040 * @param workerNodeKey - The worker node key.
1041 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1042 */
1043 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1044 const workerInfo = this.getWorkerInfo(workerNodeKey)
1045 return (
1046 workerInfo != null &&
1047 Array.isArray(workerInfo.taskFunctionNames) &&
1048 workerInfo.taskFunctionNames.length > 2
1049 )
1050 }
1051
1052 private updateTaskStatisticsWorkerUsage (
1053 workerUsage: WorkerUsage,
1054 message: MessageValue<Response>
1055 ): void {
1056 const workerTaskStatistics = workerUsage.tasks
1057 if (
1058 workerTaskStatistics.executing != null &&
1059 workerTaskStatistics.executing > 0
1060 ) {
1061 --workerTaskStatistics.executing
1062 }
1063 if (message.workerError == null) {
1064 ++workerTaskStatistics.executed
1065 } else {
1066 ++workerTaskStatistics.failed
1067 }
1068 }
1069
1070 private updateRunTimeWorkerUsage (
1071 workerUsage: WorkerUsage,
1072 message: MessageValue<Response>
1073 ): void {
1074 if (message.workerError != null) {
1075 return
1076 }
1077 updateMeasurementStatistics(
1078 workerUsage.runTime,
1079 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1080 message.taskPerformance?.runTime ?? 0
1081 )
1082 }
1083
1084 private updateWaitTimeWorkerUsage (
1085 workerUsage: WorkerUsage,
1086 task: Task<Data>
1087 ): void {
1088 const timestamp = performance.now()
1089 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1090 updateMeasurementStatistics(
1091 workerUsage.waitTime,
1092 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1093 taskWaitTime
1094 )
1095 }
1096
1097 private updateEluWorkerUsage (
1098 workerUsage: WorkerUsage,
1099 message: MessageValue<Response>
1100 ): void {
1101 if (message.workerError != null) {
1102 return
1103 }
1104 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1105 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1106 updateMeasurementStatistics(
1107 workerUsage.elu.active,
1108 eluTaskStatisticsRequirements,
1109 message.taskPerformance?.elu?.active ?? 0
1110 )
1111 updateMeasurementStatistics(
1112 workerUsage.elu.idle,
1113 eluTaskStatisticsRequirements,
1114 message.taskPerformance?.elu?.idle ?? 0
1115 )
1116 if (eluTaskStatisticsRequirements.aggregate) {
1117 if (message.taskPerformance?.elu != null) {
1118 if (workerUsage.elu.utilization != null) {
1119 workerUsage.elu.utilization =
1120 (workerUsage.elu.utilization +
1121 message.taskPerformance.elu.utilization) /
1122 2
1123 } else {
1124 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1125 }
1126 }
1127 }
1128 }
1129
1130 /**
1131 * Chooses a worker node for the next task.
1132 *
1133 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1134 *
1135 * @returns The chosen worker node key
1136 */
1137 private chooseWorkerNode (): number {
1138 if (this.shallCreateDynamicWorker()) {
1139 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1140 if (
1141 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1142 ) {
1143 return workerNodeKey
1144 }
1145 }
1146 return this.workerChoiceStrategyContext.execute()
1147 }
1148
1149 /**
1150 * Conditions for dynamic worker creation.
1151 *
1152 * @returns Whether to create a dynamic worker or not.
1153 */
1154 private shallCreateDynamicWorker (): boolean {
1155 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1156 }
1157
1158 /**
1159 * Sends a message to worker given its worker node key.
1160 *
1161 * @param workerNodeKey - The worker node key.
1162 * @param message - The message.
1163 * @param transferList - The optional array of transferable objects.
1164 */
1165 protected abstract sendToWorker (
1166 workerNodeKey: number,
1167 message: MessageValue<Data>,
1168 transferList?: TransferListItem[]
1169 ): void
1170
1171 /**
1172 * Creates a new worker.
1173 *
1174 * @returns Newly created worker.
1175 */
1176 protected abstract createWorker (): Worker
1177
1178 /**
1179 * Creates a new, completely set up worker node.
1180 *
1181 * @returns New, completely set up worker node key.
1182 */
1183 protected createAndSetupWorkerNode (): number {
1184 const worker = this.createWorker()
1185
1186 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1187 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1188 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1189 worker.on('error', error => {
1190 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1191 const workerInfo = this.getWorkerInfo(workerNodeKey)
1192 workerInfo.ready = false
1193 this.workerNodes[workerNodeKey].closeChannel()
1194 this.emitter?.emit(PoolEvents.error, error)
1195 if (
1196 this.opts.restartWorkerOnError === true &&
1197 this.started &&
1198 !this.starting
1199 ) {
1200 if (workerInfo.dynamic) {
1201 this.createAndSetupDynamicWorkerNode()
1202 } else {
1203 this.createAndSetupWorkerNode()
1204 }
1205 }
1206 if (this.opts.enableTasksQueue === true) {
1207 this.redistributeQueuedTasks(workerNodeKey)
1208 }
1209 })
1210 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1211 worker.once('exit', () => {
1212 this.removeWorkerNode(worker)
1213 })
1214
1215 const workerNodeKey = this.addWorkerNode(worker)
1216
1217 this.afterWorkerNodeSetup(workerNodeKey)
1218
1219 return workerNodeKey
1220 }
1221
1222 /**
1223 * Creates a new, completely set up dynamic worker node.
1224 *
1225 * @returns New, completely set up dynamic worker node key.
1226 */
1227 protected createAndSetupDynamicWorkerNode (): number {
1228 const workerNodeKey = this.createAndSetupWorkerNode()
1229 this.registerWorkerMessageListener(workerNodeKey, message => {
1230 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1231 message.workerId
1232 )
1233 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1234 // Kill message received from worker
1235 if (
1236 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1237 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1238 ((this.opts.enableTasksQueue === false &&
1239 workerUsage.tasks.executing === 0) ||
1240 (this.opts.enableTasksQueue === true &&
1241 workerUsage.tasks.executing === 0 &&
1242 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1243 ) {
1244 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1245 this.emitter?.emit(PoolEvents.error, error)
1246 })
1247 }
1248 })
1249 const workerInfo = this.getWorkerInfo(workerNodeKey)
1250 this.sendToWorker(workerNodeKey, {
1251 checkActive: true
1252 })
1253 if (this.taskFunctions.size > 0) {
1254 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1255 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1256 taskFunctionOperation: 'add',
1257 taskFunctionName,
1258 taskFunction: taskFunction.toString()
1259 }).catch(error => {
1260 this.emitter?.emit(PoolEvents.error, error)
1261 })
1262 }
1263 }
1264 workerInfo.dynamic = true
1265 if (
1266 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1267 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1268 ) {
1269 workerInfo.ready = true
1270 }
1271 this.checkAndEmitDynamicWorkerCreationEvents()
1272 return workerNodeKey
1273 }
1274
1275 /**
1276 * Registers a listener callback on the worker given its worker node key.
1277 *
1278 * @param workerNodeKey - The worker node key.
1279 * @param listener - The message listener callback.
1280 */
1281 protected abstract registerWorkerMessageListener<
1282 Message extends Data | Response
1283 >(
1284 workerNodeKey: number,
1285 listener: (message: MessageValue<Message>) => void
1286 ): void
1287
1288 /**
1289 * Method hooked up after a worker node has been newly created.
1290 * Can be overridden.
1291 *
1292 * @param workerNodeKey - The newly created worker node key.
1293 */
1294 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1295 // Listen to worker messages.
1296 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1297 // Send the startup message to worker.
1298 this.sendStartupMessageToWorker(workerNodeKey)
1299 // Send the statistics message to worker.
1300 this.sendStatisticsMessageToWorker(workerNodeKey)
1301 if (this.opts.enableTasksQueue === true) {
1302 this.workerNodes[workerNodeKey].onEmptyQueue =
1303 this.taskStealingOnEmptyQueue.bind(this)
1304 this.workerNodes[workerNodeKey].onBackPressure =
1305 this.tasksStealingOnBackPressure.bind(this)
1306 }
1307 }
1308
1309 /**
1310 * Sends the startup message to worker given its worker node key.
1311 *
1312 * @param workerNodeKey - The worker node key.
1313 */
1314 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1315
1316 /**
1317 * Sends the statistics message to worker given its worker node key.
1318 *
1319 * @param workerNodeKey - The worker node key.
1320 */
1321 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1322 this.sendToWorker(workerNodeKey, {
1323 statistics: {
1324 runTime:
1325 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1326 .runTime.aggregate,
1327 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1328 .elu.aggregate
1329 }
1330 })
1331 }
1332
1333 private redistributeQueuedTasks (workerNodeKey: number): void {
1334 while (this.tasksQueueSize(workerNodeKey) > 0) {
1335 const destinationWorkerNodeKey = this.workerNodes.reduce(
1336 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1337 return workerNode.info.ready &&
1338 workerNode.usage.tasks.queued <
1339 workerNodes[minWorkerNodeKey].usage.tasks.queued
1340 ? workerNodeKey
1341 : minWorkerNodeKey
1342 },
1343 0
1344 )
1345 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1346 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1347 this.executeTask(destinationWorkerNodeKey, task)
1348 } else {
1349 this.enqueueTask(destinationWorkerNodeKey, task)
1350 }
1351 }
1352 }
1353
1354 private updateTaskStolenStatisticsWorkerUsage (
1355 workerNodeKey: number,
1356 taskName: string
1357 ): void {
1358 const workerNode = this.workerNodes[workerNodeKey]
1359 if (workerNode?.usage != null) {
1360 ++workerNode.usage.tasks.stolen
1361 }
1362 if (
1363 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1364 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1365 ) {
1366 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1367 taskName
1368 ) as WorkerUsage
1369 ++taskFunctionWorkerUsage.tasks.stolen
1370 }
1371 }
1372
1373 private taskStealingOnEmptyQueue (workerId: number): void {
1374 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1375 const workerNodes = this.workerNodes
1376 .slice()
1377 .sort(
1378 (workerNodeA, workerNodeB) =>
1379 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1380 )
1381 const sourceWorkerNode = workerNodes.find(
1382 workerNode =>
1383 workerNode.info.ready &&
1384 workerNode.info.id !== workerId &&
1385 workerNode.usage.tasks.queued > 0
1386 )
1387 if (sourceWorkerNode != null) {
1388 const task = sourceWorkerNode.popTask() as Task<Data>
1389 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1390 this.executeTask(destinationWorkerNodeKey, task)
1391 } else {
1392 this.enqueueTask(destinationWorkerNodeKey, task)
1393 }
1394 this.updateTaskStolenStatisticsWorkerUsage(
1395 destinationWorkerNodeKey,
1396 task.name as string
1397 )
1398 }
1399 }
1400
1401 private tasksStealingOnBackPressure (workerId: number): void {
1402 const sizeOffset = 1
1403 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1404 return
1405 }
1406 const sourceWorkerNode =
1407 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1408 const workerNodes = this.workerNodes
1409 .slice()
1410 .sort(
1411 (workerNodeA, workerNodeB) =>
1412 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1413 )
1414 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1415 if (
1416 sourceWorkerNode.usage.tasks.queued > 0 &&
1417 workerNode.info.ready &&
1418 workerNode.info.id !== workerId &&
1419 workerNode.usage.tasks.queued <
1420 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1421 ) {
1422 const task = sourceWorkerNode.popTask() as Task<Data>
1423 if (this.shallExecuteTask(workerNodeKey)) {
1424 this.executeTask(workerNodeKey, task)
1425 } else {
1426 this.enqueueTask(workerNodeKey, task)
1427 }
1428 this.updateTaskStolenStatisticsWorkerUsage(
1429 workerNodeKey,
1430 task.name as string
1431 )
1432 }
1433 }
1434 }
1435
1436 /**
1437 * This method is the listener registered for each worker message.
1438 *
1439 * @returns The listener function to execute when a message is received from a worker.
1440 */
1441 protected workerListener (): (message: MessageValue<Response>) => void {
1442 return message => {
1443 this.checkMessageWorkerId(message)
1444 if (message.ready != null && message.taskFunctionNames != null) {
1445 // Worker ready response received from worker
1446 this.handleWorkerReadyResponse(message)
1447 } else if (message.taskId != null) {
1448 // Task execution response received from worker
1449 this.handleTaskExecutionResponse(message)
1450 } else if (message.taskFunctionNames != null) {
1451 // Task function names message received from worker
1452 this.getWorkerInfo(
1453 this.getWorkerNodeKeyByWorkerId(message.workerId)
1454 ).taskFunctionNames = message.taskFunctionNames
1455 }
1456 }
1457 }
1458
1459 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1460 if (message.ready === false) {
1461 throw new Error(
1462 `Worker ${message.workerId as number} failed to initialize`
1463 )
1464 }
1465 const workerInfo = this.getWorkerInfo(
1466 this.getWorkerNodeKeyByWorkerId(message.workerId)
1467 )
1468 workerInfo.ready = message.ready as boolean
1469 workerInfo.taskFunctionNames = message.taskFunctionNames
1470 if (this.emitter != null && this.ready) {
1471 this.emitter.emit(PoolEvents.ready, this.info)
1472 }
1473 }
1474
1475 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1476 const { taskId, workerError, data } = message
1477 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1478 if (promiseResponse != null) {
1479 if (workerError != null) {
1480 this.emitter?.emit(PoolEvents.taskError, workerError)
1481 promiseResponse.reject(workerError.message)
1482 } else {
1483 promiseResponse.resolve(data as Response)
1484 }
1485 const workerNodeKey = promiseResponse.workerNodeKey
1486 this.afterTaskExecutionHook(workerNodeKey, message)
1487 this.workerChoiceStrategyContext.update(workerNodeKey)
1488 this.promiseResponseMap.delete(taskId as string)
1489 if (
1490 this.opts.enableTasksQueue === true &&
1491 this.tasksQueueSize(workerNodeKey) > 0 &&
1492 this.workerNodes[workerNodeKey].usage.tasks.executing <
1493 (this.opts.tasksQueueOptions?.concurrency as number)
1494 ) {
1495 this.executeTask(
1496 workerNodeKey,
1497 this.dequeueTask(workerNodeKey) as Task<Data>
1498 )
1499 }
1500 }
1501 }
1502
1503 private checkAndEmitTaskExecutionEvents (): void {
1504 if (this.busy) {
1505 this.emitter?.emit(PoolEvents.busy, this.info)
1506 }
1507 }
1508
1509 private checkAndEmitTaskQueuingEvents (): void {
1510 if (this.hasBackPressure()) {
1511 this.emitter?.emit(PoolEvents.backPressure, this.info)
1512 }
1513 }
1514
1515 private checkAndEmitDynamicWorkerCreationEvents (): void {
1516 if (this.type === PoolTypes.dynamic) {
1517 if (this.full) {
1518 this.emitter?.emit(PoolEvents.full, this.info)
1519 }
1520 }
1521 }
1522
1523 /**
1524 * Gets the worker information given its worker node key.
1525 *
1526 * @param workerNodeKey - The worker node key.
1527 * @returns The worker information.
1528 */
1529 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1530 return this.workerNodes[workerNodeKey].info
1531 }
1532
1533 /**
1534 * Adds the given worker in the pool worker nodes.
1535 *
1536 * @param worker - The worker.
1537 * @returns The added worker node key.
1538 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1539 */
1540 private addWorkerNode (worker: Worker): number {
1541 const workerNode = new WorkerNode<Worker, Data>(
1542 worker,
1543 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1544 )
1545 // Flag the worker node as ready at pool startup.
1546 if (this.starting) {
1547 workerNode.info.ready = true
1548 }
1549 this.workerNodes.push(workerNode)
1550 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1551 if (workerNodeKey === -1) {
1552 throw new Error('Worker added not found in worker nodes')
1553 }
1554 return workerNodeKey
1555 }
1556
1557 /**
1558 * Removes the given worker from the pool worker nodes.
1559 *
1560 * @param worker - The worker.
1561 */
1562 private removeWorkerNode (worker: Worker): void {
1563 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1564 if (workerNodeKey !== -1) {
1565 this.workerNodes.splice(workerNodeKey, 1)
1566 this.workerChoiceStrategyContext.remove(workerNodeKey)
1567 }
1568 }
1569
1570 /** @inheritDoc */
1571 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1572 return (
1573 this.opts.enableTasksQueue === true &&
1574 this.workerNodes[workerNodeKey].hasBackPressure()
1575 )
1576 }
1577
1578 private hasBackPressure (): boolean {
1579 return (
1580 this.opts.enableTasksQueue === true &&
1581 this.workerNodes.findIndex(
1582 workerNode => !workerNode.hasBackPressure()
1583 ) === -1
1584 )
1585 }
1586
1587 /**
1588 * Executes the given task on the worker given its worker node key.
1589 *
1590 * @param workerNodeKey - The worker node key.
1591 * @param task - The task to execute.
1592 */
1593 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1594 this.beforeTaskExecutionHook(workerNodeKey, task)
1595 this.sendToWorker(workerNodeKey, task, task.transferList)
1596 this.checkAndEmitTaskExecutionEvents()
1597 }
1598
1599 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1600 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1601 this.checkAndEmitTaskQueuingEvents()
1602 return tasksQueueSize
1603 }
1604
1605 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1606 return this.workerNodes[workerNodeKey].dequeueTask()
1607 }
1608
1609 private tasksQueueSize (workerNodeKey: number): number {
1610 return this.workerNodes[workerNodeKey].tasksQueueSize()
1611 }
1612
1613 protected flushTasksQueue (workerNodeKey: number): void {
1614 while (this.tasksQueueSize(workerNodeKey) > 0) {
1615 this.executeTask(
1616 workerNodeKey,
1617 this.dequeueTask(workerNodeKey) as Task<Data>
1618 )
1619 }
1620 this.workerNodes[workerNodeKey].clearTasksQueue()
1621 }
1622
1623 private flushTasksQueues (): void {
1624 for (const [workerNodeKey] of this.workerNodes.entries()) {
1625 this.flushTasksQueue(workerNodeKey)
1626 }
1627 }
1628 }