perf: optimize pool emptiness condition
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types.js'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils.js'
24 import { KillBehaviors } from '../worker/worker-options.js'
25 import type { TaskFunction } from '../worker/task-functions.js'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool.js'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerNodeEventDetail,
40 WorkerType
41 } from './worker.js'
42 import {
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
49 import { version } from './version.js'
50 import { WorkerNode } from './worker-node.js'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 getDefaultTasksQueueOptions,
56 updateEluWorkerUsage,
57 updateRunTimeWorkerUsage,
58 updateTaskStatisticsWorkerUsage,
59 updateWaitTimeWorkerUsage,
60 waitWorkerNodeEvents
61 } from './utils.js'
62
63 /**
64 * Base class that implements some shared logic for all poolifier pools.
65 *
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
69 */
70 export abstract class AbstractPool<
71 Worker extends IWorker,
72 Data = unknown,
73 Response = unknown
74 > implements IPool<Worker, Data, Response> {
75 /** @inheritDoc */
76 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
77
78 /** @inheritDoc */
79 public emitter?: EventEmitterAsyncResource
80
81 /**
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
85 *
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
87 */
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
90
91 /**
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
93 */
94 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
95 Worker,
96 Data,
97 Response
98 >
99
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
128 /**
129 * Constructs a new poolifier pool.
130 *
131 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
134 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
135 */
136 public constructor (
137 protected readonly minimumNumberOfWorkers: number,
138 protected readonly filePath: string,
139 protected readonly opts: PoolOptions<Worker>,
140 protected readonly maximumNumberOfWorkers?: number
141 ) {
142 if (!this.isMain()) {
143 throw new Error(
144 'Cannot start a pool from a worker with the same type as the pool'
145 )
146 }
147 this.checkPoolType()
148 checkFilePath(this.filePath)
149 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
150 this.checkPoolOptions(this.opts)
151
152 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
153 this.executeTask = this.executeTask.bind(this)
154 this.enqueueTask = this.enqueueTask.bind(this)
155
156 if (this.opts.enableEvents === true) {
157 this.initializeEventEmitter()
158 }
159 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
160 Worker,
161 Data,
162 Response
163 >(
164 this,
165 this.opts.workerChoiceStrategy,
166 this.opts.workerChoiceStrategyOptions
167 )
168
169 this.setupHook()
170
171 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
172
173 this.started = false
174 this.starting = false
175 this.destroying = false
176 this.readyEventEmitted = false
177 if (this.opts.startWorkers === true) {
178 this.start()
179 }
180
181 this.startTimestamp = performance.now()
182 }
183
184 private checkPoolType (): void {
185 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
186 throw new Error(
187 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
188 )
189 }
190 }
191
192 private checkMinimumNumberOfWorkers (
193 minimumNumberOfWorkers: number | undefined
194 ): void {
195 if (minimumNumberOfWorkers == null) {
196 throw new Error(
197 'Cannot instantiate a pool without specifying the number of workers'
198 )
199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
200 throw new TypeError(
201 'Cannot instantiate a pool with a non safe integer number of workers'
202 )
203 } else if (minimumNumberOfWorkers < 0) {
204 throw new RangeError(
205 'Cannot instantiate a pool with a negative number of workers'
206 )
207 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
209 }
210 }
211
212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
213 if (isPlainObject(opts)) {
214 this.opts.startWorkers = opts.startWorkers ?? true
215 checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategyOptions(
219 opts.workerChoiceStrategyOptions
220 )
221 if (opts.workerChoiceStrategyOptions != null) {
222 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
223 }
224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 checkValidTasksQueueOptions(opts.tasksQueueOptions)
229 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
230 opts.tasksQueueOptions
231 )
232 }
233 } else {
234 throw new TypeError('Invalid pool options: must be a plain object')
235 }
236 }
237
238 private checkValidWorkerChoiceStrategyOptions (
239 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
240 ): void {
241 if (
242 workerChoiceStrategyOptions != null &&
243 !isPlainObject(workerChoiceStrategyOptions)
244 ) {
245 throw new TypeError(
246 'Invalid worker choice strategy options: must be a plain object'
247 )
248 }
249 if (
250 workerChoiceStrategyOptions?.weights != null &&
251 Object.keys(workerChoiceStrategyOptions.weights).length !==
252 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
253 ) {
254 throw new Error(
255 'Invalid worker choice strategy options: must have a weight for each worker node'
256 )
257 }
258 if (
259 workerChoiceStrategyOptions?.measurement != null &&
260 !Object.values(Measurements).includes(
261 workerChoiceStrategyOptions.measurement
262 )
263 ) {
264 throw new Error(
265 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
266 )
267 }
268 }
269
270 private initializeEventEmitter (): void {
271 this.emitter = new EventEmitterAsyncResource({
272 name: `poolifier:${this.type}-${this.worker}-pool`
273 })
274 }
275
276 /** @inheritDoc */
277 public get info (): PoolInfo {
278 return {
279 version,
280 type: this.type,
281 worker: this.worker,
282 started: this.started,
283 ready: this.ready,
284 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
285 strategy: this.opts.workerChoiceStrategy!,
286 minSize: this.minimumNumberOfWorkers,
287 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
288 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
289 ?.runTime.aggregate === true &&
290 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
291 .waitTime.aggregate && {
292 utilization: round(this.utilization)
293 }),
294 workerNodes: this.workerNodes.length,
295 idleWorkerNodes: this.workerNodes.reduce(
296 (accumulator, workerNode) =>
297 workerNode.usage.tasks.executing === 0
298 ? accumulator + 1
299 : accumulator,
300 0
301 ),
302 ...(this.opts.enableTasksQueue === true && {
303 stealingWorkerNodes: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 workerNode.info.stealing ? accumulator + 1 : accumulator,
306 0
307 )
308 }),
309 busyWorkerNodes: this.workerNodes.reduce(
310 (accumulator, _workerNode, workerNodeKey) =>
311 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
312 0
313 ),
314 executedTasks: this.workerNodes.reduce(
315 (accumulator, workerNode) =>
316 accumulator + workerNode.usage.tasks.executed,
317 0
318 ),
319 executingTasks: this.workerNodes.reduce(
320 (accumulator, workerNode) =>
321 accumulator + workerNode.usage.tasks.executing,
322 0
323 ),
324 ...(this.opts.enableTasksQueue === true && {
325 queuedTasks: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
327 accumulator + workerNode.usage.tasks.queued,
328 0
329 )
330 }),
331 ...(this.opts.enableTasksQueue === true && {
332 maxQueuedTasks: this.workerNodes.reduce(
333 (accumulator, workerNode) =>
334 accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
335 0
336 )
337 }),
338 ...(this.opts.enableTasksQueue === true && {
339 backPressure: this.hasBackPressure()
340 }),
341 ...(this.opts.enableTasksQueue === true && {
342 stolenTasks: this.workerNodes.reduce(
343 (accumulator, workerNode) =>
344 accumulator + workerNode.usage.tasks.stolen,
345 0
346 )
347 }),
348 failedTasks: this.workerNodes.reduce(
349 (accumulator, workerNode) =>
350 accumulator + workerNode.usage.tasks.failed,
351 0
352 ),
353 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
354 ?.runTime.aggregate === true && {
355 runTime: {
356 minimum: round(
357 min(
358 ...this.workerNodes.map(
359 workerNode => workerNode.usage.runTime.minimum ?? Infinity
360 )
361 )
362 ),
363 maximum: round(
364 max(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.runTime.maximum ?? -Infinity
367 )
368 )
369 ),
370 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
371 .runTime.average && {
372 average: round(
373 average(
374 this.workerNodes.reduce<number[]>(
375 (accumulator, workerNode) =>
376 accumulator.concat(workerNode.usage.runTime.history),
377 []
378 )
379 )
380 )
381 }),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.median && {
384 median: round(
385 median(
386 this.workerNodes.reduce<number[]>(
387 (accumulator, workerNode) =>
388 accumulator.concat(workerNode.usage.runTime.history),
389 []
390 )
391 )
392 )
393 })
394 }
395 }),
396 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
397 ?.waitTime.aggregate === true && {
398 waitTime: {
399 minimum: round(
400 min(
401 ...this.workerNodes.map(
402 workerNode => workerNode.usage.waitTime.minimum ?? Infinity
403 )
404 )
405 ),
406 maximum: round(
407 max(
408 ...this.workerNodes.map(
409 workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
410 )
411 )
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .waitTime.average && {
415 average: round(
416 average(
417 this.workerNodes.reduce<number[]>(
418 (accumulator, workerNode) =>
419 accumulator.concat(workerNode.usage.waitTime.history),
420 []
421 )
422 )
423 )
424 }),
425 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 .waitTime.median && {
427 median: round(
428 median(
429 this.workerNodes.reduce<number[]>(
430 (accumulator, workerNode) =>
431 accumulator.concat(workerNode.usage.waitTime.history),
432 []
433 )
434 )
435 )
436 })
437 }
438 })
439 }
440 }
441
442 /**
443 * The pool readiness boolean status.
444 */
445 private get ready (): boolean {
446 if (this.empty) {
447 return false
448 }
449 return (
450 this.workerNodes.reduce(
451 (accumulator, workerNode) =>
452 !workerNode.info.dynamic && workerNode.info.ready
453 ? accumulator + 1
454 : accumulator,
455 0
456 ) >= this.minimumNumberOfWorkers
457 )
458 }
459
460 /**
461 * The pool emptiness boolean status.
462 */
463 protected get empty (): boolean {
464 return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
465 }
466
467 /**
468 * The approximate pool utilization.
469 *
470 * @returns The pool utilization.
471 */
472 private get utilization (): number {
473 const poolTimeCapacity =
474 (performance.now() - this.startTimestamp) *
475 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
476 const totalTasksRunTime = this.workerNodes.reduce(
477 (accumulator, workerNode) =>
478 accumulator + (workerNode.usage.runTime.aggregate ?? 0),
479 0
480 )
481 const totalTasksWaitTime = this.workerNodes.reduce(
482 (accumulator, workerNode) =>
483 accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
484 0
485 )
486 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
487 }
488
489 /**
490 * The pool type.
491 *
492 * If it is `'dynamic'`, it provides the `max` property.
493 */
494 protected abstract get type (): PoolType
495
496 /**
497 * The worker type.
498 */
499 protected abstract get worker (): WorkerType
500
501 /**
502 * Checks if the worker id sent in the received message from a worker is valid.
503 *
504 * @param message - The received message.
505 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
506 */
507 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
508 if (message.workerId == null) {
509 throw new Error('Worker message received without worker id')
510 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
511 throw new Error(
512 `Worker message received from unknown worker '${message.workerId}'`
513 )
514 }
515 }
516
517 /**
518 * Gets the worker node key given its worker id.
519 *
520 * @param workerId - The worker id.
521 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
522 */
523 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
524 return this.workerNodes.findIndex(
525 workerNode => workerNode.info.id === workerId
526 )
527 }
528
529 /** @inheritDoc */
530 public setWorkerChoiceStrategy (
531 workerChoiceStrategy: WorkerChoiceStrategy,
532 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
533 ): void {
534 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
535 this.opts.workerChoiceStrategy = workerChoiceStrategy
536 this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
537 this.opts.workerChoiceStrategy
538 )
539 if (workerChoiceStrategyOptions != null) {
540 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
541 }
542 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
543 workerNode.resetUsage()
544 this.sendStatisticsMessageToWorker(workerNodeKey)
545 }
546 }
547
548 /** @inheritDoc */
549 public setWorkerChoiceStrategyOptions (
550 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
551 ): void {
552 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
553 if (workerChoiceStrategyOptions != null) {
554 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
555 }
556 this.workerChoiceStrategyContext?.setOptions(
557 this.opts.workerChoiceStrategyOptions
558 )
559 }
560
561 /** @inheritDoc */
562 public enableTasksQueue (
563 enable: boolean,
564 tasksQueueOptions?: TasksQueueOptions
565 ): void {
566 if (this.opts.enableTasksQueue === true && !enable) {
567 this.unsetTaskStealing()
568 this.unsetTasksStealingOnBackPressure()
569 this.flushTasksQueues()
570 }
571 this.opts.enableTasksQueue = enable
572 this.setTasksQueueOptions(tasksQueueOptions)
573 }
574
575 /** @inheritDoc */
576 public setTasksQueueOptions (
577 tasksQueueOptions: TasksQueueOptions | undefined
578 ): void {
579 if (this.opts.enableTasksQueue === true) {
580 checkValidTasksQueueOptions(tasksQueueOptions)
581 this.opts.tasksQueueOptions =
582 this.buildTasksQueueOptions(tasksQueueOptions)
583 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
584 this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
585 if (this.opts.tasksQueueOptions.taskStealing === true) {
586 this.unsetTaskStealing()
587 this.setTaskStealing()
588 } else {
589 this.unsetTaskStealing()
590 }
591 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
592 this.unsetTasksStealingOnBackPressure()
593 this.setTasksStealingOnBackPressure()
594 } else {
595 this.unsetTasksStealingOnBackPressure()
596 }
597 } else if (this.opts.tasksQueueOptions != null) {
598 delete this.opts.tasksQueueOptions
599 }
600 }
601
602 private buildTasksQueueOptions (
603 tasksQueueOptions: TasksQueueOptions | undefined
604 ): TasksQueueOptions {
605 return {
606 ...getDefaultTasksQueueOptions(
607 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
608 ),
609 ...tasksQueueOptions
610 }
611 }
612
613 private setTasksQueueSize (size: number): void {
614 for (const workerNode of this.workerNodes) {
615 workerNode.tasksQueueBackPressureSize = size
616 }
617 }
618
619 private setTaskStealing (): void {
620 for (const [workerNodeKey] of this.workerNodes.entries()) {
621 this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
622 }
623 }
624
625 private unsetTaskStealing (): void {
626 for (const [workerNodeKey] of this.workerNodes.entries()) {
627 this.workerNodes[workerNodeKey].off(
628 'idle',
629 this.handleWorkerNodeIdleEvent
630 )
631 }
632 }
633
634 private setTasksStealingOnBackPressure (): void {
635 for (const [workerNodeKey] of this.workerNodes.entries()) {
636 this.workerNodes[workerNodeKey].on(
637 'backPressure',
638 this.handleWorkerNodeBackPressureEvent
639 )
640 }
641 }
642
643 private unsetTasksStealingOnBackPressure (): void {
644 for (const [workerNodeKey] of this.workerNodes.entries()) {
645 this.workerNodes[workerNodeKey].off(
646 'backPressure',
647 this.handleWorkerNodeBackPressureEvent
648 )
649 }
650 }
651
652 /**
653 * Whether the pool is full or not.
654 *
655 * The pool filling boolean status.
656 */
657 protected get full (): boolean {
658 return (
659 this.workerNodes.length >=
660 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
661 )
662 }
663
664 /**
665 * Whether the pool is busy or not.
666 *
667 * The pool busyness boolean status.
668 */
669 protected abstract get busy (): boolean
670
671 /**
672 * Whether worker nodes are executing concurrently their tasks quota or not.
673 *
674 * @returns Worker nodes busyness boolean status.
675 */
676 protected internalBusy (): boolean {
677 if (this.opts.enableTasksQueue === true) {
678 return (
679 this.workerNodes.findIndex(
680 workerNode =>
681 workerNode.info.ready &&
682 workerNode.usage.tasks.executing <
683 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
684 this.opts.tasksQueueOptions!.concurrency!
685 ) === -1
686 )
687 }
688 return (
689 this.workerNodes.findIndex(
690 workerNode =>
691 workerNode.info.ready && workerNode.usage.tasks.executing === 0
692 ) === -1
693 )
694 }
695
696 private isWorkerNodeBusy (workerNodeKey: number): boolean {
697 if (this.opts.enableTasksQueue === true) {
698 return (
699 this.workerNodes[workerNodeKey].usage.tasks.executing >=
700 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
701 this.opts.tasksQueueOptions!.concurrency!
702 )
703 }
704 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
705 }
706
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey: number,
709 message: MessageValue<Data>
710 ): Promise<boolean> {
711 return await new Promise<boolean>((resolve, reject) => {
712 const taskFunctionOperationListener = (
713 message: MessageValue<Response>
714 ): void => {
715 this.checkMessageWorkerId(message)
716 const workerId = this.getWorkerInfo(workerNodeKey)?.id
717 if (
718 message.taskFunctionOperationStatus != null &&
719 message.workerId === workerId
720 ) {
721 if (message.taskFunctionOperationStatus) {
722 resolve(true)
723 } else {
724 reject(
725 new Error(
726 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
727 )
728 )
729 }
730 this.deregisterWorkerMessageListener(
731 this.getWorkerNodeKeyByWorkerId(message.workerId),
732 taskFunctionOperationListener
733 )
734 }
735 }
736 this.registerWorkerMessageListener(
737 workerNodeKey,
738 taskFunctionOperationListener
739 )
740 this.sendToWorker(workerNodeKey, message)
741 })
742 }
743
744 private async sendTaskFunctionOperationToWorkers (
745 message: MessageValue<Data>
746 ): Promise<boolean> {
747 return await new Promise<boolean>((resolve, reject) => {
748 const responsesReceived = new Array<MessageValue<Response>>()
749 const taskFunctionOperationsListener = (
750 message: MessageValue<Response>
751 ): void => {
752 this.checkMessageWorkerId(message)
753 if (message.taskFunctionOperationStatus != null) {
754 responsesReceived.push(message)
755 if (responsesReceived.length === this.workerNodes.length) {
756 if (
757 responsesReceived.every(
758 message => message.taskFunctionOperationStatus === true
759 )
760 ) {
761 resolve(true)
762 } else if (
763 responsesReceived.some(
764 message => message.taskFunctionOperationStatus === false
765 )
766 ) {
767 const errorResponse = responsesReceived.find(
768 response => response.taskFunctionOperationStatus === false
769 )
770 reject(
771 new Error(
772 `Task function operation '${
773 message.taskFunctionOperation as string
774 }' failed on worker ${errorResponse?.workerId} with error: '${
775 errorResponse?.workerError?.message
776 }'`
777 )
778 )
779 }
780 this.deregisterWorkerMessageListener(
781 this.getWorkerNodeKeyByWorkerId(message.workerId),
782 taskFunctionOperationsListener
783 )
784 }
785 }
786 }
787 for (const [workerNodeKey] of this.workerNodes.entries()) {
788 this.registerWorkerMessageListener(
789 workerNodeKey,
790 taskFunctionOperationsListener
791 )
792 this.sendToWorker(workerNodeKey, message)
793 }
794 })
795 }
796
797 /** @inheritDoc */
798 public hasTaskFunction (name: string): boolean {
799 for (const workerNode of this.workerNodes) {
800 if (
801 Array.isArray(workerNode.info.taskFunctionNames) &&
802 workerNode.info.taskFunctionNames.includes(name)
803 ) {
804 return true
805 }
806 }
807 return false
808 }
809
810 /** @inheritDoc */
811 public async addTaskFunction (
812 name: string,
813 fn: TaskFunction<Data, Response>
814 ): Promise<boolean> {
815 if (typeof name !== 'string') {
816 throw new TypeError('name argument must be a string')
817 }
818 if (typeof name === 'string' && name.trim().length === 0) {
819 throw new TypeError('name argument must not be an empty string')
820 }
821 if (typeof fn !== 'function') {
822 throw new TypeError('fn argument must be a function')
823 }
824 const opResult = await this.sendTaskFunctionOperationToWorkers({
825 taskFunctionOperation: 'add',
826 taskFunctionName: name,
827 taskFunction: fn.toString()
828 })
829 this.taskFunctions.set(name, fn)
830 return opResult
831 }
832
833 /** @inheritDoc */
834 public async removeTaskFunction (name: string): Promise<boolean> {
835 if (!this.taskFunctions.has(name)) {
836 throw new Error(
837 'Cannot remove a task function not handled on the pool side'
838 )
839 }
840 const opResult = await this.sendTaskFunctionOperationToWorkers({
841 taskFunctionOperation: 'remove',
842 taskFunctionName: name
843 })
844 this.deleteTaskFunctionWorkerUsages(name)
845 this.taskFunctions.delete(name)
846 return opResult
847 }
848
849 /** @inheritDoc */
850 public listTaskFunctionNames (): string[] {
851 for (const workerNode of this.workerNodes) {
852 if (
853 Array.isArray(workerNode.info.taskFunctionNames) &&
854 workerNode.info.taskFunctionNames.length > 0
855 ) {
856 return workerNode.info.taskFunctionNames
857 }
858 }
859 return []
860 }
861
862 /** @inheritDoc */
863 public async setDefaultTaskFunction (name: string): Promise<boolean> {
864 return await this.sendTaskFunctionOperationToWorkers({
865 taskFunctionOperation: 'default',
866 taskFunctionName: name
867 })
868 }
869
870 private deleteTaskFunctionWorkerUsages (name: string): void {
871 for (const workerNode of this.workerNodes) {
872 workerNode.deleteTaskFunctionWorkerUsage(name)
873 }
874 }
875
876 private shallExecuteTask (workerNodeKey: number): boolean {
877 return (
878 this.tasksQueueSize(workerNodeKey) === 0 &&
879 this.workerNodes[workerNodeKey].usage.tasks.executing <
880 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
881 this.opts.tasksQueueOptions!.concurrency!
882 )
883 }
884
885 /** @inheritDoc */
886 public async execute (
887 data?: Data,
888 name?: string,
889 transferList?: TransferListItem[]
890 ): Promise<Response> {
891 return await new Promise<Response>((resolve, reject) => {
892 if (!this.started) {
893 reject(new Error('Cannot execute a task on not started pool'))
894 return
895 }
896 if (this.destroying) {
897 reject(new Error('Cannot execute a task on destroying pool'))
898 return
899 }
900 if (name != null && typeof name !== 'string') {
901 reject(new TypeError('name argument must be a string'))
902 return
903 }
904 if (
905 name != null &&
906 typeof name === 'string' &&
907 name.trim().length === 0
908 ) {
909 reject(new TypeError('name argument must not be an empty string'))
910 return
911 }
912 if (transferList != null && !Array.isArray(transferList)) {
913 reject(new TypeError('transferList argument must be an array'))
914 return
915 }
916 const timestamp = performance.now()
917 const workerNodeKey = this.chooseWorkerNode()
918 const task: Task<Data> = {
919 name: name ?? DEFAULT_TASK_NAME,
920 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
921 data: data ?? ({} as Data),
922 transferList,
923 timestamp,
924 taskId: randomUUID()
925 }
926 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
927 this.promiseResponseMap.set(task.taskId!, {
928 resolve,
929 reject,
930 workerNodeKey,
931 ...(this.emitter != null && {
932 asyncResource: new AsyncResource('poolifier:task', {
933 triggerAsyncId: this.emitter.asyncId,
934 requireManualDestroy: true
935 })
936 })
937 })
938 if (
939 this.opts.enableTasksQueue === false ||
940 (this.opts.enableTasksQueue === true &&
941 this.shallExecuteTask(workerNodeKey))
942 ) {
943 this.executeTask(workerNodeKey, task)
944 } else {
945 this.enqueueTask(workerNodeKey, task)
946 }
947 })
948 }
949
950 /** @inheritdoc */
951 public start (): void {
952 if (this.started) {
953 throw new Error('Cannot start an already started pool')
954 }
955 if (this.starting) {
956 throw new Error('Cannot start an already starting pool')
957 }
958 if (this.destroying) {
959 throw new Error('Cannot start a destroying pool')
960 }
961 this.starting = true
962 while (
963 this.workerNodes.reduce(
964 (accumulator, workerNode) =>
965 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
966 0
967 ) < this.minimumNumberOfWorkers
968 ) {
969 this.createAndSetupWorkerNode()
970 }
971 this.starting = false
972 this.started = true
973 }
974
975 /** @inheritDoc */
976 public async destroy (): Promise<void> {
977 if (!this.started) {
978 throw new Error('Cannot destroy an already destroyed pool')
979 }
980 if (this.starting) {
981 throw new Error('Cannot destroy an starting pool')
982 }
983 if (this.destroying) {
984 throw new Error('Cannot destroy an already destroying pool')
985 }
986 this.destroying = true
987 await Promise.all(
988 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
989 await this.destroyWorkerNode(workerNodeKey)
990 })
991 )
992 this.emitter?.emit(PoolEvents.destroy, this.info)
993 this.emitter?.emitDestroy()
994 this.emitter?.removeAllListeners()
995 this.readyEventEmitted = false
996 this.destroying = false
997 this.started = false
998 }
999
1000 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
1001 await new Promise<void>((resolve, reject) => {
1002 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1003 if (this.workerNodes[workerNodeKey] == null) {
1004 resolve()
1005 return
1006 }
1007 const killMessageListener = (message: MessageValue<Response>): void => {
1008 this.checkMessageWorkerId(message)
1009 if (message.kill === 'success') {
1010 resolve()
1011 } else if (message.kill === 'failure') {
1012 reject(
1013 new Error(
1014 `Kill message handling failed on worker ${message.workerId}`
1015 )
1016 )
1017 }
1018 }
1019 // FIXME: should be registered only once
1020 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1021 this.sendToWorker(workerNodeKey, { kill: true })
1022 })
1023 }
1024
1025 /**
1026 * Terminates the worker node given its worker node key.
1027 *
1028 * @param workerNodeKey - The worker node key.
1029 */
1030 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1031 this.flagWorkerNodeAsNotReady(workerNodeKey)
1032 const flushedTasks = this.flushTasksQueue(workerNodeKey)
1033 const workerNode = this.workerNodes[workerNodeKey]
1034 await waitWorkerNodeEvents(
1035 workerNode,
1036 'taskFinished',
1037 flushedTasks,
1038 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
1039 getDefaultTasksQueueOptions(
1040 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1041 ).tasksFinishedTimeout
1042 )
1043 await this.sendKillMessageToWorker(workerNodeKey)
1044 await workerNode.terminate()
1045 }
1046
1047 /**
1048 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1049 * Can be overridden.
1050 *
1051 * @virtual
1052 */
1053 protected setupHook (): void {
1054 /* Intentionally empty */
1055 }
1056
1057 /**
1058 * Should return whether the worker is the main worker or not.
1059 */
1060 protected abstract isMain (): boolean
1061
1062 /**
1063 * Hook executed before the worker task execution.
1064 * Can be overridden.
1065 *
1066 * @param workerNodeKey - The worker node key.
1067 * @param task - The task to execute.
1068 */
1069 protected beforeTaskExecutionHook (
1070 workerNodeKey: number,
1071 task: Task<Data>
1072 ): void {
1073 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1074 if (this.workerNodes[workerNodeKey]?.usage != null) {
1075 const workerUsage = this.workerNodes[workerNodeKey].usage
1076 ++workerUsage.tasks.executing
1077 updateWaitTimeWorkerUsage(
1078 this.workerChoiceStrategyContext,
1079 workerUsage,
1080 task
1081 )
1082 }
1083 if (
1084 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1085 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1086 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
1087 null
1088 ) {
1089 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1090 const taskFunctionWorkerUsage = this.workerNodes[
1091 workerNodeKey
1092 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1093 ].getTaskFunctionWorkerUsage(task.name!)!
1094 ++taskFunctionWorkerUsage.tasks.executing
1095 updateWaitTimeWorkerUsage(
1096 this.workerChoiceStrategyContext,
1097 taskFunctionWorkerUsage,
1098 task
1099 )
1100 }
1101 }
1102
1103 /**
1104 * Hook executed after the worker task execution.
1105 * Can be overridden.
1106 *
1107 * @param workerNodeKey - The worker node key.
1108 * @param message - The received message.
1109 */
1110 protected afterTaskExecutionHook (
1111 workerNodeKey: number,
1112 message: MessageValue<Response>
1113 ): void {
1114 let needWorkerChoiceStrategyUpdate = false
1115 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1116 if (this.workerNodes[workerNodeKey]?.usage != null) {
1117 const workerUsage = this.workerNodes[workerNodeKey].usage
1118 updateTaskStatisticsWorkerUsage(workerUsage, message)
1119 updateRunTimeWorkerUsage(
1120 this.workerChoiceStrategyContext,
1121 workerUsage,
1122 message
1123 )
1124 updateEluWorkerUsage(
1125 this.workerChoiceStrategyContext,
1126 workerUsage,
1127 message
1128 )
1129 needWorkerChoiceStrategyUpdate = true
1130 }
1131 if (
1132 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1133 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1134 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1135 message.taskPerformance!.name
1136 ) != null
1137 ) {
1138 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1139 const taskFunctionWorkerUsage = this.workerNodes[
1140 workerNodeKey
1141 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1142 ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
1143 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1144 updateRunTimeWorkerUsage(
1145 this.workerChoiceStrategyContext,
1146 taskFunctionWorkerUsage,
1147 message
1148 )
1149 updateEluWorkerUsage(
1150 this.workerChoiceStrategyContext,
1151 taskFunctionWorkerUsage,
1152 message
1153 )
1154 needWorkerChoiceStrategyUpdate = true
1155 }
1156 if (needWorkerChoiceStrategyUpdate) {
1157 this.workerChoiceStrategyContext?.update(workerNodeKey)
1158 }
1159 }
1160
1161 /**
1162 * Whether the worker node shall update its task function worker usage or not.
1163 *
1164 * @param workerNodeKey - The worker node key.
1165 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1166 */
1167 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1168 const workerInfo = this.getWorkerInfo(workerNodeKey)
1169 return (
1170 workerInfo != null &&
1171 Array.isArray(workerInfo.taskFunctionNames) &&
1172 workerInfo.taskFunctionNames.length > 2
1173 )
1174 }
1175
1176 /**
1177 * Chooses a worker node for the next task.
1178 *
1179 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1180 *
1181 * @returns The chosen worker node key
1182 */
1183 private chooseWorkerNode (): number {
1184 if (this.shallCreateDynamicWorker()) {
1185 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1186 if (
1187 this.workerChoiceStrategyContext?.getStrategyPolicy()
1188 .dynamicWorkerUsage === true
1189 ) {
1190 return workerNodeKey
1191 }
1192 }
1193 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1194 return this.workerChoiceStrategyContext!.execute()
1195 }
1196
1197 /**
1198 * Conditions for dynamic worker creation.
1199 *
1200 * @returns Whether to create a dynamic worker or not.
1201 */
1202 protected abstract shallCreateDynamicWorker (): boolean
1203
1204 /**
1205 * Sends a message to worker given its worker node key.
1206 *
1207 * @param workerNodeKey - The worker node key.
1208 * @param message - The message.
1209 * @param transferList - The optional array of transferable objects.
1210 */
1211 protected abstract sendToWorker (
1212 workerNodeKey: number,
1213 message: MessageValue<Data>,
1214 transferList?: TransferListItem[]
1215 ): void
1216
1217 /**
1218 * Creates a new, completely set up worker node.
1219 *
1220 * @returns New, completely set up worker node key.
1221 */
1222 protected createAndSetupWorkerNode (): number {
1223 const workerNode = this.createWorkerNode()
1224 workerNode.registerWorkerEventHandler(
1225 'online',
1226 this.opts.onlineHandler ?? EMPTY_FUNCTION
1227 )
1228 workerNode.registerWorkerEventHandler(
1229 'message',
1230 this.opts.messageHandler ?? EMPTY_FUNCTION
1231 )
1232 workerNode.registerWorkerEventHandler(
1233 'error',
1234 this.opts.errorHandler ?? EMPTY_FUNCTION
1235 )
1236 workerNode.registerWorkerEventHandler('error', (error: Error) => {
1237 workerNode.info.ready = false
1238 this.emitter?.emit(PoolEvents.error, error)
1239 if (
1240 this.started &&
1241 !this.destroying &&
1242 this.opts.restartWorkerOnError === true
1243 ) {
1244 if (workerNode.info.dynamic) {
1245 this.createAndSetupDynamicWorkerNode()
1246 } else {
1247 this.createAndSetupWorkerNode()
1248 }
1249 }
1250 if (
1251 this.started &&
1252 !this.destroying &&
1253 this.opts.enableTasksQueue === true
1254 ) {
1255 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
1256 }
1257 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1258 workerNode?.terminate().catch(error => {
1259 this.emitter?.emit(PoolEvents.error, error)
1260 })
1261 })
1262 workerNode.registerWorkerEventHandler(
1263 'exit',
1264 this.opts.exitHandler ?? EMPTY_FUNCTION
1265 )
1266 workerNode.registerOnceWorkerEventHandler('exit', () => {
1267 this.removeWorkerNode(workerNode)
1268 })
1269 const workerNodeKey = this.addWorkerNode(workerNode)
1270 this.afterWorkerNodeSetup(workerNodeKey)
1271 return workerNodeKey
1272 }
1273
1274 /**
1275 * Creates a new, completely set up dynamic worker node.
1276 *
1277 * @returns New, completely set up dynamic worker node key.
1278 */
1279 protected createAndSetupDynamicWorkerNode (): number {
1280 const workerNodeKey = this.createAndSetupWorkerNode()
1281 this.registerWorkerMessageListener(workerNodeKey, message => {
1282 this.checkMessageWorkerId(message)
1283 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1284 message.workerId
1285 )
1286 const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
1287 // Kill message received from worker
1288 if (
1289 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1290 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1291 ((this.opts.enableTasksQueue === false &&
1292 workerUsage.tasks.executing === 0) ||
1293 (this.opts.enableTasksQueue === true &&
1294 workerUsage.tasks.executing === 0 &&
1295 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1296 ) {
1297 // Flag the worker node as not ready immediately
1298 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1299 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1300 this.emitter?.emit(PoolEvents.error, error)
1301 })
1302 }
1303 })
1304 this.sendToWorker(workerNodeKey, {
1305 checkActive: true
1306 })
1307 if (this.taskFunctions.size > 0) {
1308 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1309 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1310 taskFunctionOperation: 'add',
1311 taskFunctionName,
1312 taskFunction: taskFunction.toString()
1313 }).catch(error => {
1314 this.emitter?.emit(PoolEvents.error, error)
1315 })
1316 }
1317 }
1318 const workerNode = this.workerNodes[workerNodeKey]
1319 workerNode.info.dynamic = true
1320 if (
1321 this.workerChoiceStrategyContext?.getStrategyPolicy()
1322 .dynamicWorkerReady === true ||
1323 this.workerChoiceStrategyContext?.getStrategyPolicy()
1324 .dynamicWorkerUsage === true
1325 ) {
1326 workerNode.info.ready = true
1327 }
1328 this.checkAndEmitDynamicWorkerCreationEvents()
1329 return workerNodeKey
1330 }
1331
1332 /**
1333 * Registers a listener callback on the worker given its worker node key.
1334 *
1335 * @param workerNodeKey - The worker node key.
1336 * @param listener - The message listener callback.
1337 */
1338 protected abstract registerWorkerMessageListener<
1339 Message extends Data | Response
1340 >(
1341 workerNodeKey: number,
1342 listener: (message: MessageValue<Message>) => void
1343 ): void
1344
1345 /**
1346 * Registers once a listener callback on the worker given its worker node key.
1347 *
1348 * @param workerNodeKey - The worker node key.
1349 * @param listener - The message listener callback.
1350 */
1351 protected abstract registerOnceWorkerMessageListener<
1352 Message extends Data | Response
1353 >(
1354 workerNodeKey: number,
1355 listener: (message: MessageValue<Message>) => void
1356 ): void
1357
1358 /**
1359 * Deregisters a listener callback on the worker given its worker node key.
1360 *
1361 * @param workerNodeKey - The worker node key.
1362 * @param listener - The message listener callback.
1363 */
1364 protected abstract deregisterWorkerMessageListener<
1365 Message extends Data | Response
1366 >(
1367 workerNodeKey: number,
1368 listener: (message: MessageValue<Message>) => void
1369 ): void
1370
1371 /**
1372 * Method hooked up after a worker node has been newly created.
1373 * Can be overridden.
1374 *
1375 * @param workerNodeKey - The newly created worker node key.
1376 */
1377 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1378 // Listen to worker messages.
1379 this.registerWorkerMessageListener(
1380 workerNodeKey,
1381 this.workerMessageListener
1382 )
1383 // Send the startup message to worker.
1384 this.sendStartupMessageToWorker(workerNodeKey)
1385 // Send the statistics message to worker.
1386 this.sendStatisticsMessageToWorker(workerNodeKey)
1387 if (this.opts.enableTasksQueue === true) {
1388 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1389 this.workerNodes[workerNodeKey].on(
1390 'idle',
1391 this.handleWorkerNodeIdleEvent
1392 )
1393 }
1394 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1395 this.workerNodes[workerNodeKey].on(
1396 'backPressure',
1397 this.handleWorkerNodeBackPressureEvent
1398 )
1399 }
1400 }
1401 }
1402
1403 /**
1404 * Sends the startup message to worker given its worker node key.
1405 *
1406 * @param workerNodeKey - The worker node key.
1407 */
1408 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1409
1410 /**
1411 * Sends the statistics message to worker given its worker node key.
1412 *
1413 * @param workerNodeKey - The worker node key.
1414 */
1415 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1416 this.sendToWorker(workerNodeKey, {
1417 statistics: {
1418 runTime:
1419 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
1420 ?.runTime.aggregate ?? false,
1421 elu:
1422 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
1423 .aggregate ?? false
1424 }
1425 })
1426 }
1427
1428 private cannotStealTask (): boolean {
1429 return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1430 }
1431
1432 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1433 if (this.shallExecuteTask(workerNodeKey)) {
1434 this.executeTask(workerNodeKey, task)
1435 } else {
1436 this.enqueueTask(workerNodeKey, task)
1437 }
1438 }
1439
1440 private redistributeQueuedTasks (workerNodeKey: number): void {
1441 if (workerNodeKey === -1 || this.cannotStealTask()) {
1442 return
1443 }
1444 while (this.tasksQueueSize(workerNodeKey) > 0) {
1445 const destinationWorkerNodeKey = this.workerNodes.reduce(
1446 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1447 return workerNode.info.ready &&
1448 workerNode.usage.tasks.queued <
1449 workerNodes[minWorkerNodeKey].usage.tasks.queued
1450 ? workerNodeKey
1451 : minWorkerNodeKey
1452 },
1453 0
1454 )
1455 this.handleTask(
1456 destinationWorkerNodeKey,
1457 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1458 this.dequeueTask(workerNodeKey)!
1459 )
1460 }
1461 }
1462
1463 private updateTaskStolenStatisticsWorkerUsage (
1464 workerNodeKey: number,
1465 taskName: string
1466 ): void {
1467 const workerNode = this.workerNodes[workerNodeKey]
1468 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1469 if (workerNode?.usage != null) {
1470 ++workerNode.usage.tasks.stolen
1471 }
1472 if (
1473 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1474 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1475 ) {
1476 const taskFunctionWorkerUsage =
1477 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1478 workerNode.getTaskFunctionWorkerUsage(taskName)!
1479 ++taskFunctionWorkerUsage.tasks.stolen
1480 }
1481 }
1482
1483 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1484 workerNodeKey: number
1485 ): void {
1486 const workerNode = this.workerNodes[workerNodeKey]
1487 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1488 if (workerNode?.usage != null) {
1489 ++workerNode.usage.tasks.sequentiallyStolen
1490 }
1491 }
1492
1493 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1494 workerNodeKey: number,
1495 taskName: string
1496 ): void {
1497 const workerNode = this.workerNodes[workerNodeKey]
1498 if (
1499 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1500 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1501 ) {
1502 const taskFunctionWorkerUsage =
1503 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1504 workerNode.getTaskFunctionWorkerUsage(taskName)!
1505 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1506 }
1507 }
1508
1509 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1510 workerNodeKey: number
1511 ): void {
1512 const workerNode = this.workerNodes[workerNodeKey]
1513 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1514 if (workerNode?.usage != null) {
1515 workerNode.usage.tasks.sequentiallyStolen = 0
1516 }
1517 }
1518
1519 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1520 workerNodeKey: number,
1521 taskName: string
1522 ): void {
1523 const workerNode = this.workerNodes[workerNodeKey]
1524 if (
1525 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1526 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1527 ) {
1528 const taskFunctionWorkerUsage =
1529 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1530 workerNode.getTaskFunctionWorkerUsage(taskName)!
1531 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1532 }
1533 }
1534
1535 private readonly handleWorkerNodeIdleEvent = (
1536 eventDetail: WorkerNodeEventDetail,
1537 previousStolenTask?: Task<Data>
1538 ): void => {
1539 const { workerNodeKey } = eventDetail
1540 if (workerNodeKey == null) {
1541 throw new Error(
1542 "WorkerNode event detail 'workerNodeKey' property must be defined"
1543 )
1544 }
1545 const workerInfo = this.getWorkerInfo(workerNodeKey)
1546 if (
1547 this.cannotStealTask() ||
1548 (this.info.stealingWorkerNodes ?? 0) >
1549 Math.floor(this.workerNodes.length / 2)
1550 ) {
1551 if (workerInfo != null && previousStolenTask != null) {
1552 workerInfo.stealing = false
1553 }
1554 return
1555 }
1556 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1557 if (
1558 workerInfo != null &&
1559 previousStolenTask != null &&
1560 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1561 (workerNodeTasksUsage.executing > 0 ||
1562 this.tasksQueueSize(workerNodeKey) > 0)
1563 ) {
1564 workerInfo.stealing = false
1565 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1566 for (const taskName of this.workerNodes[workerNodeKey].info
1567 .taskFunctionNames!) {
1568 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1569 workerNodeKey,
1570 taskName
1571 )
1572 }
1573 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1574 return
1575 }
1576 if (workerInfo == null) {
1577 throw new Error(
1578 `Worker node with key '${workerNodeKey}' not found in pool`
1579 )
1580 }
1581 workerInfo.stealing = true
1582 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1583 if (
1584 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1585 stolenTask != null
1586 ) {
1587 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1588 const taskFunctionTasksWorkerUsage = this.workerNodes[
1589 workerNodeKey
1590 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1591 ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
1592 if (
1593 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1594 (previousStolenTask != null &&
1595 previousStolenTask.name === stolenTask.name &&
1596 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1597 ) {
1598 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1599 workerNodeKey,
1600 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1601 stolenTask.name!
1602 )
1603 } else {
1604 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1605 workerNodeKey,
1606 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1607 stolenTask.name!
1608 )
1609 }
1610 }
1611 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1612 .then(() => {
1613 this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
1614 return undefined
1615 })
1616 .catch(error => {
1617 this.emitter?.emit(PoolEvents.error, error)
1618 })
1619 }
1620
1621 private readonly workerNodeStealTask = (
1622 workerNodeKey: number
1623 ): Task<Data> | undefined => {
1624 const workerNodes = this.workerNodes
1625 .slice()
1626 .sort(
1627 (workerNodeA, workerNodeB) =>
1628 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1629 )
1630 const sourceWorkerNode = workerNodes.find(
1631 (sourceWorkerNode, sourceWorkerNodeKey) =>
1632 sourceWorkerNode.info.ready &&
1633 !sourceWorkerNode.info.stealing &&
1634 sourceWorkerNodeKey !== workerNodeKey &&
1635 sourceWorkerNode.usage.tasks.queued > 0
1636 )
1637 if (sourceWorkerNode != null) {
1638 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1639 const task = sourceWorkerNode.popTask()!
1640 this.handleTask(workerNodeKey, task)
1641 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1642 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1643 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1644 return task
1645 }
1646 }
1647
1648 private readonly handleWorkerNodeBackPressureEvent = (
1649 eventDetail: WorkerNodeEventDetail
1650 ): void => {
1651 if (
1652 this.cannotStealTask() ||
1653 (this.info.stealingWorkerNodes ?? 0) >
1654 Math.floor(this.workerNodes.length / 2)
1655 ) {
1656 return
1657 }
1658 const { workerId } = eventDetail
1659 const sizeOffset = 1
1660 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1661 if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
1662 return
1663 }
1664 const sourceWorkerNode =
1665 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1666 const workerNodes = this.workerNodes
1667 .slice()
1668 .sort(
1669 (workerNodeA, workerNodeB) =>
1670 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1671 )
1672 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1673 if (
1674 sourceWorkerNode.usage.tasks.queued > 0 &&
1675 workerNode.info.ready &&
1676 !workerNode.info.stealing &&
1677 workerNode.info.id !== workerId &&
1678 workerNode.usage.tasks.queued <
1679 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1680 this.opts.tasksQueueOptions!.size! - sizeOffset
1681 ) {
1682 const workerInfo = this.getWorkerInfo(workerNodeKey)
1683 if (workerInfo == null) {
1684 throw new Error(
1685 `Worker node with key '${workerNodeKey}' not found in pool`
1686 )
1687 }
1688 workerInfo.stealing = true
1689 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1690 const task = sourceWorkerNode.popTask()!
1691 this.handleTask(workerNodeKey, task)
1692 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1693 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1694 workerInfo.stealing = false
1695 }
1696 }
1697 }
1698
1699 /**
1700 * This method is the message listener registered on each worker.
1701 */
1702 protected readonly workerMessageListener = (
1703 message: MessageValue<Response>
1704 ): void => {
1705 this.checkMessageWorkerId(message)
1706 const { workerId, ready, taskId, taskFunctionNames } = message
1707 if (ready != null && taskFunctionNames != null) {
1708 // Worker ready response received from worker
1709 this.handleWorkerReadyResponse(message)
1710 } else if (taskId != null) {
1711 // Task execution response received from worker
1712 this.handleTaskExecutionResponse(message)
1713 } else if (taskFunctionNames != null) {
1714 // Task function names message received from worker
1715 const workerInfo = this.getWorkerInfo(
1716 this.getWorkerNodeKeyByWorkerId(workerId)
1717 )
1718 if (workerInfo != null) {
1719 workerInfo.taskFunctionNames = taskFunctionNames
1720 }
1721 }
1722 }
1723
1724 private checkAndEmitReadyEvent (): void {
1725 if (!this.readyEventEmitted && this.ready) {
1726 this.emitter?.emit(PoolEvents.ready, this.info)
1727 this.readyEventEmitted = true
1728 }
1729 }
1730
1731 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1732 const { workerId, ready, taskFunctionNames } = message
1733 if (ready == null || !ready) {
1734 throw new Error(`Worker ${workerId} failed to initialize`)
1735 }
1736 const workerNode =
1737 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1738 workerNode.info.ready = ready
1739 workerNode.info.taskFunctionNames = taskFunctionNames
1740 this.checkAndEmitReadyEvent()
1741 }
1742
1743 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1744 const { workerId, taskId, workerError, data } = message
1745 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1746 const promiseResponse = this.promiseResponseMap.get(taskId!)
1747 if (promiseResponse != null) {
1748 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
1749 const workerNode = this.workerNodes[workerNodeKey]
1750 if (workerError != null) {
1751 this.emitter?.emit(PoolEvents.taskError, workerError)
1752 asyncResource != null
1753 ? asyncResource.runInAsyncScope(
1754 reject,
1755 this.emitter,
1756 workerError.message
1757 )
1758 : reject(workerError.message)
1759 } else {
1760 asyncResource != null
1761 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1762 : resolve(data as Response)
1763 }
1764 asyncResource?.emitDestroy()
1765 this.afterTaskExecutionHook(workerNodeKey, message)
1766 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1767 this.promiseResponseMap.delete(taskId!)
1768 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1769 workerNode?.emit('taskFinished', taskId)
1770 if (this.opts.enableTasksQueue === true && !this.destroying) {
1771 const workerNodeTasksUsage = workerNode.usage.tasks
1772 if (
1773 this.tasksQueueSize(workerNodeKey) > 0 &&
1774 workerNodeTasksUsage.executing <
1775 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1776 this.opts.tasksQueueOptions!.concurrency!
1777 ) {
1778 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1779 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
1780 }
1781 if (
1782 workerNodeTasksUsage.executing === 0 &&
1783 this.tasksQueueSize(workerNodeKey) === 0 &&
1784 workerNodeTasksUsage.sequentiallyStolen === 0
1785 ) {
1786 workerNode.emit('idle', {
1787 workerId,
1788 workerNodeKey
1789 })
1790 }
1791 }
1792 }
1793 }
1794
1795 private checkAndEmitTaskExecutionEvents (): void {
1796 if (this.busy) {
1797 this.emitter?.emit(PoolEvents.busy, this.info)
1798 }
1799 }
1800
1801 private checkAndEmitTaskQueuingEvents (): void {
1802 if (this.hasBackPressure()) {
1803 this.emitter?.emit(PoolEvents.backPressure, this.info)
1804 }
1805 }
1806
1807 /**
1808 * Emits dynamic worker creation events.
1809 */
1810 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1811
1812 /**
1813 * Gets the worker information given its worker node key.
1814 *
1815 * @param workerNodeKey - The worker node key.
1816 * @returns The worker information.
1817 */
1818 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1819 return this.workerNodes[workerNodeKey]?.info
1820 }
1821
1822 /**
1823 * Creates a worker node.
1824 *
1825 * @returns The created worker node.
1826 */
1827 private createWorkerNode (): IWorkerNode<Worker, Data> {
1828 const workerNode = new WorkerNode<Worker, Data>(
1829 this.worker,
1830 this.filePath,
1831 {
1832 env: this.opts.env,
1833 workerOptions: this.opts.workerOptions,
1834 tasksQueueBackPressureSize:
1835 this.opts.tasksQueueOptions?.size ??
1836 getDefaultTasksQueueOptions(
1837 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1838 ).size
1839 }
1840 )
1841 // Flag the worker node as ready at pool startup.
1842 if (this.starting) {
1843 workerNode.info.ready = true
1844 }
1845 return workerNode
1846 }
1847
1848 /**
1849 * Adds the given worker node in the pool worker nodes.
1850 *
1851 * @param workerNode - The worker node.
1852 * @returns The added worker node key.
1853 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1854 */
1855 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
1856 this.workerNodes.push(workerNode)
1857 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1858 if (workerNodeKey === -1) {
1859 throw new Error('Worker added not found in worker nodes')
1860 }
1861 return workerNodeKey
1862 }
1863
1864 private checkAndEmitEmptyEvent (): void {
1865 if (this.empty) {
1866 this.emitter?.emit(PoolEvents.empty, this.info)
1867 this.readyEventEmitted = false
1868 }
1869 }
1870
1871 /**
1872 * Removes the worker node from the pool worker nodes.
1873 *
1874 * @param workerNode - The worker node.
1875 */
1876 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
1877 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1878 if (workerNodeKey !== -1) {
1879 this.workerNodes.splice(workerNodeKey, 1)
1880 this.workerChoiceStrategyContext?.remove(workerNodeKey)
1881 }
1882 this.checkAndEmitEmptyEvent()
1883 }
1884
1885 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1886 const workerInfo = this.getWorkerInfo(workerNodeKey)
1887 if (workerInfo != null) {
1888 workerInfo.ready = false
1889 }
1890 }
1891
1892 private hasBackPressure (): boolean {
1893 return (
1894 this.opts.enableTasksQueue === true &&
1895 this.workerNodes.findIndex(
1896 workerNode => !workerNode.hasBackPressure()
1897 ) === -1
1898 )
1899 }
1900
1901 /**
1902 * Executes the given task on the worker given its worker node key.
1903 *
1904 * @param workerNodeKey - The worker node key.
1905 * @param task - The task to execute.
1906 */
1907 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1908 this.beforeTaskExecutionHook(workerNodeKey, task)
1909 this.sendToWorker(workerNodeKey, task, task.transferList)
1910 this.checkAndEmitTaskExecutionEvents()
1911 }
1912
1913 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1914 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1915 this.checkAndEmitTaskQueuingEvents()
1916 return tasksQueueSize
1917 }
1918
1919 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1920 return this.workerNodes[workerNodeKey].dequeueTask()
1921 }
1922
1923 private tasksQueueSize (workerNodeKey: number): number {
1924 return this.workerNodes[workerNodeKey].tasksQueueSize()
1925 }
1926
1927 protected flushTasksQueue (workerNodeKey: number): number {
1928 let flushedTasks = 0
1929 while (this.tasksQueueSize(workerNodeKey) > 0) {
1930 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1931 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
1932 ++flushedTasks
1933 }
1934 this.workerNodes[workerNodeKey].clearTasksQueue()
1935 return flushedTasks
1936 }
1937
1938 private flushTasksQueues (): void {
1939 for (const [workerNodeKey] of this.workerNodes.entries()) {
1940 this.flushTasksQueue(workerNodeKey)
1941 }
1942 }
1943 }