fix: check pool statuses
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 once,
21 round
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerNodeEventDetail,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52 import {
53 checkFilePath,
54 checkValidTasksQueueOptions,
55 checkValidWorkerChoiceStrategy,
56 updateMeasurementStatistics
57 } from './utils'
58
59 /**
60 * Base class that implements some shared logic for all poolifier pools.
61 *
62 * @typeParam Worker - Type of worker which manages this pool.
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
65 */
66 export abstract class AbstractPool<
67 Worker extends IWorker,
68 Data = unknown,
69 Response = unknown
70 > implements IPool<Worker, Data, Response> {
71 /** @inheritDoc */
72 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
73
74 /** @inheritDoc */
75 public emitter?: EventEmitterAsyncResource
76
77 /**
78 * Dynamic pool maximum size property placeholder.
79 */
80 protected readonly max?: number
81
82 /**
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 *
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 */
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
91
92 /**
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 */
95 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
96 Worker,
97 Data,
98 Response
99 >
100
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
120 /**
121 * The start timestamp of the pool.
122 */
123 private readonly startTimestamp
124
/**
 * Constructs a new poolifier pool.
 *
 * Validates the execution context and the arguments, builds the worker choice
 * strategy context, runs the setup hook and, when the `startWorkers` option
 * resolves to `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const runsInMain = this.isMain()
  if (!runsInMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills in the options defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Pool lifecycle flags.
  this.destroying = false
  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
176
/**
 * Checks the number of workers given to the constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
194
/**
 * Checks the pool options and stores their resolved values, defaults included,
 * into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
223
/**
 * Checks the worker choice strategy options. Nullish options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not hold one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node of a pool at its maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
270
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
276
/** @inheritDoc */
public get info (): PoolInfo {
  // Read the task statistics requirements once: they drive most of the
  // optional properties built below.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums a numeric value taken from each worker node.
  const sumOf = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countOf = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )
  // Flattened measurement histories of all the worker nodes, built lazily in
  // a single pass instead of repeated array concatenations.
  const runTimeHistory = (): number[] =>
    this.workerNodes.flatMap(workerNode => workerNode.usage.runTime.history)
  const waitTimeHistory = (): number[] =>
    this.workerNodes.flatMap(workerNode => workerNode.usage.waitTime.history)
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both aggregates are collected.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countOf(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countOf(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOf(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOf(workerNode => workerNode.usage.tasks.executing),
    // Tasks queue related properties only exist when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOf(workerNode => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOf(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOf(workerNode => workerNode.usage.tasks.stolen)
    }),
    failedTasks: sumOf(workerNode => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(average(runTimeHistory()))
        }),
        ...(runTimeRequirements.median && {
          median: round(median(runTimeHistory()))
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(average(waitTimeHistory()))
        }),
        ...(waitTimeRequirements.median && {
          median: round(median(waitTimeHistory()))
        })
      }
    })
  }
}
432
/**
 * The pool readiness boolean status: `true` once the number of ready, non
 * dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
447
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start timestamp multiplied by the
 * pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
468
469 /**
470 * The pool type.
471 *
472 * If it is `'dynamic'`, it provides the `max` property.
473 */
474 protected abstract get type (): PoolType
475
476 /**
477 * The worker type.
478 */
479 protected abstract get worker (): WorkerType
480
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
487
/**
 * The pool maximum size: the `max` property when it is defined, the number of
 * workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
494
/**
 * Checks that the message received from a worker carries the id of a worker
 * known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
510
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
522
/**
 * Gets the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
534
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and a statistics message sent to
  // its worker after the strategy change.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
553
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered on top of the defaults.
  const resolvedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = resolvedOptions
  this.workerChoiceStrategyContext.setOptions(resolvedOptions)
}
567
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Stop tasks stealing and flush the tasks queues before turning the
    // tasks queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
581
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the stale tasks queue options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
603
/**
 * Builds the tasks queue options by layering the given options on top of the
 * defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
617
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
623
/**
 * Registers the empty queue event handler on every worker node.
 */
private setTaskStealing (): void {
  const listener = this.handleEmptyQueueEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('emptyqueue', listener)
  }
}
632
/**
 * Removes the empty queue event handler from every worker node.
 */
private unsetTaskStealing (): void {
  const listener = this.handleEmptyQueueEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('emptyqueue', listener)
  }
}
641
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  const listener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('backpressure', listener)
  }
}
650
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  const listener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('backpressure', listener)
  }
}
659
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
668
669 /**
670 * Whether the pool is busy or not.
671 *
672 * The pool busyness boolean status.
673 */
674 protected abstract get busy (): boolean
675
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the tasks queue
 * `concurrency` option; otherwise it is a single task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // A ready worker node below its quota means that the pool is not busy.
  const hasAvailableWorkerNode = this.workerNodes.some(workerNode => {
    if (!workerNode.info.ready) {
      return false
    }
    return tasksQueueEnabled
      ? workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      : workerNode.usage.tasks.executing === 0
  })
  return !hasAvailableWorkerNode
}
699
/**
 * Sends a task function operation message to one worker and waits for its
 * operation status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the operation status of the target worker.
      if (
        message.taskFunctionOperationStatus == null ||
        message.workerId !== workerId
      ) {
        return
      }
      if (message.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${message.workerId} with error: '${
              message.workerError?.message as string
            }'`
          )
        )
      }
      // The response has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(message.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
740
/**
 * Sends a task function operation message to every worker and waits until as
 * many operation status responses as worker nodes have been received.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if every worker succeeded and rejected if at least one worker failed.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      if (message.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(message)
      // Keep collecting until one response per worker node has been received.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          response => response.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          response => response.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          response => response.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The same listener was registered on every worker node, so it must be
      // removed from all of them, not only from the sender of the last
      // response, otherwise it leaks on the other worker nodes.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
795
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
808
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return operationStatus
}
831
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only the task functions added through the pool can be removed.
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
847
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node having a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
860
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationStatus
}
868
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
874
/**
 * Whether a task shall be executed right away on the given worker node: its
 * tasks queue must be empty and it must execute fewer tasks than the tasks
 * queue `concurrency` option.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
882
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool status and arguments checks: the first failing one rejects.
    let error: Error | undefined
    if (!this.started) {
      error = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      error = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      error = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      error = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      error = new TypeError('transferList argument must be an array')
    }
    if (error != null) {
      reject(error)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so that the worker response can settle it.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
940
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    // Create worker nodes until the number of non dynamic ones reaches the
    // number of workers given at construction.
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.numberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  } finally {
    // Always leave the starting state, even when a worker node creation
    // throws, so that the pool is not stuck and start() can be called again.
    this.starting = false
  }
  this.started = true
}
965
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  if (this.emitter != null) {
    this.emitter.emit(PoolEvents.destroy, this.info)
    this.emitter.emitDestroy()
  }
  this.destroying = false
  this.started = false
}
988
/**
 * Sends the kill message to a worker and waits for its kill response.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on a `'success'` kill response and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1012
1013 /**
1014 * Terminates the worker node given its worker node key.
1015 *
1016 * @param workerNodeKey - The worker node key.
1017 */
1018 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1019
/**
 * Setup hook called by the abstract constructor before the pool is started,
 * i.e. before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
1029
1030 /**
1031 * Should return whether the worker is the main worker or not.
1032 */
1033 protected abstract isMain (): boolean
1034
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics
 * of the worker node usage and, when applicable, of its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1064
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage and,
 * when applicable, of its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates, in order, to a worker usage.
  const updateWorkerUsage = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    updateWorkerUsage(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    updateWorkerUsage(taskFunctionWorkerUsage)
  }
}
1098
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when its worker info lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1113
/**
 * Updates the task counters of a worker usage after a task response:
 * one less executing task (never below zero) and one more executed or
 * failed task depending on whether the response carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1131
/**
 * Feeds the task run time reported in the response into the run time
 * measurement statistics of a worker usage. Responses carrying a worker
 * error are ignored; a missing run time is accounted as 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1145
/**
 * Feeds the time elapsed since the task timestamp into the wait time
 * measurement statistics of a worker usage. A task without timestamp is
 * accounted with a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = task.timestamp != null ? now - task.timestamp : 0
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    waitTime
  )
}
1158
/**
 * Feeds the event loop utilization (ELU) reported in the response into the
 * ELU statistics of a worker usage. Responses carrying a worker error are
 * ignored; missing active/idle values are accounted as 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The first sample is stored as is, later samples are averaged pairwise
  // with the stored utilization.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1191
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy flags `dynamicWorkerUsage`.
 * In every other case the choice is delegated to the worker choice strategy
 * context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1210
/**
 * Checks the conditions for a dynamic worker creation: the pool is of
 * dynamic type, it is not full and it is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1219
/**
 * Sends a message to worker given its worker node key.
 * Abstract: the actual transport is supplied by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1232
/**
 * Creates a new worker.
 * Abstract: the worker instantiation is supplied by the concrete pool class.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1239
/**
 * Creates a worker, wires the user supplied and the built-in event handlers
 * on it, adds it to the pool worker nodes and runs the post setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are registered before the built-in ones.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(failedWorkerNodeKey)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    // Replace the failed worker with one of the same kind when the pool is
    // up and running and restarting on error is enabled.
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      if (!failedWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    // Hand the tasks still queued on the failed worker node over to others.
    const shallRedistributeTasks =
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    if (shallRedistributeTasks) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1288
/**
 * Creates and sets up a worker node, then turns it into a dynamic one:
 * a kill message listener is registered, the worker is asked to check its
 * activity, the pool task functions are sent to it and its information is
 * flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // The key is resolved again from the worker id on each message.
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honored once the worker has nothing left to do.
    const hasNoPendingWork = (): boolean =>
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    const shallDestroy =
      hardKill ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!shallDestroy) {
      return
    }
    // Kill message received from worker:
    // flag the worker node as not ready immediately, then destroy it.
    this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
    this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1344
/**
 * Registers a listener callback on the worker given its worker node key.
 * Abstract: the registration mechanism is supplied by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1357
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Abstract: the registration mechanism is supplied by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1370
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Abstract: the deregistration mechanism is supplied by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1383
/**
 * Hook run right after a worker node has been created: registers the pool
 * message listener, sends the startup and statistics messages and, when the
 * tasks queue is enabled, subscribes to the configured task stealing events.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerMessageListener.bind(this)
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'emptyqueue',
      this.handleEmptyQueueEvent as EventListener
    )
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'backpressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
1415
/**
 * Sends the startup message to worker given its worker node key.
 * Abstract: the startup message content is supplied by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1422
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1439
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, that
 * has the fewest queued tasks. The source worker node is never a candidate:
 * selecting it would either re-enqueue the task onto the queue being drained
 * (endless loop) or execute it on the worker being evacuated.
 * When no other ready worker node exists, the remaining tasks are left in
 * the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Unknown worker node (e.g. key -1): nothing to redistribute.
  if (this.workerNodes[workerNodeKey] == null) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1460
/**
 * Increments the stolen tasks counter of a worker node and, when applicable,
 * the one of its usage tracked for the given task function.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1479
/**
 * Listener for the `emptyqueue` worker node event: the worker node that
 * emitted the event steals one task from the ready worker node having the
 * most queued tasks, if any has queued tasks.
 *
 * @param event - The event carrying the id of the worker whose queue is empty.
 */
private readonly handleEmptyQueueEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Work on a copy ordered by decreasing number of queued tasks.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1511
/**
 * Listener for the `backpressure` worker node event: tasks are popped from
 * the worker node that emitted the event and handed over to the other ready
 * worker nodes whose queue holds fewer than `size - 1` tasks, visiting the
 * least loaded worker nodes first. Each eligible worker node receives at
 * most one task per event.
 *
 * Candidates are visited through a sorted copy of the worker nodes, so the
 * position in that copy is not a worker node key: the key is looked up in
 * the pool worker nodes before any task is dispatched.
 *
 * @param event - The event carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const { workerId } = event.detail
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The emitting worker node is no longer part of the pool.
    return
  }
  const byQueuedTasksAscending = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of byQueuedTasksAscending) {
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      // Nothing left to hand over.
      break
    }
    if (
      !workerNode.info.ready ||
      workerNode.info.id === workerId ||
      workerNode.usage.tasks.queued >= tasksQueueMaxSize - sizeOffset
    ) {
      continue
    }
    // Resolve the real key: indices of the sorted copy do not match it.
    const workerNodeKey = this.workerNodes.indexOf(workerNode)
    if (workerNodeKey === -1) {
      continue
    }
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (this.shallExecuteTask(workerNodeKey)) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.updateTaskStolenStatisticsWorkerUsage(
      workerNodeKey,
      task.name as string
    )
  }
}
1548
/**
 * Message listener registered on each worker. After validating the worker id
 * of the message, it dispatches on the message content: worker ready
 * response, task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { ready, taskId, taskFunctionNames, workerId } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1567
/**
 * Handles a worker ready response: records the readiness and the task
 * function names in the worker information, then emits the pool ready event
 * if the pool is ready.
 *
 * @param message - The worker ready response.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (!this.ready) {
    return
  }
  // NOTE(review): the `once` wrapper is rebuilt on every call, so it
  // presumably cannot deduplicate the event across responses - confirm intent.
  const emitPoolReadyEventOnce = once(
    () => this.emitter?.emit(PoolEvents.ready, this.info),
    this
  )
  emitPoolReadyEventOnce()
}
1587
/**
 * Handles a task execution response: settles the promise registered for the
 * task id, runs the post execution hook, updates the worker choice strategy
 * context and, when the tasks queue is enabled, starts the next queued task
 * of the worker node if its concurrency allows it.
 * Responses with no registered promise are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1615
/**
 * Emits the pool busy event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1621
/**
 * Emits the pool back pressure event, with the pool information, if the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1627
/**
 * Emits the pool full event, with the pool information, if the pool is of
 * dynamic type and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1635
/**
 * Gets the worker information given its worker node key.
 * The lookup uses optional chaining, so nothing is thrown when no worker
 * node is stored under the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1645
/**
 * Wraps the given worker into a worker node and appends it to the pool
 * worker nodes. The tasks queue size handed to the worker node is the
 * configured one or, by default, the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueMaxSize)
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1669
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1682
/**
 * Flags the worker node as not ready given its worker node key.
 * Does nothing when no worker node is stored under the given key (e.g. a key
 * of -1 returned by a failed lookup), so that callers running inside event
 * handlers do not throw on an already removed worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
1686
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1694
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1703
/**
 * Executes the given task on the worker given its worker node key:
 * runs the pre execution hook, sends the task along with its transfer list
 * to the worker, then emits the task execution related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1715
/**
 * Enqueues the given task on the worker node given its key, then emits the
 * task queuing related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node enqueue operation.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1721
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1725
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1729
/**
 * Flushes the tasks queue of the worker node given its key: every queued
 * task is dequeued and executed on that worker node, then the queue is
 * cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  while (workerNode.tasksQueueSize() > 0) {
    const queuedTask = workerNode.dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  workerNode.clearTasksQueue()
}
1739
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1745 }