Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
75 *
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
77 */
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
80
81 /**
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
85 Worker,
86 Data,
87 Response
88 >
89
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
115 /**
116 * Constructs a new poolifier pool.
117 *
118 * @param numberOfWorkers - Number of workers that this pool should manage.
119 * @param filePath - Path to the worker file.
120 * @param opts - Options for the pool.
121 */
122 public constructor (
123 protected readonly numberOfWorkers: number,
124 protected readonly filePath: string,
125 protected readonly opts: PoolOptions<Worker>
126 ) {
127 if (!this.isMain()) {
128 throw new Error(
129 'Cannot start a pool from a worker with the same type as the pool'
130 )
131 }
132 this.checkNumberOfWorkers(this.numberOfWorkers)
133 this.checkFilePath(this.filePath)
134 this.checkPoolOptions(this.opts)
135
136 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
137 this.executeTask = this.executeTask.bind(this)
138 this.enqueueTask = this.enqueueTask.bind(this)
139
140 if (this.opts.enableEvents === true) {
141 this.emitter = new PoolEmitter()
142 }
143 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
144 Worker,
145 Data,
146 Response
147 >(
148 this,
149 this.opts.workerChoiceStrategy,
150 this.opts.workerChoiceStrategyOptions
151 )
152
153 this.setupHook()
154
155 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
156
157 this.started = false
158 this.starting = false
159 if (this.opts.startWorkers === true) {
160 this.start()
161 }
162
163 this.startTimestamp = performance.now()
164 }
165
166 private checkFilePath (filePath: string): void {
167 if (
168 filePath == null ||
169 typeof filePath !== 'string' ||
170 (typeof filePath === 'string' && filePath.trim().length === 0)
171 ) {
172 throw new Error('Please specify a file with a worker implementation')
173 }
174 if (!existsSync(filePath)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
176 }
177 }
178
179 private checkNumberOfWorkers (numberOfWorkers: number): void {
180 if (numberOfWorkers == null) {
181 throw new Error(
182 'Cannot instantiate a pool without specifying the number of workers'
183 )
184 } else if (!Number.isSafeInteger(numberOfWorkers)) {
185 throw new TypeError(
186 'Cannot instantiate a pool with a non safe integer number of workers'
187 )
188 } else if (numberOfWorkers < 0) {
189 throw new RangeError(
190 'Cannot instantiate a pool with a negative number of workers'
191 )
192 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
194 }
195 }
196
197 protected checkDynamicPoolSize (min: number, max: number): void {
198 if (this.type === PoolTypes.dynamic) {
199 if (max == null) {
200 throw new TypeError(
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 )
203 } else if (!Number.isSafeInteger(max)) {
204 throw new TypeError(
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 )
207 } else if (min > max) {
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 )
211 } else if (max === 0) {
212 throw new RangeError(
213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
214 )
215 } else if (min === max) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 )
219 }
220 }
221 }
222
223 private checkPoolOptions (opts: PoolOptions<Worker>): void {
224 if (isPlainObject(opts)) {
225 this.opts.startWorkers = opts.startWorkers ?? true
226 this.opts.workerChoiceStrategy =
227 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
228 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
229 this.opts.workerChoiceStrategyOptions = {
230 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
231 ...opts.workerChoiceStrategyOptions
232 }
233 this.checkValidWorkerChoiceStrategyOptions(
234 this.opts.workerChoiceStrategyOptions
235 )
236 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
237 this.opts.enableEvents = opts.enableEvents ?? true
238 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
239 if (this.opts.enableTasksQueue) {
240 this.checkValidTasksQueueOptions(
241 opts.tasksQueueOptions as TasksQueueOptions
242 )
243 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
244 opts.tasksQueueOptions as TasksQueueOptions
245 )
246 }
247 } else {
248 throw new TypeError('Invalid pool options: must be a plain object')
249 }
250 }
251
252 private checkValidWorkerChoiceStrategy (
253 workerChoiceStrategy: WorkerChoiceStrategy
254 ): void {
255 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
256 throw new Error(
257 `Invalid worker choice strategy '${workerChoiceStrategy}'`
258 )
259 }
260 }
261
262 private checkValidWorkerChoiceStrategyOptions (
263 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
264 ): void {
265 if (!isPlainObject(workerChoiceStrategyOptions)) {
266 throw new TypeError(
267 'Invalid worker choice strategy options: must be a plain object'
268 )
269 }
270 if (
271 workerChoiceStrategyOptions.retries != null &&
272 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
273 ) {
274 throw new TypeError(
275 'Invalid worker choice strategy options: retries must be an integer'
276 )
277 }
278 if (
279 workerChoiceStrategyOptions.retries != null &&
280 workerChoiceStrategyOptions.retries < 0
281 ) {
282 throw new RangeError(
283 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
284 )
285 }
286 if (
287 workerChoiceStrategyOptions.weights != null &&
288 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
289 ) {
290 throw new Error(
291 'Invalid worker choice strategy options: must have a weight for each worker node'
292 )
293 }
294 if (
295 workerChoiceStrategyOptions.measurement != null &&
296 !Object.values(Measurements).includes(
297 workerChoiceStrategyOptions.measurement
298 )
299 ) {
300 throw new Error(
301 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
302 )
303 }
304 }
305
306 private checkValidTasksQueueOptions (
307 tasksQueueOptions: TasksQueueOptions
308 ): void {
309 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
310 throw new TypeError('Invalid tasks queue options: must be a plain object')
311 }
312 if (
313 tasksQueueOptions?.concurrency != null &&
314 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
315 ) {
316 throw new TypeError(
317 'Invalid worker node tasks concurrency: must be an integer'
318 )
319 }
320 if (
321 tasksQueueOptions?.concurrency != null &&
322 tasksQueueOptions?.concurrency <= 0
323 ) {
324 throw new RangeError(
325 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
326 )
327 }
328 if (
329 tasksQueueOptions?.size != null &&
330 !Number.isSafeInteger(tasksQueueOptions?.size)
331 ) {
332 throw new TypeError(
333 'Invalid worker node tasks queue size: must be an integer'
334 )
335 }
336 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
337 throw new RangeError(
338 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
339 )
340 }
341 }
342
343 /** @inheritDoc */
344 public get info (): PoolInfo {
345 return {
346 version,
347 type: this.type,
348 worker: this.worker,
349 started: this.started,
350 ready: this.ready,
351 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
352 minSize: this.minSize,
353 maxSize: this.maxSize,
354 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .runTime.aggregate &&
356 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .waitTime.aggregate && { utilization: round(this.utilization) }),
358 workerNodes: this.workerNodes.length,
359 idleWorkerNodes: this.workerNodes.reduce(
360 (accumulator, workerNode) =>
361 workerNode.usage.tasks.executing === 0
362 ? accumulator + 1
363 : accumulator,
364 0
365 ),
366 busyWorkerNodes: this.workerNodes.reduce(
367 (accumulator, workerNode) =>
368 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
369 0
370 ),
371 executedTasks: this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + workerNode.usage.tasks.executed,
374 0
375 ),
376 executingTasks: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + workerNode.usage.tasks.executing,
379 0
380 ),
381 ...(this.opts.enableTasksQueue === true && {
382 queuedTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.queued,
385 0
386 )
387 }),
388 ...(this.opts.enableTasksQueue === true && {
389 maxQueuedTasks: this.workerNodes.reduce(
390 (accumulator, workerNode) =>
391 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
392 0
393 )
394 }),
395 ...(this.opts.enableTasksQueue === true && {
396 backPressure: this.hasBackPressure()
397 }),
398 ...(this.opts.enableTasksQueue === true && {
399 stolenTasks: this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + workerNode.usage.tasks.stolen,
402 0
403 )
404 }),
405 failedTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.failed,
408 0
409 ),
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .runTime.aggregate && {
412 runTime: {
413 minimum: round(
414 min(
415 ...this.workerNodes.map(
416 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
417 )
418 )
419 ),
420 maximum: round(
421 max(
422 ...this.workerNodes.map(
423 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
424 )
425 )
426 ),
427 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
428 .runTime.average && {
429 average: round(
430 average(
431 this.workerNodes.reduce<number[]>(
432 (accumulator, workerNode) =>
433 accumulator.concat(workerNode.usage.runTime.history),
434 []
435 )
436 )
437 )
438 }),
439 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
440 .runTime.median && {
441 median: round(
442 median(
443 this.workerNodes.reduce<number[]>(
444 (accumulator, workerNode) =>
445 accumulator.concat(workerNode.usage.runTime.history),
446 []
447 )
448 )
449 )
450 })
451 }
452 }),
453 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
454 .waitTime.aggregate && {
455 waitTime: {
456 minimum: round(
457 min(
458 ...this.workerNodes.map(
459 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
460 )
461 )
462 ),
463 maximum: round(
464 max(
465 ...this.workerNodes.map(
466 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
467 )
468 )
469 ),
470 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
471 .waitTime.average && {
472 average: round(
473 average(
474 this.workerNodes.reduce<number[]>(
475 (accumulator, workerNode) =>
476 accumulator.concat(workerNode.usage.waitTime.history),
477 []
478 )
479 )
480 )
481 }),
482 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
483 .waitTime.median && {
484 median: round(
485 median(
486 this.workerNodes.reduce<number[]>(
487 (accumulator, workerNode) =>
488 accumulator.concat(workerNode.usage.waitTime.history),
489 []
490 )
491 )
492 )
493 })
494 }
495 })
496 }
497 }
498
499 /**
500 * The pool readiness boolean status.
501 */
502 private get ready (): boolean {
503 return (
504 this.workerNodes.reduce(
505 (accumulator, workerNode) =>
506 !workerNode.info.dynamic && workerNode.info.ready
507 ? accumulator + 1
508 : accumulator,
509 0
510 ) >= this.minSize
511 )
512 }
513
514 /**
515 * The approximate pool utilization.
516 *
517 * @returns The pool utilization.
518 */
519 private get utilization (): number {
520 const poolTimeCapacity =
521 (performance.now() - this.startTimestamp) * this.maxSize
522 const totalTasksRunTime = this.workerNodes.reduce(
523 (accumulator, workerNode) =>
524 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
525 0
526 )
527 const totalTasksWaitTime = this.workerNodes.reduce(
528 (accumulator, workerNode) =>
529 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
530 0
531 )
532 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
533 }
534
535 /**
536 * The pool type.
537 *
538 * If it is `'dynamic'`, it provides the `max` property.
539 */
540 protected abstract get type (): PoolType
541
542 /**
543 * The worker type.
544 */
545 protected abstract get worker (): WorkerType
546
547 /**
548 * The pool minimum size.
549 */
550 protected get minSize (): number {
551 return this.numberOfWorkers
552 }
553
554 /**
555 * The pool maximum size.
556 */
557 protected get maxSize (): number {
558 return this.max ?? this.numberOfWorkers
559 }
560
561 /**
562 * Checks if the worker id sent in the received message from a worker is valid.
563 *
564 * @param message - The received message.
565 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
566 */
567 private checkMessageWorkerId (message: MessageValue<Response>): void {
568 if (message.workerId == null) {
569 throw new Error('Worker message received without worker id')
570 } else if (
571 message.workerId != null &&
572 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
573 ) {
574 throw new Error(
575 `Worker message received from unknown worker '${message.workerId}'`
576 )
577 }
578 }
579
580 /**
581 * Gets the given worker its worker node key.
582 *
583 * @param worker - The worker.
584 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
585 */
586 private getWorkerNodeKeyByWorker (worker: Worker): number {
587 return this.workerNodes.findIndex(
588 workerNode => workerNode.worker === worker
589 )
590 }
591
592 /**
593 * Gets the worker node key given its worker id.
594 *
595 * @param workerId - The worker id.
596 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
597 */
598 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
599 return this.workerNodes.findIndex(
600 workerNode => workerNode.info.id === workerId
601 )
602 }
603
604 /** @inheritDoc */
605 public setWorkerChoiceStrategy (
606 workerChoiceStrategy: WorkerChoiceStrategy,
607 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
608 ): void {
609 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
610 this.opts.workerChoiceStrategy = workerChoiceStrategy
611 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
612 this.opts.workerChoiceStrategy
613 )
614 if (workerChoiceStrategyOptions != null) {
615 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
616 }
617 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
618 workerNode.resetUsage()
619 this.sendStatisticsMessageToWorker(workerNodeKey)
620 }
621 }
622
623 /** @inheritDoc */
624 public setWorkerChoiceStrategyOptions (
625 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
626 ): void {
627 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
628 this.opts.workerChoiceStrategyOptions = {
629 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
630 ...workerChoiceStrategyOptions
631 }
632 this.workerChoiceStrategyContext.setOptions(
633 this.opts.workerChoiceStrategyOptions
634 )
635 }
636
637 /** @inheritDoc */
638 public enableTasksQueue (
639 enable: boolean,
640 tasksQueueOptions?: TasksQueueOptions
641 ): void {
642 if (this.opts.enableTasksQueue === true && !enable) {
643 this.flushTasksQueues()
644 }
645 this.opts.enableTasksQueue = enable
646 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
647 }
648
649 /** @inheritDoc */
650 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
651 if (this.opts.enableTasksQueue === true) {
652 this.checkValidTasksQueueOptions(tasksQueueOptions)
653 this.opts.tasksQueueOptions =
654 this.buildTasksQueueOptions(tasksQueueOptions)
655 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
656 } else if (this.opts.tasksQueueOptions != null) {
657 delete this.opts.tasksQueueOptions
658 }
659 }
660
661 private setTasksQueueSize (size: number): void {
662 for (const workerNode of this.workerNodes) {
663 workerNode.tasksQueueBackPressureSize = size
664 }
665 }
666
667 private buildTasksQueueOptions (
668 tasksQueueOptions: TasksQueueOptions
669 ): TasksQueueOptions {
670 return {
671 ...{
672 size: Math.pow(this.maxSize, 2),
673 concurrency: 1,
674 taskStealing: true,
675 tasksStealingOnBackPressure: true
676 },
677 ...tasksQueueOptions
678 }
679 }
680
681 /**
682 * Whether the pool is full or not.
683 *
684 * The pool filling boolean status.
685 */
686 protected get full (): boolean {
687 return this.workerNodes.length >= this.maxSize
688 }
689
690 /**
691 * Whether the pool is busy or not.
692 *
693 * The pool busyness boolean status.
694 */
695 protected abstract get busy (): boolean
696
697 /**
698 * Whether worker nodes are executing concurrently their tasks quota or not.
699 *
700 * @returns Worker nodes busyness boolean status.
701 */
702 protected internalBusy (): boolean {
703 if (this.opts.enableTasksQueue === true) {
704 return (
705 this.workerNodes.findIndex(
706 workerNode =>
707 workerNode.info.ready &&
708 workerNode.usage.tasks.executing <
709 (this.opts.tasksQueueOptions?.concurrency as number)
710 ) === -1
711 )
712 }
713 return (
714 this.workerNodes.findIndex(
715 workerNode =>
716 workerNode.info.ready && workerNode.usage.tasks.executing === 0
717 ) === -1
718 )
719 }
720
721 private async sendTaskFunctionOperationToWorker (
722 workerNodeKey: number,
723 message: MessageValue<Data>
724 ): Promise<boolean> {
725 const workerId = this.getWorkerInfo(workerNodeKey).id as number
726 return await new Promise<boolean>((resolve, reject) => {
727 this.registerWorkerMessageListener(workerNodeKey, message => {
728 if (
729 message.workerId === workerId &&
730 message.taskFunctionOperationStatus === true
731 ) {
732 resolve(true)
733 } else if (
734 message.workerId === workerId &&
735 message.taskFunctionOperationStatus === false
736 ) {
737 reject(
738 new Error(
739 `Task function operation ${
740 message.taskFunctionOperation as string
741 } failed on worker ${message.workerId}`
742 )
743 )
744 }
745 })
746 this.sendToWorker(workerNodeKey, message)
747 })
748 }
749
750 private async sendTaskFunctionOperationToWorkers (
751 message: Omit<MessageValue<Data>, 'workerId'>
752 ): Promise<boolean> {
753 return await new Promise<boolean>((resolve, reject) => {
754 const responsesReceived = new Array<MessageValue<Data | Response>>()
755 for (const [workerNodeKey] of this.workerNodes.entries()) {
756 this.registerWorkerMessageListener(workerNodeKey, message => {
757 if (message.taskFunctionOperationStatus != null) {
758 responsesReceived.push(message)
759 if (
760 responsesReceived.length === this.workerNodes.length &&
761 responsesReceived.every(
762 message => message.taskFunctionOperationStatus === true
763 )
764 ) {
765 resolve(true)
766 } else if (
767 responsesReceived.length === this.workerNodes.length &&
768 responsesReceived.some(
769 message => message.taskFunctionOperationStatus === false
770 )
771 ) {
772 reject(
773 new Error(
774 `Task function operation ${
775 message.taskFunctionOperation as string
776 } failed on worker ${message.workerId as number}`
777 )
778 )
779 }
780 }
781 })
782 this.sendToWorker(workerNodeKey, message)
783 }
784 })
785 }
786
787 /** @inheritDoc */
788 public hasTaskFunction (name: string): boolean {
789 for (const workerNode of this.workerNodes) {
790 if (
791 Array.isArray(workerNode.info.taskFunctionNames) &&
792 workerNode.info.taskFunctionNames.includes(name)
793 ) {
794 return true
795 }
796 }
797 return false
798 }
799
800 /** @inheritDoc */
801 public async addTaskFunction (
802 name: string,
803 taskFunction: TaskFunction<Data, Response>
804 ): Promise<boolean> {
805 this.taskFunctions.set(name, taskFunction)
806 return await this.sendTaskFunctionOperationToWorkers({
807 taskFunctionOperation: 'add',
808 taskFunctionName: name,
809 taskFunction: taskFunction.toString()
810 })
811 }
812
813 /** @inheritDoc */
814 public async removeTaskFunction (name: string): Promise<boolean> {
815 this.taskFunctions.delete(name)
816 return await this.sendTaskFunctionOperationToWorkers({
817 taskFunctionOperation: 'remove',
818 taskFunctionName: name
819 })
820 }
821
822 /** @inheritDoc */
823 public listTaskFunctionNames (): string[] {
824 for (const workerNode of this.workerNodes) {
825 if (
826 Array.isArray(workerNode.info.taskFunctionNames) &&
827 workerNode.info.taskFunctionNames.length > 0
828 ) {
829 return workerNode.info.taskFunctionNames
830 }
831 }
832 return []
833 }
834
835 /** @inheritDoc */
836 public async setDefaultTaskFunction (name: string): Promise<boolean> {
837 return await this.sendTaskFunctionOperationToWorkers({
838 taskFunctionOperation: 'default',
839 taskFunctionName: name
840 })
841 }
842
843 private shallExecuteTask (workerNodeKey: number): boolean {
844 return (
845 this.tasksQueueSize(workerNodeKey) === 0 &&
846 this.workerNodes[workerNodeKey].usage.tasks.executing <
847 (this.opts.tasksQueueOptions?.concurrency as number)
848 )
849 }
850
851 /** @inheritDoc */
852 public async execute (
853 data?: Data,
854 name?: string,
855 transferList?: TransferListItem[]
856 ): Promise<Response> {
857 return await new Promise<Response>((resolve, reject) => {
858 if (!this.started) {
859 reject(new Error('Cannot execute a task on not started pool'))
860 return
861 }
862 if (name != null && typeof name !== 'string') {
863 reject(new TypeError('name argument must be a string'))
864 return
865 }
866 if (
867 name != null &&
868 typeof name === 'string' &&
869 name.trim().length === 0
870 ) {
871 reject(new TypeError('name argument must not be an empty string'))
872 return
873 }
874 if (transferList != null && !Array.isArray(transferList)) {
875 reject(new TypeError('transferList argument must be an array'))
876 return
877 }
878 const timestamp = performance.now()
879 const workerNodeKey = this.chooseWorkerNode()
880 const task: Task<Data> = {
881 name: name ?? DEFAULT_TASK_NAME,
882 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
883 data: data ?? ({} as Data),
884 transferList,
885 timestamp,
886 taskId: randomUUID()
887 }
888 this.promiseResponseMap.set(task.taskId as string, {
889 resolve,
890 reject,
891 workerNodeKey
892 })
893 if (
894 this.opts.enableTasksQueue === false ||
895 (this.opts.enableTasksQueue === true &&
896 this.shallExecuteTask(workerNodeKey))
897 ) {
898 this.executeTask(workerNodeKey, task)
899 } else {
900 this.enqueueTask(workerNodeKey, task)
901 }
902 })
903 }
904
905 /** @inheritdoc */
906 public start (): void {
907 this.starting = true
908 while (
909 this.workerNodes.reduce(
910 (accumulator, workerNode) =>
911 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
912 0
913 ) < this.numberOfWorkers
914 ) {
915 this.createAndSetupWorkerNode()
916 }
917 this.starting = false
918 this.started = true
919 }
920
921 /** @inheritDoc */
922 public async destroy (): Promise<void> {
923 await Promise.all(
924 this.workerNodes.map(async (_, workerNodeKey) => {
925 await this.destroyWorkerNode(workerNodeKey)
926 })
927 )
928 this.emitter?.emit(PoolEvents.destroy, this.info)
929 this.started = false
930 }
931
932 protected async sendKillMessageToWorker (
933 workerNodeKey: number
934 ): Promise<void> {
935 await new Promise<void>((resolve, reject) => {
936 this.registerWorkerMessageListener(workerNodeKey, message => {
937 if (message.kill === 'success') {
938 resolve()
939 } else if (message.kill === 'failure') {
940 reject(
941 new Error(
942 `Worker ${
943 message.workerId as number
944 } kill message handling failed`
945 )
946 )
947 }
948 })
949 this.sendToWorker(workerNodeKey, { kill: true })
950 })
951 }
952
953 /**
954 * Terminates the worker node given its worker node key.
955 *
956 * @param workerNodeKey - The worker node key.
957 */
958 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
959
960 /**
961 * Setup hook to execute code before worker nodes are created in the abstract constructor.
962 * Can be overridden.
963 *
964 * @virtual
965 */
966 protected setupHook (): void {
967 /* Intentionally empty */
968 }
969
970 /**
971 * Should return whether the worker is the main worker or not.
972 */
973 protected abstract isMain (): boolean
974
975 /**
976 * Hook executed before the worker task execution.
977 * Can be overridden.
978 *
979 * @param workerNodeKey - The worker node key.
980 * @param task - The task to execute.
981 */
982 protected beforeTaskExecutionHook (
983 workerNodeKey: number,
984 task: Task<Data>
985 ): void {
986 if (this.workerNodes[workerNodeKey]?.usage != null) {
987 const workerUsage = this.workerNodes[workerNodeKey].usage
988 ++workerUsage.tasks.executing
989 this.updateWaitTimeWorkerUsage(workerUsage, task)
990 }
991 if (
992 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
993 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
994 task.name as string
995 ) != null
996 ) {
997 const taskFunctionWorkerUsage = this.workerNodes[
998 workerNodeKey
999 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1000 ++taskFunctionWorkerUsage.tasks.executing
1001 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1002 }
1003 }
1004
1005 /**
1006 * Hook executed after the worker task execution.
1007 * Can be overridden.
1008 *
1009 * @param workerNodeKey - The worker node key.
1010 * @param message - The received message.
1011 */
1012 protected afterTaskExecutionHook (
1013 workerNodeKey: number,
1014 message: MessageValue<Response>
1015 ): void {
1016 if (this.workerNodes[workerNodeKey]?.usage != null) {
1017 const workerUsage = this.workerNodes[workerNodeKey].usage
1018 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1019 this.updateRunTimeWorkerUsage(workerUsage, message)
1020 this.updateEluWorkerUsage(workerUsage, message)
1021 }
1022 if (
1023 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1024 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1025 message.taskPerformance?.name as string
1026 ) != null
1027 ) {
1028 const taskFunctionWorkerUsage = this.workerNodes[
1029 workerNodeKey
1030 ].getTaskFunctionWorkerUsage(
1031 message.taskPerformance?.name as string
1032 ) as WorkerUsage
1033 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1034 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1035 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1036 }
1037 }
1038
1039 /**
1040 * Whether the worker node shall update its task function worker usage or not.
1041 *
1042 * @param workerNodeKey - The worker node key.
1043 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1044 */
1045 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1046 const workerInfo = this.getWorkerInfo(workerNodeKey)
1047 return (
1048 workerInfo != null &&
1049 Array.isArray(workerInfo.taskFunctionNames) &&
1050 workerInfo.taskFunctionNames.length > 2
1051 )
1052 }
1053
1054 private updateTaskStatisticsWorkerUsage (
1055 workerUsage: WorkerUsage,
1056 message: MessageValue<Response>
1057 ): void {
1058 const workerTaskStatistics = workerUsage.tasks
1059 if (
1060 workerTaskStatistics.executing != null &&
1061 workerTaskStatistics.executing > 0
1062 ) {
1063 --workerTaskStatistics.executing
1064 }
1065 if (message.workerError == null) {
1066 ++workerTaskStatistics.executed
1067 } else {
1068 ++workerTaskStatistics.failed
1069 }
1070 }
1071
1072 private updateRunTimeWorkerUsage (
1073 workerUsage: WorkerUsage,
1074 message: MessageValue<Response>
1075 ): void {
1076 if (message.workerError != null) {
1077 return
1078 }
1079 updateMeasurementStatistics(
1080 workerUsage.runTime,
1081 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1082 message.taskPerformance?.runTime ?? 0
1083 )
1084 }
1085
1086 private updateWaitTimeWorkerUsage (
1087 workerUsage: WorkerUsage,
1088 task: Task<Data>
1089 ): void {
1090 const timestamp = performance.now()
1091 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1092 updateMeasurementStatistics(
1093 workerUsage.waitTime,
1094 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1095 taskWaitTime
1096 )
1097 }
1098
1099 private updateEluWorkerUsage (
1100 workerUsage: WorkerUsage,
1101 message: MessageValue<Response>
1102 ): void {
1103 if (message.workerError != null) {
1104 return
1105 }
1106 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1107 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1108 updateMeasurementStatistics(
1109 workerUsage.elu.active,
1110 eluTaskStatisticsRequirements,
1111 message.taskPerformance?.elu?.active ?? 0
1112 )
1113 updateMeasurementStatistics(
1114 workerUsage.elu.idle,
1115 eluTaskStatisticsRequirements,
1116 message.taskPerformance?.elu?.idle ?? 0
1117 )
1118 if (eluTaskStatisticsRequirements.aggregate) {
1119 if (message.taskPerformance?.elu != null) {
1120 if (workerUsage.elu.utilization != null) {
1121 workerUsage.elu.utilization =
1122 (workerUsage.elu.utilization +
1123 message.taskPerformance.elu.utilization) /
1124 2
1125 } else {
1126 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1127 }
1128 }
1129 }
1130 }
1131
1132 /**
1133 * Chooses a worker node for the next task.
1134 *
1135 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1136 *
1137 * @returns The chosen worker node key
1138 */
1139 private chooseWorkerNode (): number {
1140 if (this.shallCreateDynamicWorker()) {
1141 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1142 if (
1143 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1144 ) {
1145 return workerNodeKey
1146 }
1147 }
1148 return this.workerChoiceStrategyContext.execute()
1149 }
1150
1151 /**
1152 * Conditions for dynamic worker creation.
1153 *
1154 * @returns Whether to create a dynamic worker or not.
1155 */
1156 private shallCreateDynamicWorker (): boolean {
1157 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1158 }
1159
1160 /**
1161 * Sends a message to worker given its worker node key.
1162 *
1163 * @param workerNodeKey - The worker node key.
1164 * @param message - The message.
1165 * @param transferList - The optional array of transferable objects.
1166 */
1167 protected abstract sendToWorker (
1168 workerNodeKey: number,
1169 message: MessageValue<Data>,
1170 transferList?: TransferListItem[]
1171 ): void
1172
1173 /**
1174 * Creates a new worker.
1175 *
1176 * @returns Newly created worker.
1177 */
1178 protected abstract createWorker (): Worker
1179
1180 /**
1181 * Creates a new, completely set up worker node.
1182 *
1183 * @returns New, completely set up worker node key.
1184 */
1185 protected createAndSetupWorkerNode (): number {
1186 const worker = this.createWorker()
1187
1188 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1189 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1190 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1191 worker.on('error', error => {
1192 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1193 const workerInfo = this.getWorkerInfo(workerNodeKey)
1194 workerInfo.ready = false
1195 this.workerNodes[workerNodeKey].closeChannel()
1196 this.emitter?.emit(PoolEvents.error, error)
1197 if (
1198 this.opts.restartWorkerOnError === true &&
1199 this.started &&
1200 !this.starting
1201 ) {
1202 if (workerInfo.dynamic) {
1203 this.createAndSetupDynamicWorkerNode()
1204 } else {
1205 this.createAndSetupWorkerNode()
1206 }
1207 }
1208 if (this.opts.enableTasksQueue === true) {
1209 this.redistributeQueuedTasks(workerNodeKey)
1210 }
1211 })
1212 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1213 worker.once('exit', () => {
1214 this.removeWorkerNode(worker)
1215 })
1216
1217 const workerNodeKey = this.addWorkerNode(worker)
1218
1219 this.afterWorkerNodeSetup(workerNodeKey)
1220
1221 return workerNodeKey
1222 }
1223
1224 /**
1225 * Creates a new, completely set up dynamic worker node.
1226 *
1227 * @returns New, completely set up dynamic worker node key.
1228 */
1229 protected createAndSetupDynamicWorkerNode (): number {
1230 const workerNodeKey = this.createAndSetupWorkerNode()
1231 this.registerWorkerMessageListener(workerNodeKey, message => {
1232 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1233 message.workerId
1234 )
1235 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1236 // Kill message received from worker
1237 if (
1238 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1239 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1240 ((this.opts.enableTasksQueue === false &&
1241 workerUsage.tasks.executing === 0) ||
1242 (this.opts.enableTasksQueue === true &&
1243 workerUsage.tasks.executing === 0 &&
1244 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1245 ) {
1246 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1247 this.emitter?.emit(PoolEvents.error, error)
1248 })
1249 }
1250 })
1251 const workerInfo = this.getWorkerInfo(workerNodeKey)
1252 this.sendToWorker(workerNodeKey, {
1253 checkActive: true
1254 })
1255 if (this.taskFunctions.size > 0) {
1256 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1257 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1258 taskFunctionOperation: 'add',
1259 taskFunctionName,
1260 taskFunction: taskFunction.toString()
1261 }).catch(error => {
1262 this.emitter?.emit(PoolEvents.error, error)
1263 })
1264 }
1265 }
1266 workerInfo.dynamic = true
1267 if (
1268 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1269 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1270 ) {
1271 workerInfo.ready = true
1272 }
1273 this.checkAndEmitDynamicWorkerCreationEvents()
1274 return workerNodeKey
1275 }
1276
1277 /**
1278 * Registers a listener callback on the worker given its worker node key.
1279 *
1280 * @param workerNodeKey - The worker node key.
1281 * @param listener - The message listener callback.
1282 */
1283 protected abstract registerWorkerMessageListener<
1284 Message extends Data | Response
1285 >(
1286 workerNodeKey: number,
1287 listener: (message: MessageValue<Message>) => void
1288 ): void
1289
1290 /**
1291 * Method hooked up after a worker node has been newly created.
1292 * Can be overridden.
1293 *
1294 * @param workerNodeKey - The newly created worker node key.
1295 */
1296 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1297 // Listen to worker messages.
1298 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1299 // Send the startup message to worker.
1300 this.sendStartupMessageToWorker(workerNodeKey)
1301 // Send the statistics message to worker.
1302 this.sendStatisticsMessageToWorker(workerNodeKey)
1303 if (this.opts.enableTasksQueue === true) {
1304 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1305 this.workerNodes[workerNodeKey].onEmptyQueue =
1306 this.taskStealingOnEmptyQueue.bind(this)
1307 }
1308 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1309 this.workerNodes[workerNodeKey].onBackPressure =
1310 this.tasksStealingOnBackPressure.bind(this)
1311 }
1312 }
1313 }
1314
1315 /**
1316 * Sends the startup message to worker given its worker node key.
1317 *
1318 * @param workerNodeKey - The worker node key.
1319 */
1320 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1321
1322 /**
1323 * Sends the statistics message to worker given its worker node key.
1324 *
1325 * @param workerNodeKey - The worker node key.
1326 */
1327 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1328 this.sendToWorker(workerNodeKey, {
1329 statistics: {
1330 runTime:
1331 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1332 .runTime.aggregate,
1333 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1334 .elu.aggregate
1335 }
1336 })
1337 }
1338
1339 private redistributeQueuedTasks (workerNodeKey: number): void {
1340 while (this.tasksQueueSize(workerNodeKey) > 0) {
1341 const destinationWorkerNodeKey = this.workerNodes.reduce(
1342 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1343 return workerNode.info.ready &&
1344 workerNode.usage.tasks.queued <
1345 workerNodes[minWorkerNodeKey].usage.tasks.queued
1346 ? workerNodeKey
1347 : minWorkerNodeKey
1348 },
1349 0
1350 )
1351 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1352 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1353 this.executeTask(destinationWorkerNodeKey, task)
1354 } else {
1355 this.enqueueTask(destinationWorkerNodeKey, task)
1356 }
1357 }
1358 }
1359
1360 private updateTaskStolenStatisticsWorkerUsage (
1361 workerNodeKey: number,
1362 taskName: string
1363 ): void {
1364 const workerNode = this.workerNodes[workerNodeKey]
1365 if (workerNode?.usage != null) {
1366 ++workerNode.usage.tasks.stolen
1367 }
1368 if (
1369 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1370 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1371 ) {
1372 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1373 taskName
1374 ) as WorkerUsage
1375 ++taskFunctionWorkerUsage.tasks.stolen
1376 }
1377 }
1378
1379 private taskStealingOnEmptyQueue (workerId: number): void {
1380 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1381 const workerNodes = this.workerNodes
1382 .slice()
1383 .sort(
1384 (workerNodeA, workerNodeB) =>
1385 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1386 )
1387 const sourceWorkerNode = workerNodes.find(
1388 workerNode =>
1389 workerNode.info.ready &&
1390 workerNode.info.id !== workerId &&
1391 workerNode.usage.tasks.queued > 0
1392 )
1393 if (sourceWorkerNode != null) {
1394 const task = sourceWorkerNode.popTask() as Task<Data>
1395 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1396 this.executeTask(destinationWorkerNodeKey, task)
1397 } else {
1398 this.enqueueTask(destinationWorkerNodeKey, task)
1399 }
1400 this.updateTaskStolenStatisticsWorkerUsage(
1401 destinationWorkerNodeKey,
1402 task.name as string
1403 )
1404 }
1405 }
1406
1407 private tasksStealingOnBackPressure (workerId: number): void {
1408 const sizeOffset = 1
1409 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1410 return
1411 }
1412 const sourceWorkerNode =
1413 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1414 const workerNodes = this.workerNodes
1415 .slice()
1416 .sort(
1417 (workerNodeA, workerNodeB) =>
1418 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1419 )
1420 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1421 if (
1422 sourceWorkerNode.usage.tasks.queued > 0 &&
1423 workerNode.info.ready &&
1424 workerNode.info.id !== workerId &&
1425 workerNode.usage.tasks.queued <
1426 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1427 ) {
1428 const task = sourceWorkerNode.popTask() as Task<Data>
1429 if (this.shallExecuteTask(workerNodeKey)) {
1430 this.executeTask(workerNodeKey, task)
1431 } else {
1432 this.enqueueTask(workerNodeKey, task)
1433 }
1434 this.updateTaskStolenStatisticsWorkerUsage(
1435 workerNodeKey,
1436 task.name as string
1437 )
1438 }
1439 }
1440 }
1441
1442 /**
1443 * This method is the listener registered for each worker message.
1444 *
1445 * @returns The listener function to execute when a message is received from a worker.
1446 */
1447 protected workerListener (): (message: MessageValue<Response>) => void {
1448 return message => {
1449 this.checkMessageWorkerId(message)
1450 if (message.ready != null && message.taskFunctionNames != null) {
1451 // Worker ready response received from worker
1452 this.handleWorkerReadyResponse(message)
1453 } else if (message.taskId != null) {
1454 // Task execution response received from worker
1455 this.handleTaskExecutionResponse(message)
1456 } else if (message.taskFunctionNames != null) {
1457 // Task function names message received from worker
1458 this.getWorkerInfo(
1459 this.getWorkerNodeKeyByWorkerId(message.workerId)
1460 ).taskFunctionNames = message.taskFunctionNames
1461 }
1462 }
1463 }
1464
1465 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1466 if (message.ready === false) {
1467 throw new Error(
1468 `Worker ${message.workerId as number} failed to initialize`
1469 )
1470 }
1471 const workerInfo = this.getWorkerInfo(
1472 this.getWorkerNodeKeyByWorkerId(message.workerId)
1473 )
1474 workerInfo.ready = message.ready as boolean
1475 workerInfo.taskFunctionNames = message.taskFunctionNames
1476 if (this.emitter != null && this.ready) {
1477 this.emitter.emit(PoolEvents.ready, this.info)
1478 }
1479 }
1480
1481 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1482 const { taskId, workerError, data } = message
1483 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1484 if (promiseResponse != null) {
1485 if (workerError != null) {
1486 this.emitter?.emit(PoolEvents.taskError, workerError)
1487 promiseResponse.reject(workerError.message)
1488 } else {
1489 promiseResponse.resolve(data as Response)
1490 }
1491 const workerNodeKey = promiseResponse.workerNodeKey
1492 this.afterTaskExecutionHook(workerNodeKey, message)
1493 this.workerChoiceStrategyContext.update(workerNodeKey)
1494 this.promiseResponseMap.delete(taskId as string)
1495 if (
1496 this.opts.enableTasksQueue === true &&
1497 this.tasksQueueSize(workerNodeKey) > 0 &&
1498 this.workerNodes[workerNodeKey].usage.tasks.executing <
1499 (this.opts.tasksQueueOptions?.concurrency as number)
1500 ) {
1501 this.executeTask(
1502 workerNodeKey,
1503 this.dequeueTask(workerNodeKey) as Task<Data>
1504 )
1505 }
1506 }
1507 }
1508
1509 private checkAndEmitTaskExecutionEvents (): void {
1510 if (this.busy) {
1511 this.emitter?.emit(PoolEvents.busy, this.info)
1512 }
1513 }
1514
1515 private checkAndEmitTaskQueuingEvents (): void {
1516 if (this.hasBackPressure()) {
1517 this.emitter?.emit(PoolEvents.backPressure, this.info)
1518 }
1519 }
1520
1521 private checkAndEmitDynamicWorkerCreationEvents (): void {
1522 if (this.type === PoolTypes.dynamic) {
1523 if (this.full) {
1524 this.emitter?.emit(PoolEvents.full, this.info)
1525 }
1526 }
1527 }
1528
1529 /**
1530 * Gets the worker information given its worker node key.
1531 *
1532 * @param workerNodeKey - The worker node key.
1533 * @returns The worker information.
1534 */
1535 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1536 return this.workerNodes[workerNodeKey].info
1537 }
1538
1539 /**
1540 * Adds the given worker in the pool worker nodes.
1541 *
1542 * @param worker - The worker.
1543 * @returns The added worker node key.
1544 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1545 */
1546 private addWorkerNode (worker: Worker): number {
1547 const workerNode = new WorkerNode<Worker, Data>(
1548 worker,
1549 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1550 )
1551 // Flag the worker node as ready at pool startup.
1552 if (this.starting) {
1553 workerNode.info.ready = true
1554 }
1555 this.workerNodes.push(workerNode)
1556 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1557 if (workerNodeKey === -1) {
1558 throw new Error('Worker added not found in worker nodes')
1559 }
1560 return workerNodeKey
1561 }
1562
1563 /**
1564 * Removes the given worker from the pool worker nodes.
1565 *
1566 * @param worker - The worker.
1567 */
1568 private removeWorkerNode (worker: Worker): void {
1569 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1570 if (workerNodeKey !== -1) {
1571 this.workerNodes.splice(workerNodeKey, 1)
1572 this.workerChoiceStrategyContext.remove(workerNodeKey)
1573 }
1574 }
1575
1576 /** @inheritDoc */
1577 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1578 return (
1579 this.opts.enableTasksQueue === true &&
1580 this.workerNodes[workerNodeKey].hasBackPressure()
1581 )
1582 }
1583
1584 private hasBackPressure (): boolean {
1585 return (
1586 this.opts.enableTasksQueue === true &&
1587 this.workerNodes.findIndex(
1588 workerNode => !workerNode.hasBackPressure()
1589 ) === -1
1590 )
1591 }
1592
1593 /**
1594 * Executes the given task on the worker given its worker node key.
1595 *
1596 * @param workerNodeKey - The worker node key.
1597 * @param task - The task to execute.
1598 */
1599 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1600 this.beforeTaskExecutionHook(workerNodeKey, task)
1601 this.sendToWorker(workerNodeKey, task, task.transferList)
1602 this.checkAndEmitTaskExecutionEvents()
1603 }
1604
1605 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1606 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1607 this.checkAndEmitTaskQueuingEvents()
1608 return tasksQueueSize
1609 }
1610
1611 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1612 return this.workerNodes[workerNodeKey].dequeueTask()
1613 }
1614
1615 private tasksQueueSize (workerNodeKey: number): number {
1616 return this.workerNodes[workerNodeKey].tasksQueueSize()
1617 }
1618
1619 protected flushTasksQueue (workerNodeKey: number): void {
1620 while (this.tasksQueueSize(workerNodeKey) > 0) {
1621 this.executeTask(
1622 workerNodeKey,
1623 this.dequeueTask(workerNodeKey) as Task<Data>
1624 )
1625 }
1626 this.workerNodes[workerNodeKey].clearTasksQueue()
1627 }
1628
1629 private flushTasksQueues (): void {
1630 for (const [workerNodeKey] of this.workerNodes.entries()) {
1631 this.flushTasksQueue(workerNodeKey)
1632 }
1633 }
1634 }