perf: avoid branching on pool type
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43 } from './worker'
44 import {
45 Measurements,
46 WorkerChoiceStrategies,
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
49 } from './selection-strategies/selection-strategies-types'
50 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
51 import { version } from './version'
52 import { WorkerNode } from './worker-node'
53 import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
56 checkValidWorkerChoiceStrategy,
57 getDefaultTasksQueueOptions,
58 updateEluWorkerUsage,
59 updateRunTimeWorkerUsage,
60 updateTaskStatisticsWorkerUsage,
61 updateWaitTimeWorkerUsage,
62 waitWorkerNodeEvents
63 } from './utils'
64
65 /**
66 * Base class that implements some shared logic for all poolifier pools.
67 *
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 */
72 export abstract class AbstractPool<
73 Worker extends IWorker,
74 Data = unknown,
75 Response = unknown
76 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * The task execution response promise map:
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks
 *   and, when the pool emitter is defined, an async resource bound to the task.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 */
private starting: boolean
/**
 * Whether the pool is destroying or not.
 */
private destroying: boolean
/**
 * Whether the pool ready event has been emitted or not.
 */
private readyEventEmitted: boolean
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
129
/**
 * Constructs a new poolifier pool.
 *
 * The initialization order matters: arguments are validated first, then the
 * event emitter and the worker choice strategy context are created, the setup
 * hook runs, the state flags are reset and finally the pool is started if
 * `opts.startWorkers` is `true` (its default once the options are checked).
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the argument checks fails.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Argument validation: each check throws on invalid input
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  // Also fills the pool options defaults in place
  this.checkPoolOptions(this.opts)

  // Bind the methods so they keep the pool as `this` when passed around as callbacks
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Overridable hook, runs before any worker node is created
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // The state flags must be initialized before start() reads them
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the pool utilization computation
  this.startTimestamp = performance.now()
}
185
/**
 * Ensures a fixed pool is not instantiated with a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
193
/**
 * Validates the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If it is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
  // Each guard throws, so the checks are evaluated in sequence
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
211
/**
 * Validates the pool options and fills the defaults in `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy: validated first, then defaulted to round robin
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled
  if (!this.opts.enableTasksQueue) {
    return
  }
  checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
239
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  const optionsDefined = workerChoiceStrategyOptions != null
  if (optionsDefined && !isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null) {
    // One weight is expected per worker node the pool can hold
    const expectedWeights =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeights) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
271
/**
 * Creates the pool event emitter, named after the pool and worker types.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277
/** @inheritDoc */
public get info (): PoolInfo {
  type MeasurementName = 'runTime' | 'waitTime'
  // Whether the worker choice strategy in use requires the given statistic
  const isRequired = (
    measurement: MeasurementName,
    statistic: 'aggregate' | 'average' | 'median'
  ): boolean | undefined =>
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()[
      measurement
    ][statistic]
  // Sums a numeric value picked on each worker node
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the measurement history of all the worker nodes
  const gatherHistory = (measurement: MeasurementName): number[] =>
    this.workerNodes.reduce<number[]>(
      (values, workerNode) =>
        values.concat(workerNode.usage[measurement].history),
      []
    )
  // Builds the pool wide statistics of a measurement
  const buildMeasurementStatistics = (measurement: MeasurementName) => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(isRequired(measurement, 'average') && {
      average: round(average(gatherHistory(measurement)))
    }),
    ...(isRequired(measurement, 'median') && {
      median: round(median(gatherHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only reported when both run time and wait time are aggregated
    ...(isRequired('runTime', 'aggregate') &&
      isRequired('waitTime', 'aggregate') && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: this.workerNodes.reduce(
      (total, _workerNode, workerNodeKey) =>
        this.isWorkerNodeBusy(workerNodeKey) ? total + 1 : total,
      0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related information is only reported when the queue is enabled
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(isRequired('runTime', 'aggregate') && {
      runTime: buildMeasurementStatistics('runTime')
    }),
    ...(isRequired('waitTime', 'aggregate') && {
      waitTime: buildMeasurementStatistics('waitTime')
    })
  }
}
433
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minimumNumberOfWorkers
}
448
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time
 * divided by the time elapsed since the pool start multiplied by the pool
 * maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * poolSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
470
/**
 * The pool type.
 *
 * Read by the instantiation checks (a `PoolTypes.fixed` pool rejects a maximum
 * number of workers and a zero minimum number of workers), by the event
 * emitter name and by the pool information.
 */
protected abstract get type (): PoolType
477
/**
 * The worker type.
 *
 * Used in the event emitter name and reported in the pool information.
 */
protected abstract get worker (): WorkerType
482
/**
 * Validates the worker id carried by a message received from a worker.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
498
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
510
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Usage statistics gathered under the previous strategy are discarded
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
529
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const optionsProvided = workerChoiceStrategyOptions != null
  if (optionsProvided) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  // The context always receives the options currently stored on the pool
  const { workerChoiceStrategyOptions: currentOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(this, currentOptions)
}
543
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisablingQueue = this.opts.enableTasksQueue === true && !enable
  if (isDisablingQueue) {
    // Detach the stealing handlers and flush the queues before turning the queue off
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
557
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without tasks queue, stale queue options are dropped
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  this.setTasksQueueSize(queueOptions.size as number)
  // Handlers are always detached first so they are never attached twice
  this.unsetTaskStealing()
  if (queueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (queueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
581
/**
 * Merges the given tasks queue options over the default ones, computed from
 * the pool maximum size.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaxSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
592
/**
 * Applies the tasks queue back pressure size to every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
598
/**
 * Attaches the idle worker node event handler to every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
607
/**
 * Detaches the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
616
/**
 * Attaches the back pressure event handler to every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
625
/**
 * Detaches the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
634
/**
 * Whether the pool is full or not: `true` when the number of worker nodes
 * has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaxSize
}
646
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Defined by the concrete pools; `internalBusy()` provides a worker nodes
 * level busyness check (presumably what implementations build on - confirm
 * against the subclasses).
 */
protected abstract get busy (): boolean
653
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node is considered available when it executes fewer tasks
 * than the tasks queue concurrency if the tasks queue is enabled, or no task
 * at all otherwise.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
677
/**
 * Whether the worker node is busy or not: it executes at least as many tasks
 * as the tasks queue concurrency if the tasks queue is enabled, or at least
 * one task otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return this.opts.enableTasksQueue === true
    ? executingTasks >= (this.opts.tasksQueueOptions?.concurrency as number)
    : executingTasks > 0
}
687
/**
 * Sends a task function operation to one worker and waits for its status.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the operation status of the targeted worker
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The status has been handled: the listener is no longer needed
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
728
/**
 * Sends a task function operation to every worker and waits for all the
 * operation statuses.
 *
 * The same listener is registered on every worker node. Once all the statuses
 * are received it is deregistered from every worker node, so no stale listener
 * keeps collecting the responses of later operations.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if the operation succeeded on all the workers and rejected with an error if it failed on at least one worker.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      if (message.taskFunctionOperationStatus != null) {
        responsesReceived.push(message)
        if (responsesReceived.length === this.workerNodes.length) {
          if (
            responsesReceived.every(
              message => message.taskFunctionOperationStatus === true
            )
          ) {
            resolve(true)
          } else if (
            responsesReceived.some(
              message => message.taskFunctionOperationStatus === false
            )
          ) {
            const errorResponse = responsesReceived.find(
              response => response.taskFunctionOperationStatus === false
            )
            reject(
              new Error(
                `Task function operation '${
                  message.taskFunctionOperation as string
                }' failed on worker ${
                  errorResponse?.workerId as number
                } with error: '${
                  errorResponse?.workerError?.message as string
                }'`
              )
            )
          }
          // Deregister from all the worker nodes, not only from the last
          // responder, since the listener was registered on each of them
          for (const [workerNodeKey] of this.workerNodes.entries()) {
            this.deregisterWorkerMessageListener(
              workerNodeKey,
              taskFunctionOperationsListener
            )
          }
        }
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
783
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node reports the task function name
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
796
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  // Only reached when the operation did not reject
  this.taskFunctions.set(name, fn)
  return opResult
}
819
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
835
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node reporting task function names is the reference
  const referenceWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return referenceWorkerNode?.info.taskFunctionNames ?? []
}
848
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
856
/**
 * Deletes the usage statistics of a task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
862
/**
 * Whether a task shall be executed right away on the worker node rather than
 * enqueued: its tasks queue is empty and it executes fewer tasks than the
 * tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
870
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects
    let validationError: Error | undefined
    if (!this.started) {
      validationError = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      validationError = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      validationError = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      validationError = new TypeError(
        'name argument must not be an empty string'
      )
    } else if (transferList != null && !Array.isArray(transferList)) {
      validationError = new TypeError(
        'transferList argument must be an array'
      )
    }
    if (validationError != null) {
      reject(validationError)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // The promise callbacks are kept until the worker response is received
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const shallExecuteNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (shallExecuteNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
934
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  // Dynamic worker nodes do not count towards the minimum number of workers
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.starting = true
  while (countStaticWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
959
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // Notify the listeners before tearing the emitter down
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
984
/**
 * Sends the kill message to a worker and waits for its handling status.
 *
 * Resolves immediately if the worker node no longer exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const workerNode = this.workerNodes?.[workerNodeKey]
    if (workerNode == null) {
      resolve()
      return
    }
    const onKillResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                response.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1010
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready, its tasks queue is flushed and the
 * flushed tasks are awaited (up to the tasks finished timeout) before the
 * kill message is sent and the worker node terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when no tasks queue options are set
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1032
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is
 * created and before the pool is optionally started. The default
 * implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1042
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if it returns `false`, preventing a pool from being
 * started from a worker with the same type as the pool.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1047
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time on the
 * worker node usage and, when applicable, on the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Accounts for a task about to be executed on the given usage
  const trackTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    updateWaitTimeWorkerUsage(this.workerChoiceStrategyContext, usage, task)
  }
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    trackTaskStart(this.workerNodes[workerNodeKey].usage)
  }
  const taskName = task.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName) !=
      null
  ) {
    trackTaskStart(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskName
      ) as WorkerUsage
    )
  }
}
1085
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task statistics, run time and ELU on the worker node usage and,
 * when applicable, on the task function usage, then updates the worker choice
 * strategy context if any usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Accounts for a finished task on the given usage
  const trackTaskEnd = (usage: WorkerUsage): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
  }
  let usageUpdated = false
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    trackTaskEnd(this.workerNodes[workerNodeKey].usage)
    usageUpdated = true
  }
  const taskName = message.taskPerformance?.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName) !=
      null
  ) {
    trackTaskEnd(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskName
      ) as WorkerUsage
    )
    usageUpdated = true
  }
  if (usageUpdated) {
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
1141
/**
 * Tells whether the per task function usage of a worker node shall be updated.
 * This is the case when the worker information exists and its task function
 * names list is an array holding more than two entries.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1156
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy asks for dynamic worker usage.
 * In every other case the worker choice strategy picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1175
/**
 * Conditions for dynamic worker creation.
 * Consulted when choosing a worker node for the next task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
1182
/**
 * Sends a message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1195
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied `online`, `message`, `error` and `exit` handlers are
 * registered (or no-ops when absent), together with an internal `error`
 * handler and a one-shot `exit` handler that removes the worker node.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts
  const workerNode = this.createWorkerNode()
  workerNode.registerWorkerEventHandler(
    'online',
    onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', (error: Error) => {
    // The failing worker node must not be picked anymore.
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the failing worker node with one of the same kind.
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      const failedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
    workerNode.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  })
  workerNode.registerWorkerEventHandler('exit', exitHandler ?? EMPTY_FUNCTION)
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1248
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener handling the kill messages
 * sent by the worker is registered, the worker is asked to check its
 * activity, the pool task functions are sent to it and it is flagged as
 * dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only once the worker node has nothing left to do.
    const isIdleForSoftKill = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroyWorkerNode =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdleForSoftKill())
    if (!shallDestroyWorkerNode) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, no size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1304
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to or the response received from the worker.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1317
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to or the response received from the worker.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1330
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the data sent to or the response received from the worker.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1343
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, subscribes
 * to the worker node events driving tasks stealing.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
1375
/**
 * Sends the startup message to worker given its worker node key.
 * Called once the pool message listener has been registered on the worker.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1382
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
1399
/**
 * Executes the task right away on the worker node if it shall execute it,
 * enqueues it on the worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1407
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task the destination is the ready worker node, other than
 * the source one, with the fewest queued tasks. If no other worker node is
 * ready, the first other worker node is used so that tasks still leave the
 * source queue. The source worker node is never a destination: handing a
 * task back to it could re-enqueue it and make the loop endless.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained, `-1` is a no-op.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let fallbackWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      if (fallbackWorkerNodeKey === -1) {
        fallbackWorkerNodeKey = candidateKey
      }
      if (
        candidate.info.ready &&
        (destinationWorkerNodeKey === -1 ||
          candidate.usage.tasks.queued <
            this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      destinationWorkerNodeKey = fallbackWorkerNodeKey
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node exists: nothing can take the tasks over.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1432
/**
 * Increments the stolen tasks counter of the worker node usage and, when
 * task function usage tracking applies, of the given task function usage.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1451
/**
 * Increments the sequentially stolen tasks counter of the worker node usage,
 * if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  ++nodeUsage.tasks.sequentiallyStolen
}
1460
/**
 * Increments the sequentially stolen tasks counter of the given task function
 * usage, when task function usage tracking applies to the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.sequentiallyStolen
}
1476
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node
 * usage, if the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  nodeUsage.tasks.sequentiallyStolen = 0
}
1485
/**
 * Resets to zero the sequentially stolen tasks counter of the given task
 * function usage, when task function usage tracking applies to the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
1501
/**
 * Handler of the `idleWorkerNode` worker node event: makes the idle worker
 * node steal a task, then re-schedules itself after an exponential delay
 * computed from the sequentially stolen tasks counter of the worker node.
 *
 * The stealing sequence stops, and the sequentially stolen counters are
 * reset, once a previous task was stolen and the worker node is executing
 * or queuing tasks again.
 *
 * @param eventDetail - The worker node event detail, its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation of the sequence, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const stealingSequenceEnded =
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  if (stealingSequenceEnded) {
    const taskFunctionNames = this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]
    for (const taskFunctionName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)?.tasks as TaskStatistics
    // The per task function sequence goes on when it starts, or when the
    // same task function is stolen again.
    const sequenceGoesOn =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (sequenceGoesOn) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  const nextAttemptDelay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(nextAttemptDelay)
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1565
/**
 * Makes the given worker node steal one task from the ready worker node,
 * other than itself, having the most queued tasks.
 *
 * The stolen task is handled by the stealing worker node and its stolen
 * tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidateWorkerNode =>
        candidateWorkerNode.info.ready &&
        // Compare by identity: positions in the sorted copy are not worker node keys.
        candidateWorkerNode !== stealingWorkerNode &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // The source worker node was selected because it has queued tasks.
  const task = sourceWorkerNode.popTask() as Task<Data>
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
1592
/**
 * Handler of the `backPressure` worker node event: hands tasks queued on the
 * worker node under back pressure over to the other ready worker nodes,
 * least queued first, as long as their queue stays below the configured
 * tasks queue size minus one.
 *
 * @param eventDetail - The worker node event detail, its `workerId` identifies the worker node under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // Unknown worker id: there is no queue to relieve.
    return
  }
  // Keep each worker node paired with its pool key while sorting: positions
  // in the sorted array are not worker node keys.
  const keyedWorkerNodes = [...this.workerNodes.entries()].sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of keyedWorkerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1629
/**
 * This method is the message listener registered on each worker.
 * It dispatches, in this order of precedence, worker ready responses, task
 * execution responses and task function names updates.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
  }
}
1651
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names in the worker information, then emits the pool `ready`
 * event the first time the pool is found ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready: false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1667
/**
 * Handles a task execution response: settles the promise registered for the
 * task, updates the usage statistics, then, when the tasks queue is enabled,
 * executes the next queued task or signals that the worker node is idle.
 *
 * Responses without a registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  // The worker node may no longer exist at that key: skip the queue handling
  // instead of dereferencing an undefined worker node.
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
1717
/**
 * Emits the pool `busy` event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1723
/**
 * Emits the pool `backPressure` event, with the pool information, if the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1729
/**
 * Emits the pool `full` event, with the pool information, if the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1737
/**
 * Gets the worker information given its worker node key.
 * The lookup is null-safe: no worker node at that key yields `undefined`
 * at runtime despite the declared return type.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1747
/**
 * Creates a worker node from the pool worker type, file path and options.
 * The tasks queue back pressure size is the configured tasks queue size or,
 * failing that, the default one computed from the pool number of workers.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1773
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1789
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategy context. Unknown worker nodes are ignored.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1802
/**
 * Flags the worker node as not ready given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1806
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only makes sense when tasks are queued.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1814
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and every worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1823
/**
 * Executes the given task on the worker given its worker node key.
 * Runs the before task execution hook, sends the task to the worker with its
 * transfer list, then emits the task execution related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Usage statistics are updated before the task is actually sent.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1835
/**
 * Enqueues the given task on the worker node given its worker node key, then
 * emits the task queuing related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1841
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1845
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1849
/**
 * Executes every task queued on the worker node given its worker node key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1862
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1868 }