Merge pull request #1250 from poolifier/combined-prs-branch
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { type TransferListItem } from 'node:worker_threads'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 average,
14 isKillBehavior,
15 isPlainObject,
16 max,
17 median,
18 min,
19 round
20 } from '../utils'
21 import { KillBehaviors } from '../worker/worker-options'
22 import type { TaskFunction } from '../worker/task-functions'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50 import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
55 } from './utils'
56
57 /**
58 * Base class that implements some shared logic for all poolifier pools.
59 *
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
63 */
64 export abstract class AbstractPool<
65 Worker extends IWorker,
66 Data = unknown,
67 Response = unknown
68 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry bound to its message id gives access to the promise resolve/reject callbacks.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 * When it is not set, the pool maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 */
private starting: boolean
/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 */
private readonly startTimestamp: number
118
/**
 * Builds the state shared by every poolifier pool and, unless disabled through
 * the `startWorkers` option, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const calledFromMain = this.isMain()
  if (!calledFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments are validated before any pool state is built.
  // `checkPoolOptions()` also fills the options defaults in place.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached when these methods are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Extension point executed before any worker node is created.
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
169
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are rejected when sized at zero.
  const fixedPool = this.type === PoolTypes.fixed
  if (fixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options: validate first, then default.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
216
/**
 * Validates worker choice strategy options. `null` or `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // NOTE(review): `maxSize` falls back to `numberOfWorkers` while `max` is unset;
  // confirm `max` is already assigned when this runs from the constructor.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  type TimeMeasurement = 'runTime' | 'waitTime'

  // Totals a per worker node numeric value over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  // Concatenates the measurement history of all the worker nodes, in worker node order.
  const collectHistory = (measurement: TimeMeasurement): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    ...(tasksQueueEnabled && {
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(average(collectHistory('runTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(median(collectHistory('runTime')))
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(average(collectHistory('waitTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(median(collectHistory('waitTime')))
        })
      }
    })
  }
}
419
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
434
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait
 * time of all the worker nodes, divided by the elapsed time since the start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  // Both totals are accumulated separately, in worker node order.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
455
456 /**
 * The pool type, provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * This class compares it against `PoolTypes.fixed` when validating the number
 * of workers and against `PoolTypes.dynamic` when deciding whether a dynamic
 * worker shall be created.
 */
461 protected abstract get type (): PoolType
462
463 /**
 * The worker type, provided by the concrete pool implementation.
 * It is reported as the `worker` property of the pool information.
 */
466 protected abstract get worker (): WorkerType
467
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
474
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
481
/**
 * Checks that the worker id carried by a message received from a worker is
 * present and belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
500
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
512
/**
 * Gets the worker node key of the worker node whose worker info id matches the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
524
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage, then send it the statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
543
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options override the defaults; the merged result is stored and
  // forwarded to the worker choice strategy context.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
557
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingActiveQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingActiveQueue) {
    // Detach the task stealing callbacks and flush the queues before the queue is switched off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
571
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Attach or detach the task stealing callbacks according to the built options.
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
593
/**
 * Builds the tasks queue options by layering the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // Default queue size is the pool maximum size squared.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
607
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
613
/**
 * Installs the task stealing handler as the `onEmptyQueue` callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
620
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
626
/**
 * Installs the tasks stealing handler as the `onBackPressure` callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
633
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
639
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
648
649 /**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 * NOTE(review): `internalBusy()` looks like the shared building block for
 * implementations - confirm against the concrete pools.
 */
654 protected abstract get busy (): boolean
655
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while
 * it executes fewer tasks than the tasks queue concurrency; otherwise it has
 * spare capacity only while it executes no task at all.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const someWorkerNodeBelowQuota = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !someWorkerNodeBelowQuota
  }
  const someWorkerNodeIdle = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !someWorkerNodeIdle
}
679
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once the worker reports success, rejected if it reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
    // The listener is registered before the message is sent.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      if (response.workerId !== expectedWorkerId) {
        return
      }
      if (response.taskFunctionOperationStatus === true) {
        resolve(true)
      } else if (response.taskFunctionOperationStatus === false) {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
710
/**
 * Sends a task function operation message to every worker node and waits for
 * all of them to report their operation status.
 *
 * The set of targeted worker nodes is captured once, before any message is
 * sent, so a worker node added while responses are pending does not change
 * the number of responses awaited. With no worker node at all there is
 * nothing to wait for and the operation resolves immediately.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once every targeted worker reported success, rejected once all responded and at least one reported failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const targetWorkerNodeKeys = [...this.workerNodes.keys()]
    const expectedResponses = targetWorkerNodeKeys.length
    if (expectedResponses === 0) {
      // No worker to notify: settling here avoids a promise pending forever.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const workerNodeKey of targetWorkerNodeKeys) {
      // The listener is registered before the message is sent.
      this.registerWorkerMessageListener(workerNodeKey, response => {
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        if (responsesReceived.length !== expectedResponses) {
          return
        }
        if (
          responsesReceived.every(
            received => received.taskFunctionOperationStatus === true
          )
        ) {
          resolve(true)
          return
        }
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                // The operation name is read from the request that was sent.
                message.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
754
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the name among its task function names.
  return this.workerNodes.some(({ info }) => {
    const { taskFunctionNames } = info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
767
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const trimmedName = name.trim()
  if (trimmedName.length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Recorded on the pool side only after the workers operation settled successfully.
  this.taskFunctions.set(name, fn)
  return opResult
}
790
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Pool side bookkeeping is cleaned up only after the workers operation succeeded.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
806
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node reporting a non-empty list.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
819
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultTaskFunctionOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultTaskFunctionOperation
  )
}
827
/**
 * Deletes the given task function worker usage on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
833
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue must be empty and it must execute fewer tasks than the
 * tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
841
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Invalid calls reject the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The submission timestamp is taken before a worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // The settle callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
895
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Recomputed on each iteration since creating a worker node grows the list.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
911
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
922
/**
 * Sends a kill message to the given worker node and waits for its acknowledgement.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `kill: 'success'`, rejected when it answers `kill: 'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Worker ${
                message.workerId as number
              } kill message handling failed`
            )
          )
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
943
944 /**
 * Terminates the worker node given its worker node key.
 * `destroy()` calls it once per worker node and awaits every returned promise.
 *
 * @param workerNodeKey - The worker node key.
 */
949 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
950
/**
 * Setup hook invoked by the abstract constructor before any worker node is created.
 * The default implementation does nothing; it can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
960
961 /**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when this returns `false`.
 */
964 protected abstract isMain (): boolean
965
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and records the task wait time, on
 * the worker node usage and, when tracked, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
995
/**
 * Hook executed after the worker task execution.
 * Updates the tasks statistics, run time and ELU, on the worker node usage
 * and, when tracked, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const applyTo = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    applyTo(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // NOTE(review): the task function name is read from `message.taskPerformance`,
  // which is optional - confirm it is set on error responses too.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    applyTo(taskFunctionWorkerUsage)
  }
}
1029
/**
 * Whether the worker node shall update its task function worker usage or not.
 * It shall when its worker info lists more than two task function names.
 * NOTE(review): the threshold of two presumably accounts for extra name
 * entries in the list - confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1044
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1062
/**
 * Updates the run time statistics of a worker usage. Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1076
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp. A task without timestamp counts as zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - submittedAt
  )
}
1089
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // An existing utilization is averaged with the new sample; otherwise the sample is stored as is.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1122
/**
 * Chooses a worker node for the next task.
 *
 * When the conditions for dynamic worker creation are met, a dynamic worker node is created
 * first and is returned directly if the strategy policy sets `dynamicWorkerUsage`.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1141
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1150
/**
 * Sends a message to worker given its worker node key.
 *
 * Must be implemented by concrete pools. Within this class it is used to send
 * the tasks to execute (along with their optional transfer list), the
 * `checkActive` request to dynamic worker nodes and the statistics settings.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1163
/**
 * Creates a new worker.
 *
 * Must be implemented by concrete pools. It is called by
 * `createAndSetupWorkerNode()`, which then registers the event handlers on the
 * returned worker and adds it to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1170
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers from the pool options are registered on the new worker together
 * with the internal `error` and `exit` handlers, then the worker is added to the pool worker
 * nodes and the `afterWorkerNodeSetup()` hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // The replacement worker node is of the same kind as the failed one.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Internal exit handling: the worker node is removed from the pool.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1214
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener handling the kill messages sent by the
 * worker is registered, the `checkActive` request and the pool task functions are sent to the
 * worker and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[localWorkerNodeKey].usage
    // Idle: no task executing and, with tasks queue enabled, no task queued either.
    const isIdle = (): boolean =>
      usage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker: hard kill is unconditional, soft kill requires idleness.
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function known by the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1267
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Must be implemented by concrete pools. Within this class it is used to
 * attach the pool worker listener after a worker node setup and the kill
 * message listener of dynamic worker nodes.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1280
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool worker listener, sends the startup and statistics messages and, when the
 * tasks queue is enabled, wires the task stealing callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  const workerNode = this.workerNodes[workerNodeKey]
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1305
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Must be implemented by concrete pools. It is called from
 * `afterWorkerNodeSetup()`, after the worker message listener registration
 * and before the statistics message is sent.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1312
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
1329
/**
 * Redistributes the tasks queued on the given worker node to the other worker nodes.
 *
 * For each queued task, the destination is chosen among the worker nodes other than the
 * source one: the ready worker node with the fewest queued tasks is preferred, and a not
 * ready worker node is only used when no other worker node is ready. The source worker node
 * is never chosen, otherwise a task could be dequeued and enqueued again on the same worker
 * node forever, or sent to the worker that just failed.
 *
 * @param workerNodeKey - The worker node key of the source worker node.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Nothing to do without a known source worker node or without any other worker node.
  if (
    this.workerNodes[workerNodeKey] == null ||
    this.workerNodes.length <= 1
  ) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (candidateWorkerNodeKey === workerNodeKey) {
        continue
      }
      if (destinationWorkerNodeKey === -1) {
        // First worker node other than the source: fallback destination.
        destinationWorkerNodeKey = candidateWorkerNodeKey
        continue
      }
      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
      if (
        candidateWorkerNode.info.ready &&
        (!destinationWorkerNode.info.ready ||
          candidateWorkerNode.usage.tasks.queued <
            destinationWorkerNode.usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No destination available: leave the remaining tasks untouched.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1350
/**
 * Increments the stolen tasks counters of the given worker node.
 *
 * The worker node usage is always updated. The usage of the task function named `taskName` is
 * updated as well when `shallUpdateTaskFunctionWorkerUsage()` allows it and such a usage exists.
 * An unknown worker node key is ignored.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Guard once for the whole method instead of only for the first update.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single lookup, reused for both the existence check and the update.
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1369
/**
 * Steals one task for the worker whose tasks queue is empty.
 *
 * The task is taken from the ready worker node, other than the given worker, having the most
 * queued tasks. It is then executed or enqueued on the given worker's node and the stolen
 * tasks statistics of that node are updated.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy by decreasing number of queued tasks.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (a, b) => b.usage.tasks.queued - a.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    ({ info, usage }) =>
      info.ready && info.id !== workerId && usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1397
/**
 * Moves tasks away from the worker under back pressure.
 *
 * The other ready worker nodes are visited by increasing number of queued tasks. Each one whose
 * queue holds fewer tasks than the configured tasks queue size minus one receives one task
 * popped from the source worker node, which is then executed or enqueued on it.
 * Nothing is done when the configured tasks queue size is lower than or equal to one or when
 * the worker id does not match any worker node.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    return
  }
  const byQueuedTasksAscending = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of byQueuedTasksAscending) {
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      // Nothing left to move.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key against the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1432
/**
 * This method is the listener registered for each worker message.
 *
 * After the message worker id check, the message is dispatched according to its content:
 * worker ready response, task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
    }
  }
}
1455
/**
 * Handles the ready response sent by a worker.
 *
 * Stores the ready flag and the task function names in the worker information and emits the
 * pool `ready` event once the pool is ready.
 *
 * @param message - The ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1471
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task, runs the after task execution hook, updates the
 * worker choice strategy context and, when the tasks queue is enabled, executes the next task
 * queued on the worker node if its concurrency allows it.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1499
/**
 * Emits the pool `busy` event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1505
/**
 * Emits the pool `backPressure` event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1511
/**
 * Emits the pool `full` event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1519
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1529
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node is created with the configured tasks queue size, defaulting to the square
 * of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1553
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy
 * context. A worker that is not found in the pool worker nodes is ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1566
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is no back pressure to report.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1574
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker node is
 * free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1583
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list to the worker
 * and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1595
/**
 * Enqueues the given task on the worker node given its key, then emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const sizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return sizeAfterEnqueue
}
1601
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1605
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1609
/**
 * Executes every task queued on the worker node given its key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1619
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1625 }