refactor: cleanup conditions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43 } from './worker'
44 import {
45 Measurements,
46 WorkerChoiceStrategies,
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
49 } from './selection-strategies/selection-strategies-types'
50 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
51 import { version } from './version'
52 import { WorkerNode } from './worker-node'
53 import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
56 checkValidWorkerChoiceStrategy,
57 getDefaultTasksQueueOptions,
58 updateEluWorkerUsage,
59 updateRunTimeWorkerUsage,
60 updateTaskStatisticsWorkerUsage,
61 updateWaitTimeWorkerUsage,
62 waitWorkerNodeEvents
63 } from './utils'
64
65 /**
66 * Base class that implements some shared logic for all poolifier pools.
67 *
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 */
72 export abstract class AbstractPool<
73 Worker extends IWorker,
74 Data = unknown,
75 Response = unknown
76 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Pending task executions, indexed by task id:
 * - `key`: The id of a submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of the task, along with the key of the worker node it was assigned to.
 *
 * The entry is looked up with the task id carried by a worker message in order to settle the matching promise.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Context holding the worker choice strategy currently used by the pool.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Task functions registered at runtime through the pool:
 * - `key`: The task function name.
 * - `value`: The task function.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * `true` once the pool has been started, `false` otherwise.
 */
private started: boolean

/**
 * `true` while the pool is creating its initial worker nodes, `false` otherwise.
 */
private starting: boolean

/**
 * `true` while the pool is being destroyed, `false` otherwise.
 */
private destroying: boolean

/**
 * `true` once the pool ready event has been emitted, `false` otherwise.
 */
private readyEventEmitted: boolean

/**
 * Timestamp taken at pool construction, used as the origin of the pool utilization computation.
 */
private readonly startTimestamp: number
129
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and the worker choice
 * strategy context are created, and finally the workers are started if `opts.startWorkers` is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is instantiated from a worker of the same type as the pool.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also fills in the options defaults.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods so that they keep their `this` when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
185
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and `maximumNumberOfWorkers` is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
193
/**
 * Validates the minimum number of workers of the pool.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
211
/**
 * Validates the pool options and stores them, defaulted where unset, into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the tasks queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
239
/**
 * Validates worker choice strategy options. Nullish options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights does not match the pool size or if the measurement is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node the pool can hold.
  if (
    weights != null &&
    Object.keys(weights).length !==
      (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  ) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
271
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once: every statistics dependent section below reads the same requirements.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )

  // Sums a numeric value picked on each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Builds the pool wide statistics of one measurement (run time or wait time).
  const buildMeasurementStatistics = (
    requirements: { average: boolean, median: boolean },
    pickMeasurement: (
      workerNode: IWorkerNode<Worker, Data>
    ) => WorkerUsage['runTime']
  ): NonNullable<PoolInfo['runTime']> => {
    const gatherHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (history, workerNode) =>
          history.concat(pickMeasurement(workerNode).history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => pickMeasurement(workerNode)?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => pickMeasurement(workerNode)?.maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(gatherHistory()))
      }),
      ...(requirements.median && {
        median: round(median(gatherHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements?.runTime.aggregate &&
      taskStatisticsRequirements?.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    ...(tasksQueueEnabled && {
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate && {
      runTime: buildMeasurementStatistics(
        taskStatisticsRequirements.runTime,
        workerNode => workerNode.usage.runTime
      )
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate && {
      waitTime: buildMeasurementStatistics(
        taskStatisticsRequirements.waitTime,
        workerNode => workerNode.usage.waitTime
      )
    })
  }
}
440
/**
 * The pool readiness boolean status: `true` when the number of ready, non dynamic worker nodes
 * has reached the minimum number of workers.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minimumNumberOfWorkers
}
455
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // Run time and wait time are accumulated separately and only added at the end.
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
477
478 /**
479 * The pool type.
480 *
481 * If it is `'dynamic'`, it provides the `max` property.
482 */
483 protected abstract get type (): PoolType
484
485 /**
486 * The worker type.
487 */
488 protected abstract get worker (): WorkerType
489
/**
 * Verifies that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
505
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if a worker node with that worker id exists in the pool, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
517
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node has its usage reset and is sent a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
536
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Nullish options leave the currently stored options untouched.
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const { workerChoiceStrategyOptions: currentOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(this, currentOptions)
}
550
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // When switching the tasks queue off: stop task stealing and flush the queued tasks.
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
564
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  // Listeners are always removed first so that they are never registered twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
588
/**
 * Builds the tasks queue options by overlaying the given options on the defaults computed for the pool maximum size.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaxSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
599
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
605
/**
 * Registers the `'idleWorkerNode'` event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
614
/**
 * Removes the `'idleWorkerNode'` event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
623
/**
 * Registers the `'backPressure'` event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
632
/**
 * Removes the `'backPressure'` event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
641
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the maximum number of workers,
 * or the minimum number of workers if no maximum is defined.
 */
protected get full (): boolean {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaxSize
}
653
654 /**
655 * Whether the pool is busy or not.
656 *
657 * The pool busyness boolean status.
658 */
659 protected abstract get busy (): boolean
660
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the tasks queue concurrency;
 * otherwise a worker node is at quota as soon as it executes one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  // A worker node with spare capacity is ready and below its tasks quota.
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return this.workerNodes.findIndex(hasSpareCapacity) === -1
}
684
/**
 * Whether the given worker node is executing its tasks quota or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing >=
    (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
694
/**
 * Sends a task function operation message to one worker node and waits for its response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` if the worker reported success. Rejects if the worker reported a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the operation status of the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
735
/**
 * Sends a task function operation message to every worker node and waits for all their responses.
 *
 * NOTE(review): with no worker node in the pool, no message is sent and the returned promise never settles.
 *
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` once every worker node reported success. Rejects if at least one worker node reported a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      // Wait until every worker node has answered before settling.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                response.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      }
      // The listener was registered on every worker node: remove it from all of them,
      // not only from the worker node that sent the last response.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
790
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function exists as soon as one worker node lists its name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
803
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  this.taskFunctions.set(name, fn)
  return opResult
}
826
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions previously added through the pool can be removed.
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
842
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node exposing a non empty list.
  const workerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNode?.info.taskFunctionNames ?? []
}
855
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
863
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
869
/**
 * Whether a task shall be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node executes fewer tasks than the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
877
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects the promise.
    let validationError: Error | undefined
    if (!this.started) {
      validationError = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      validationError = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      validationError = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      validationError = new TypeError(
        'name argument must not be an empty string'
      )
    } else if (transferList != null && !Array.isArray(transferList)) {
      validationError = new TypeError(
        'transferList argument must be an array'
      )
    }
    if (validationError != null) {
      reject(validationError)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so that the task response can settle the promise later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })

    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
941
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Create worker nodes until the number of non dynamic ones reaches the minimum number of workers.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
966
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // Emit the destroy event, then release the emitter resources.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
991
/**
 * Sends a kill message to the worker of the given worker node and waits for its acknowledgement.
 *
 * Resolves immediately if no worker node exists for the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolving once the worker reported `'success'`. Rejects if the worker reported `'failure'`.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    if (this.workerNodes?.[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1017
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed. The flushed tasks are
 * then awaited, bounded by the tasks finished timeout, before the worker is killed and terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1039
/**
 * Hook called from the abstract constructor, after the worker choice strategy context
 * creation and before any worker node is created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1049
1050 /**
1051 * Should return whether the worker is the main worker or not.
1052 */
1053 protected abstract isMain (): boolean
1054
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics, on the worker node
 * usage and, when applicable, on the worker usage of the executed task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the null check below narrows the type without any assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      updateWaitTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        task
      )
    }
  }
}
1092
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics, on the worker node usage and, when applicable,
 * on the worker usage of the executed task function. The worker choice strategy context is then
 * updated if any usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates to one worker usage.
  const updateWorkerUsage = (workerUsage: WorkerUsage): void => {
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
  }
  let needWorkerChoiceStrategyUpdate = false
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    updateWorkerUsage(workerNode.usage)
    needWorkerChoiceStrategyUpdate = true
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the null check below narrows the type without any assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      updateWorkerUsage(taskFunctionWorkerUsage)
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
1148
/**
 * Tells whether the task function worker usage of a worker node shall be updated.
 *
 * That is the case when the worker information exists and holds an array of
 * more than two task function names.
 * NOTE(review): the threshold of two presumably accounts for entries that do
 * not denote a distinct task function - confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1163
/**
 * Picks the worker node that will receive the next task.
 *
 * When the pool decides a dynamic worker shall be created, one is created and,
 * if the strategy policy has `dynamicWorkerUsage` set, it is chosen directly.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1182
/**
 * Conditions for dynamic worker creation.
 * Evaluated each time a worker node is chosen for the next task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
1189
/**
 * Sends a message to worker given its worker node key.
 * Used for tasks as well as for control messages such as the statistics or
 * the `checkActive` messages.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1202
/**
 * Creates a worker node, wires its event handlers, adds it to the pool and
 * runs the post-setup hook.
 *
 * Handlers are registered in this order: user `online`, `message` and `error`
 * handlers (or a no-op), the pool internal `error` handler, the user `exit`
 * handler (or a no-op) and a one-shot `exit` handler removing the worker node
 * from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Re-evaluated at each use since the pool state may differ between checks.
  const poolIsRunning = (): boolean => this.started && !this.destroying
  // Internal error handling: flag, notify, optionally replace the worker node,
  // optionally move its queued tasks elsewhere, then terminate it.
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (poolIsRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (poolIsRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    workerNode.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }
  const onceWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
  }

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onceWorkerExit)

  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1258
/**
 * Creates a worker node and completes its setup as a dynamic worker node.
 *
 * On top of the regular setup, a message listener handling kill messages is
 * registered, a `checkActive` message is sent, the task functions registered
 * on the pool are added to the worker and the worker information is flagged
 * as dynamic. The worker node is flagged as ready right away if the strategy
 * policy has `dynamicWorkerReady` or `dynamicWorkerUsage` set.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const nodeUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only when nothing is executing and, with the
    // tasks queue enabled, when nothing is queued either.
    const softKillAllowed = (): boolean => {
      if (nodeUsage.tasks.executing !== 0) {
        return false
      }
      return (
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0)
      )
    }
    // Kill message received from worker
    const shallKill =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && softKillAllowed())
    if (!shallKill) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch(destroyError => {
      this.emitter?.emit(PoolEvents.error, destroyError)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(operationError => {
        this.emitter?.emit(PoolEvents.error, operationError)
      })
    }
  }
  dynamicWorkerInfo.dynamic = true
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1314
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to the worker or the execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1327
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to the worker or the execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1340
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to the worker or the execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1353
/**
 * Hook run right after a worker node has been created and added to the pool.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, subscribes
 * the task stealing handlers selected by the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const queueOptions = this.opts.tasksQueueOptions
  if (queueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (queueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1385
/**
 * Sends the startup message to worker given its worker node key.
 * Called once per worker node, right after its message listener has been registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1392
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const runTime = strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const elu = strategyContext.getTaskStatisticsRequirements().elu.aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
1409
/**
 * Tells whether task stealing is pointless in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1413
/**
 * Dispatches a task to a worker node: executes it right away if the worker
 * node shall execute it, enqueues it on the worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1421
/**
 * Moves the tasks queued on a worker node to the other worker nodes.
 *
 * Each queued task is handed to the ready worker node, other than the source
 * one, that currently has the fewest queued tasks (lowest key wins on ties).
 * The source worker node is never a destination: handing a task back to it
 * could re-enqueue the task on the very queue being drained and keep the loop
 * from terminating. If no other worker node is ready, the remaining tasks are
 * left in the source queue.
 *
 * Does nothing if the key is `-1` or if task stealing is not possible.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping on the source queue.
      break
    }
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1443
/**
 * Counts one more stolen task on a worker node usage and, when task function
 * usage tracking applies, on the usage of the given task function.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1462
/**
 * Counts one more sequentially stolen task on a worker node usage, if the
 * worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  ++nodeUsage.tasks.sequentiallyStolen
}
1471
/**
 * Counts one more sequentially stolen task on the usage of a task function
 * of a worker node, when task function usage tracking applies and such a
 * usage exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.sequentiallyStolen
  }
}
1487
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node
 * usage, if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  nodeUsage.tasks.sequentiallyStolen = 0
}
1496
/**
 * Resets to zero the sequentially stolen tasks counter on the usage of a
 * task function of a worker node, when task function usage tracking applies
 * and such a usage exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.sequentiallyStolen = 0
  }
}
1512
/**
 * Handler of the `idleWorkerNode` worker node event: makes the idle worker
 * node steal a task from another worker node, then schedules itself again
 * after a delay so that stealing goes on while it is worthwhile.
 *
 * @param eventDetail - The worker node event detail. Its `workerNodeKey` property must be defined.
 * @param previousStolenTask - The task stolen by the previous run of this handler, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail `workerNodeKey` property is not defined.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey property must be defined'
    )
  }
  // Give up when stealing is not possible or when more than half of the
  // worker nodes are already stealing.
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes as number) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    // The stealing flag can only have been set by a previous run.
    if (previousStolenTask != null) {
      this.getWorkerInfo(workerNodeKey).stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node already stole at least one task and now has work to do
  // (executing or queued): end the stealing sequence and reset its counters.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    this.getWorkerInfo(workerNodeKey).stealing = false
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  this.getWorkerInfo(workerNodeKey).stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // NOTE(review): the cast hides that the task function usage may be
    // undefined, in which case the property access below would throw.
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTask.name as string)
      ?.tasks as TaskStatistics
    // Count the task as sequentially stolen when the sequence starts or goes
    // on with the same task function name, otherwise reset the counter.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    }
  }
  // Run again after a delay derived from the sequentially stolen tasks count,
  // passing along the task stolen by this run (possibly undefined).
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1585
/**
 * Makes a worker node steal one task from another worker node.
 *
 * The source is the eligible worker node with the most queued tasks. A worker
 * node is eligible if it is not the stealing one, is ready, is not itself
 * stealing and has at least one queued task. The stolen task is handed to the
 * stealing worker node and the stolen tasks statistics are updated.
 *
 * The stealing worker node is excluded by identity. Candidates are searched
 * in a sorted copy of the worker nodes, whose positions are not worker node
 * keys, so comparing positions with the given key would exclude the wrong node.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no worker node is eligible as source.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // Most loaded worker nodes first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode !== stealingWorkerNode &&
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    const task = sourceWorkerNode.popTask() as Task<Data>
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    this.updateTaskStolenStatisticsWorkerUsage(
      workerNodeKey,
      task.name as string
    )
    return task
  }
}
1613
/**
 * Handler of the `backPressure` worker node event: moves queued tasks from
 * the worker node under back pressure to less loaded worker nodes.
 *
 * Does nothing when stealing is not possible, when more than half of the
 * worker nodes are already stealing or when the configured tasks queue size
 * is not greater than one. Otherwise each ready, non-stealing worker node
 * other than the source whose queue holds fewer than `size - 1` tasks
 * receives one task popped from the source queue, least loaded nodes first,
 * as long as the source still has queued tasks.
 *
 * Destinations are looked up in a sorted copy of the worker nodes, so the
 * real worker node key is resolved from the worker node itself: positions in
 * the sorted copy are not worker node keys.
 *
 * @param eventDetail - The worker node event detail. Its `workerId` identifies the worker node under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes as number) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Least loaded worker nodes first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        // The worker node left the pool in the meantime.
        continue
      }
      workerNode.info.stealing = true
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
      workerNode.info.stealing = false
    }
  }
}
1657
/**
 * Message listener registered on each worker.
 *
 * After checking the message worker id, routes the message by its content:
 * a worker ready response, a task execution response or a task function
 * names update. Any other message is ignored.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
  }
}
1679
/**
 * Handles a worker ready response: records the ready state and the task
 * function names on the worker information, then emits the pool `ready`
 * event the first time the pool is found ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(workerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1695
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task (rejecting with the worker
 * error message or resolving with the response data, within the async
 * resource scope when there is one), runs the after task execution hook,
 * forgets the promise and emits `taskFinished` on the worker node. When the
 * tasks queue is enabled and the pool is not being destroyed, the next queued
 * task is executed if the concurrency allows it and `idleWorkerNode` is
 * emitted if the worker node has nothing left to do.
 *
 * Messages whose task id has no registered promise are ignored. The tasks
 * queue handling is skipped if the worker node is no longer in the pool,
 * since its usage and queue cannot be accessed any more.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // The worker node may have been removed from the pool in the meantime.
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
1745
/**
 * Emits the pool `busy` event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1751
/**
 * Emits the pool `backPressure` event, with the pool information, if the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1757
/**
 * Emits dynamic worker creation events.
 * Called at the end of the dynamic worker node setup.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1762
/**
 * Gets the worker information given its worker node key.
 *
 * Despite the declared return type, the result is `undefined` at runtime
 * when no worker node exists at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1772
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * The back pressure size of its tasks queue is the configured tasks queue
 * size or, failing that, the default one computed from the pool size.
 * The worker node is flagged as ready right away while the pool is starting.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1798
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key, i.e. its index in the pool worker nodes.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1814
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker node is not in the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1827
/**
 * Flags the worker node given by its key as not ready.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1831
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only makes sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1839
/**
 * Tells whether the pool as a whole has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1848
/**
 * Executes the given task on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Update the usage statistics before the task leaves the pool.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message sent to the worker.
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1860
/**
 * Enqueues the given task on the worker node given by its key, then emits
 * the task queuing events that apply.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1866
/**
 * Dequeues a task from the tasks queue of the worker node given by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1870
/**
 * Gets the tasks queue size of the worker node given by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1874
/**
 * Flushes the tasks queue of the worker node given by its key: every queued
 * task is dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1887
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1893 }