refactor: refine argument type
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 median,
18 round,
19 updateMeasurementStatistics
20 } from '../utils'
21 import { KillBehaviors } from '../worker/worker-options'
22 import {
23 type IPool,
24 PoolEmitter,
25 PoolEvents,
26 type PoolInfo,
27 type PoolOptions,
28 type PoolType,
29 PoolTypes,
30 type TasksQueueOptions
31 } from './pool'
32 import type {
33 IWorker,
34 IWorkerNode,
35 WorkerInfo,
36 WorkerType,
37 WorkerUsage
38 } from './worker'
39 import {
40 type MeasurementStatisticsRequirements,
41 Measurements,
42 WorkerChoiceStrategies,
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
45 } from './selection-strategies/selection-strategies-types'
46 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
47 import { version } from './version'
48 import { WorkerNode } from './worker-node'
49
50 /**
51 * Base class that implements some shared logic for all poolifier pools.
52 *
53 * @typeParam Worker - Type of worker which manages this pool.
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
56 */
57 export abstract class AbstractPool<
58 Worker extends IWorker,
59 Data = unknown,
60 Response = unknown
61 > implements IPool<Worker, Data, Response> {
62 /** @inheritDoc */
63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
64
65 /** @inheritDoc */
66 public readonly emitter?: PoolEmitter
67
68 /**
69 * The task execution response promise map.
70 *
71 * - `key`: The message id of each submitted task.
72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
73 *
74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
75 */
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
78
79 /**
80 * Worker choice strategy context referencing a worker choice algorithm implementation.
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
83 Worker,
84 Data,
85 Response
86 >
87
88 /**
89 * Dynamic pool maximum size property placeholder.
90 */
91 protected readonly max?: number
92
93 /**
94 * Whether the pool is starting or not.
95 */
96 private readonly starting: boolean
97 /**
98 * Whether the pool is started or not.
99 */
100 private started: boolean
101 /**
102 * The start timestamp of the pool.
103 */
104 private readonly startTimestamp
105
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker
 * choice strategy context, runs the setup hook and finally creates the initial
 * worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Every argument is validated before any worker node gets created.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind `this` permanently on the following methods.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is true only while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
153
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `typeof` already rules out `null` and `undefined`.
  const isUsableString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsableString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
166
/**
 * Checks that the number of workers is a safe, non negative integer, and non zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused when it has zero worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
184
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
210
/**
 * Checks the pool options and writes the defaulted values into `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, worker restart on
 * error enabled, events enabled, tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy and its options.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
238
/**
 * Checks that the worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
248
/**
 * Checks the worker choice strategy options: choice retries, weights and measurement.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries are not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node, counted against the maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
292
/**
 * Checks the tasks queue options. Nullish options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or concurrency or size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If concurrency or size is negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the deprecated `queueMaxSize` option is set.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  // Nothing to check when no options are given.
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, queueMaxSize, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (queueMaxSize != null) {
    throw new Error(
      'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
    )
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
334
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches
 * the configured number of workers.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length

  // Recounted on every pass since each creation adds a worker node.
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
346
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once instead of before every section of the snapshot.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length

  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Builds the pool wide statistics of one measurement (run time or wait time).
  const buildMeasurementInfo = (
    requirements: MeasurementStatisticsRequirements,
    pick: (
      workerUsage: WorkerUsage
    ) => WorkerUsage['runTime'] | WorkerUsage['waitTime']
  ): { minimum: number, maximum: number, average?: number, median?: number } => {
    // Flattens the measurement history of every worker node into one array.
    const collectHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (accumulator, workerNode) =>
          accumulator.concat(pick(workerNode.usage).history),
        []
      )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            (workerNode) => pick(workerNode.usage)?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            (workerNode) => pick(workerNode.usage)?.maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(collectHistory()))
      }),
      ...(requirements.median && {
        median: round(median(collectHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo(
        taskStatisticsRequirements.runTime,
        (workerUsage) => workerUsage.runTime
      )
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo(
        taskStatisticsRequirements.waitTime,
        (workerUsage) => workerUsage.waitTime
      )
    })
  }
}
501
/**
 * The pool readiness boolean status: `true` once at least `minSize` non
 * dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
516
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool
 * maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
537
538 /**
539 * The pool type.
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
545 /**
546 * The worker type.
547 */
548 protected abstract get worker (): WorkerType
549
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
556
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
563
/**
 * Checks that the received message carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
582
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
594
/**
 * Gets the worker node key of the worker node whose `info.id` equals the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  let workerNodeKey = 0
  for (const { info } of this.workerNodes) {
    if (info.id === workerId) {
      return workerNodeKey
    }
    ++workerNodeKey
  }
  return -1
}
606
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Validate every argument before mutating anything, so that invalid options
  // cannot leave the pool with the new strategy already applied.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node gets its usage reset, then a statistics message is sent
  // to its worker.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
625
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
639
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
651
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any options left over from a previous setup.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.size as number)
}
663
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The size to set.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
669
/**
 * Builds the tasks queue options: the given options on top of the defaults
 * (`size`: pool maximum size squared, `concurrency`: 1).
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
681
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return this.maxSize <= workerNodesCount
}
690
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
697
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has room left while it
 * executes fewer tasks than the configured concurrency; otherwise only while
 * it executes no task at all.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasRoomLeft =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  // Busy means that no worker node has room left.
  return !this.workerNodes.some(hasRoomLeft)
}
722
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctions) && info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
735
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Argument and state checks: each failure rejects the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId
    }
    // Keep the promise callbacks retrievable by task id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })

    // Without tasks queue the task is always executed right away. With it,
    // only when the worker node queue is empty and its concurrency limit is
    // not reached yet.
    const executeRightAway =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    if (executeRightAway) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
793
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
804
/**
 * Sends a kill message to the worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
820
821 /**
822 * Terminates the worker node given its worker node key.
823 *
824 * @param workerNodeKey - The worker node key.
825 */
826 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
827
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
837
838 /**
839 * Should return whether the worker is the main worker or not.
840 */
841 protected abstract isMain (): boolean
842
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, on the
 * worker usage and, when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const taskName = task.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
872
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, on the worker usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // The task function name comes from the task performance of the message.
  const taskName = message.taskPerformance?.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
906
/**
 * Whether the worker node shall update its task function worker usage or not:
 * its worker info must list more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
921
/**
 * Updates the tasks counters of the given worker usage once a task is done:
 * one executing task less, one executed or failed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
939
/**
 * Updates the run time statistics of the given worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
953
/**
 * Updates the wait time statistics of the given worker usage with the time
 * elapsed since the task timestamp (zero when the task has no timestamp).
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime
  )
}
966
/**
 * Updates the ELU (active, idle and utilization) statistics of the given
 * worker usage. Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is taken as is, later ones are averaged with the current one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
999
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met, and it
 * is chosen directly if the strategy policy has `dynamicWorkerUsage` set.
 * Otherwise the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { dynamicWorkerUsage } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (dynamicWorkerUsage) {
      return dynamicWorkerNodeKey
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
1018
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1027
1028 /**
1029 * Sends a message to worker given its worker node key.
1030 *
1031 * @param workerNodeKey - The worker node key.
1032 * @param message - The message.
1033 * @param transferList - The optional array of transferable objects.
1034 */
1035 protected abstract sendToWorker (
1036 workerNodeKey: number,
1037 message: MessageValue<Data>,
1038 transferList?: TransferListItem[]
1039 ): void
1040
1041 /**
1042 * Creates a new worker.
1043 *
1044 * @returns Newly created worker.
1045 */
1046 protected abstract createWorker (): Worker
1047
/**
 * Creates a new, completely set up worker node: the worker is created, its
 * event handlers are registered and it is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool side error handling.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement while the pool is starting or once it is not started anymore.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestartWorker) {
      // The replacement keeps the dynamic or non dynamic nature of the failed worker.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1091
/**
 * Creates a new worker node through `createAndSetupWorkerNode()` and configures it as a dynamic one.
 *
 * A message listener is registered to destroy the worker node when a kill message is received,
 * a `checkActive` message is sent to the worker and the worker info is flagged as dynamic.
 *
 * @returns The key of the newly created and set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const { kill, workerId } = message
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const usage = this.workerNodes[localWorkerNodeKey].usage
    // Idle means: no task executing and, when the tasks queue is enabled, nothing queued.
    const isIdle = (): boolean =>
      usage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker: a hard kill is always honored,
    // a soft kill only when the worker node is idle.
    if (
      isKillBehavior(KillBehaviors.HARD, kill) ||
      (isKillBehavior(KillBehaviors.SOFT, kill) && isIdle())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The strategy policy may require the dynamic worker to be flagged ready right away.
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1134
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: the way the listener is attached to the worker is defined by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1147
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics messages
 * and, when the tasks queue is enabled, installs the task stealing callbacks.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing callbacks are only relevant with a tasks queue.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1168
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: the content and delivery of the startup message are defined by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1175
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
1193
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node
 * without queued tasks or, failing that, the other ready worker node with the
 * fewest queued tasks. The task is executed right away when the destination
 * tasks queue is empty and its executing tasks count is below the configured
 * concurrency, otherwise it is enqueued on the destination.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks are left in the source worker node queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
        if (workerNode.usage.tasks.queued === 0) {
          // An idle queue cannot be beaten: stop searching.
          destinationWorkerNodeKey = workerNodeId
          break
        }
        if (workerNode.usage.tasks.queued < minQueuedTasks) {
          minQueuedTasks = workerNode.usage.tasks.queued
          destinationWorkerNodeKey = workerNodeId
        }
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No other ready worker node: nothing can be dequeued, so looping again
      // would never terminate and would block the event loop.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1228
/**
 * Steals at most one queued task from another worker node for the worker with the given id.
 *
 * Worker nodes are examined from the most to the least queued tasks. The first
 * ready one, other than the destination, gives up a task which is executed right
 * away when the destination queue is empty and its concurrency allows it,
 * otherwise enqueued. The stolen tasks counters of the destination are then incremented.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Most loaded worker nodes first.
  const candidates = [...this.workerNodes].sort(
    (a, b) => b.usage.tasks.queued - a.usage.tasks.queued
  )
  for (const sourceWorkerNode of candidates) {
    const { info, usage } = sourceWorkerNode
    if (usage.tasks.queued === 0) {
      // Sorted in descending order: no remaining candidate has queued tasks.
      break
    }
    const stealable =
      info.ready && info.id !== workerId && usage.tasks.queued > 0
    if (!stealable) {
      continue
    }
    const task = {
      ...(sourceWorkerNode.popTask() as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    const executeNow =
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    if (executeNow) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
    if (destinationWorkerNode?.usage != null) {
      ++destinationWorkerNode.usage.tasks.stolen
    }
    if (this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey)) {
      const taskFunctionWorkerUsage =
        destinationWorkerNode.getTaskFunctionWorkerUsage(task.name as string)
      if (taskFunctionWorkerUsage != null) {
        ++taskFunctionWorkerUsage.tasks.stolen
      }
    }
    // Only one task is stolen per call.
    break
  }
}
1279
/**
 * Offloads queued tasks of the worker with the given id to the other ready worker nodes.
 *
 * Does nothing when the configured tasks queue size is at most one. Otherwise
 * the worker nodes are examined from the least to the most queued tasks, and
 * each ready one (other than the source) whose queued tasks count is below the
 * configured tasks queue size minus one takes one task from the source, as long
 * as the source still has queued tasks. A taken task is executed right away when
 * the destination queue is empty and its executing tasks count is below the
 * configured concurrency, otherwise it is enqueued. The stolen tasks counters of
 * the destination are then incremented.
 *
 * @param workerId - The id of the worker whose queued tasks are offloaded.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const tasksQueueSizeOption = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSizeOption <= 1) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Least loaded worker nodes first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSizeOption - 1
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key from the pool worker nodes so the task lands on the intended node.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      // Same execute-or-enqueue rule as the other task redistribution paths.
      if (
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      if (workerNode?.usage != null) {
        ++workerNode.usage.tasks.stolen
      }
      if (
        this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
        workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
      ) {
        const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
          task.name as string
        ) as WorkerUsage
        ++taskFunctionWorkerUsage.tasks.stolen
      }
    }
  }
}
1324
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches on the
 * message content: worker ready response, task execution response or task
 * functions update. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1349
/**
 * Handles a worker ready response.
 *
 * Stores the ready flag and the task functions of the message in the worker
 * info, then emits the pool `ready` event if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1363
/**
 * Handles a task execution response.
 *
 * Messages whose task id has no pending promise response are ignored.
 * Otherwise the promise is rejected with the task error message (after emitting
 * the `taskError` event) or resolved with the message data, the after task
 * execution hook is run and the pending entry is removed. When the tasks queue
 * is enabled and the worker node has queued tasks and spare concurrency, the
 * next queued task is executed. Finally the worker choice strategy context is updated.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pending = this.promiseResponseMap.get(taskId as string)
  if (pending == null) {
    return
  }
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pending.reject(taskError.message)
  } else {
    pending.resolve(data as Response)
  }
  const { workerNodeKey } = pending
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1391
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1397
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1403
/**
 * Emits the pool `full` event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1411
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1421
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Tasks queue size option, defaulting to the square of the pool maximum size.
  const queueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    queueSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
1446
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1459
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1467
/**
 * Tells whether the pool as a whole has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1476
/**
 * Runs the before task execution hook, sends the task to the worker at the given
 * worker node key, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message; its transfer list is passed alongside.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1488
/**
 * Enqueues the given task on the worker node at the given key, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1494
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1498
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1502
/**
 * Executes every task queued on the worker node at the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remaining = this.tasksQueueSize(workerNodeKey);
    remaining > 0;
    remaining = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1512
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1518 }