docs: add changelog entries
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
// Only instantiated by the constructor when `opts.enableEvents` is `true`.
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map:
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * `execute()` adds one entry per submitted task.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 *
 * When left undefined, `maxSize` falls back to `numberOfWorkers`.
 */
protected readonly max?: number

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is starting or not.
 *
 * Only `true` while the constructor is running `startPool()`.
 */
private readonly starting: boolean
/**
 * Whether the pool is started or not.
 *
 * Set to `true` at the end of the constructor and back to `false` by `destroy()`;
 * `execute()` rejects while it is `false`.
 */
private started: boolean
/**
 * The start timestamp of the pool.
 *
 * Taken from `performance.now()` at the end of the constructor and used by `utilization`.
 */
private readonly startTimestamp
114
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, optionally creates the event emitter, builds the worker
 * choice strategy context, runs the setup hook and finally creates the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. `checkPoolOptions()` also fills in the option defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // These methods are bound to the pool instance so they can be handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // `starting` is only true for the duration of the worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
164
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are not strings, so the type test also covers them.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
177
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for a fixed pool.
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
195
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, or is lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // The zero check comes before the equality check, so min = max = 0 reports the zero maximum.
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
221
/**
 * Checks the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless told otherwise.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options: user values layered over the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
249
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
259
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from `maxSize` or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node, counted against the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
303
/**
 * Checks the tasks queue options. A nullish value is accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the deprecated `queueMaxSize` is given.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }

  // `queueMaxSize` is rejected outright in favor of `size`.
  if (tasksQueueOptions?.queueMaxSize != null) {
    throw new Error(
      'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
    )
  }

  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
345
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic ones.
 */
private startPool (): void {
  // Recounted on every turn since each creation changes the worker nodes list.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
357
/** @inheritDoc */
public get info (): PoolInfo {
  // Local folds over the worker nodes, shared by the counters below.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + selector(workerNode),
      0
    )
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Concatenates the measurement history of every worker node, in worker node order.
  const mergeHistories = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) =>
        histories.concat(workerNode.usage[measurement].history),
      []
    )
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures only exist when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(average(mergeHistories('runTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(median(mergeHistories('runTime')))
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(average(mergeHistories('waitTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(median(mergeHistories('waitTime')))
        })
      }
    })
  }
}
512
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
527
/**
 * The approximate pool utilization.
 *
 * It is the aggregated tasks run time plus wait time, divided by the time elapsed
 * since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  // The elapsed time is sampled first, before walking the worker nodes.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
548
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Reported in `info`, and read by the pool size checks and by the dynamic worker
 * creation condition to tell fixed and dynamic pools apart.
 */
protected abstract get type (): PoolType
555
/**
 * The worker type.
 *
 * Reported as `worker` in `info`.
 */
protected abstract get worker (): WorkerType
560
/**
 * The pool minimum size.
 *
 * This is the number of workers given at pool construction.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
567
/**
 * The pool maximum size.
 *
 * This is `max` when it is set, the number of workers given at pool construction otherwise.
 */
protected get maxSize (): number {
  const dynamicMaximumSize = this.max
  return dynamicMaximumSize ?? this.numberOfWorkers
}
574
/**
 * Checks that the message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
593
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding that worker, `-1` if there is none.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
605
/**
 * Looks up the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose `info.id` equals the worker id, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
617
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
636
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults, then handed to the strategy context.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
650
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Queues are flushed when switching from enabled to disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
662
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any options left over.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
}
674
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
680
/**
 * Builds the tasks queue options, layering the given ones over the defaults.
 *
 * The defaults are a queue size of the pool maximum size squared and a concurrency of one.
 *
 * @param tasksQueueOptions - The given tasks queue options.
 * @returns The tasks queue options with defaults filled in.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return {
    ...defaultTasksQueueOptions,
    ...tasksQueueOptions
  }
}
692
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` once the number of worker nodes has
 * reached the pool maximum size.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
701
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * NOTE(review): implementations presumably build on `internalBusy()` — confirm in the concrete pools.
 */
protected abstract get busy (): boolean
708
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than the tasks
 * queue concurrency (tasks queue enabled) or no task at all (tasks queue disabled).
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
732
/**
 * Sends a task function operation message to one worker node and waits for its status.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports a successful operation,
 * rejected when it reports a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  return await new Promise<boolean>((resolve, reject) => {
    // The listener is registered before sending so the answer cannot be missed.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Only messages from the targeted worker carrying an operation status matter.
      if (response.workerId !== workerId) {
        return
      }
      const operationStatus = response.taskFunctionOperationStatus
      if (operationStatus === true) {
        resolve(true)
        return
      }
      if (operationStatus === false) {
        reject(
          new Error(
            `Task function operation ${
              response.taskFunctionOperation as string
            } failed on worker ${response.workerId}`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
761
/**
 * Sends a task function operation message to every worker node and waits for all statuses.
 *
 * The promise settles once as many statuses as there are worker nodes have been received.
 *
 * @param message - The task function operation message, without worker id.
 * @returns A promise resolved with `true` when every worker reports a successful operation,
 * rejected with an error naming the first worker that reported a failed one.
 */
private async sendTaskFunctionOperationToWorkers (
  message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      // The listener parameter is deliberately not named `message`, to keep the request in scope.
      this.registerWorkerMessageListener(workerNodeKey, response => {
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        // Report the worker that actually failed, not the one that happened to answer last.
        const failedResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                message.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
798
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(workerNode => {
    const { taskFunctionNames } = workerNode.info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
811
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
  // Reject invalid arguments up front instead of shipping them to every worker.
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof taskFunction !== 'function') {
    throw new TypeError('taskFunction argument must be a function')
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: taskFunction.toString()
  })
  // Registered pool side only once the workers accepted it: a rejected operation
  // throws above and leaves the map untouched.
  this.taskFunctions.set(name, taskFunction)
  return operationStatus
}
824
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Forgotten pool side only once the workers confirmed the removal: a rejected
  // operation throws above and keeps the pool side registration.
  this.taskFunctions.delete(name)
  return operationStatus
}
833
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty list of names answers for the pool.
  const describingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return describingWorkerNode?.info.taskFunctionNames ?? []
}
846
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // Broadcast the 'default' operation and relay the combined status.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationStatus
}
854
/**
 * Whether a task can be executed right away on the worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node executes
 * fewer tasks than the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
862
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // The checks run in order: the first failing one decides the rejection reason.
    let rejectionReason: Error | undefined
    if (!this.started) {
      rejectionReason = new Error('Cannot execute a task on destroyed pool')
    } else if (name != null && typeof name !== 'string') {
      rejectionReason = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      rejectionReason = new TypeError(
        'name argument must not be an empty string'
      )
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejectionReason = new TypeError('transferList argument must be an array')
    }
    if (rejectionReason != null) {
      reject(rejectionReason)
      return
    }

    // The submission time is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })

    // Without tasks queue the task runs at once; with it, only when the worker node can take it.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
916
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes destructions are started together and awaited as a whole.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
927
/**
 * Sends a kill message to the worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `'success'`, rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before sending so the answer cannot be missed.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      const killStatus = response.kill
      if (killStatus === 'success') {
        resolve()
        return
      }
      if (killStatus === 'failure') {
        reject(
          new Error(
            `Worker ${
              response.workerId as number
            } kill message handling failed`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
948
/**
 * Terminates the worker node given its worker node key.
 *
 * Called by `destroy()` once for every worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
955
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is built and
 * before `startPool()`. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
965
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
970
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and records the task wait time, on the worker
 * node usage and, when applicable, on the usage of the task function being run.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerNode.usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  )
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1000
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage and, when
 * applicable, on the usage of the task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // The same three updates apply to both usages.
  const updateUsage = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    updateUsage(workerNode.usage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    message.taskPerformance?.name as string
  )
  if (taskFunctionWorkerUsage != null) {
    updateUsage(taskFunctionWorkerUsage)
  }
}
1034
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node information lists more than two task function names, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1049
/**
 * Updates the tasks counters of a worker usage once a task is done.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  // A message carrying a worker error counts as a failed task.
  const taskFailed = message.workerError != null
  if (taskFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1067
/**
 * Updates the run time statistics of a worker usage. Failed tasks are left out.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    // A missing run time is recorded as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1081
/**
 * Updates the wait time statistics of a worker usage.
 *
 * The wait time is the time elapsed since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp gets a wait time of zero.
  const submittedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - submittedAt
  )
}
1094
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Failed tasks are left out.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active or idle figures are recorded as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is averaged with the previous value when there is one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1127
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and directly chosen
 * if the strategy policy enables dynamic worker usage. Otherwise the choice is
 * delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1146
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1155
/**
 * Sends a message to the worker identified by its worker node key.
 * Abstract: the implementation is provided by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1168
/**
 * Creates a new worker.
 * Abstract: the implementation is provided by the concrete pool classes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1175
/**
 * Creates a new, completely set up worker node.
 *
 * Creates a worker, attaches the handlers from the pool options and the pool
 * internal `error`/`exit` handlers to it, adds it to the pool worker nodes and
 * finally runs the after worker node setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Handlers from the pool options (or `EMPTY_FUNCTION` when unset) are registered first.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    // NOTE(review): addWorkerNode() treats a -1 key as "not found"; that case is not handled here - confirm it cannot happen.
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // Flag the worker node as not ready and close its channel before notifying.
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the worker with one of the same kind, only when the pool is started and not starting.
    if (
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    ) {
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Move the tasks still queued on the worker node in error to other worker nodes.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Remove the worker node from the pool the first time the worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
1219
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener destroying the worker node
 * on kill messages is registered, the `checkActive` message and the pool task
 * functions are sent to the worker and the worker information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  // Errors of asynchronous operations are forwarded to the pool emitter.
  const emitError = (error: unknown): void => {
    this.emitter?.emit(PoolEvents.error, error)
  }
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderWorkerUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill requires no executing task and, with tasks queue enabled, an empty queue.
    const softKillAllowed = (): boolean => {
      if (senderWorkerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(senderWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && softKillAllowed())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(emitError)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Register the task functions known by the pool on the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(emitError)
  }
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1272
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Abstract: the implementation is provided by the concrete pool classes.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1285
/**
 * Hook run after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics messages
 * and, when the tasks queue is enabled, wires the task stealing callbacks.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1306
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Abstract: the implementation is provided by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1313
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the runtime and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(workerNodeKey, { statistics })
}
1330
/**
 * Empties the tasks queue of a worker node by moving its tasks, one at a time,
 * to the worker node selected as having the fewest queued tasks.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Start from the first worker node and keep the ready one with the fewest queued tasks.
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, queuedTask)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, queuedTask)
    }
  }
}
1351
/**
 * Increments the stolen tasks counters of a worker node: the worker level one and,
 * when task function usage shall be updated, the one of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1370
/**
 * Task stealing on empty queue: takes one task from the ready worker node
 * having the most queued tasks and hands it to the worker with the given id.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sorted copy, most queued tasks first: the pool worker nodes order is left untouched.
  const mostQueuedFirst = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = mostQueuedFirst.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1398
/**
 * Task stealing on back pressure: moves queued tasks from the worker node with the
 * given worker id to the other ready worker nodes whose number of queued tasks is
 * below `tasksQueueOptions.size - 1`, least loaded first.
 *
 * Does nothing when `tasksQueueOptions.size` is lower than or equal to 1.
 *
 * @param workerId - The id of the worker whose queued tasks are stolen.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy, fewest queued tasks first: the pool worker nodes order is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The index in the sorted copy is not the worker node key:
      // resolve the key in the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        // Do not pop a task that cannot be handed to a known worker node.
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1433
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message content:
 * worker ready response, task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1456
/**
 * Handles a worker ready response: stores the ready flag and the task function
 * names in the worker information, then emits the pool `ready` event when the
 * pool has an emitter and is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1472
/**
 * Handles a task execution response: settles the promise registered for the task,
 * runs the after task execution hook, updates the worker choice strategy context
 * and, when the tasks queue is enabled, executes the next queued task of the worker node
 * if its concurrency allows it.
 *
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerError, data } = message
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1500
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1506
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1512
/**
 * Emits the pool `full` event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1520
/**
 * Gets the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1530
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Defaults to the squared pool maximum size when no tasks queue size is configured.
  const queueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, queueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1554
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1567
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1575
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1584
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task to the worker with its
 * transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1596
/**
 * Enqueues the given task on a worker node, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1602
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1606
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1610
/**
 * Flushes the tasks queue of a worker node: every queued task is dequeued and
 * executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1620
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1626 }