perf: optimize tasks stealing in corner case
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import { randomUUID } from 'node:crypto'
2import { performance } from 'node:perf_hooks'
3import type { TransferListItem } from 'node:worker_threads'
4import { EventEmitterAsyncResource } from 'node:events'
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
10import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23} from '../utils'
24import { KillBehaviors } from '../worker/worker-options'
25import type { TaskFunction } from '../worker/task-functions'
26import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34} from './pool'
35import type {
36 IWorker,
37 IWorkerNode,
38 TaskStatistics,
39 WorkerInfo,
40 WorkerNodeEventDetail,
41 WorkerType,
42 WorkerUsage
43} from './worker'
44import {
45 type MeasurementStatisticsRequirements,
46 Measurements,
47 WorkerChoiceStrategies,
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
50} from './selection-strategies/selection-strategies-types'
51import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
52import { version } from './version'
53import { WorkerNode } from './worker-node'
54import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
59} from './utils'
60
61/**
62 * Base class that implements some shared logic for all poolifier pools.
63 *
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
67 */
68export abstract class AbstractPool<
69 Worker extends IWorker,
70 Data = unknown,
71 Response = unknown
72> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Dynamic pool maximum size property placeholder.
 *
 * When it is not set, the pool maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * The task execution response promise map:
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 */
private starting: boolean
/**
 * Whether the pool is destroying or not.
 */
private destroying: boolean
/**
 * Whether the pool ready event has been emitted or not.
 */
private readyEventEmitted: boolean
/**
 * The start timestamp of the pool: a `performance.now()` reading taken at the end of the constructor.
 */
private readonly startTimestamp
130
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first (the pool options check also fills in
 * the default option values), then the pool internals are built and, when
 * `opts.startWorkers` is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not constructed from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation.
  checkFilePath(filePath)
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkPoolOptions(opts)

  // Pin `this` on the methods below to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags: nothing has happened yet.
  this.readyEventEmitted = false
  this.destroying = false
  this.starting = false
  this.started = false

  if (opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
183
/**
 * Validates the number of workers given at pool construction.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
201
/**
 * Validates the given pool options and stores them, completed with their
 * default values, on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  // Boolean switches.
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // The tasks queue options are only looked at when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
230
/**
 * Validates the given worker choice strategy options.
 * `null` or `undefined` options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not hold one entry per worker node of a full pool or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node of the pool at its maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
277
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
283
/** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Adds up a per worker node number over all the pool worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Gathers the given measurement history of all the pool worker nodes.
  const measurementHistory = (
    measurement: 'runTime' | 'waitTime'
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(average(measurementHistory('runTime')))
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(median(measurementHistory('runTime')))
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(average(measurementHistory('waitTime')))
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(median(measurementHistory('waitTime')))
        })
      }
    })
  }
}
439
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes
 * reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
454
/**
 * The approximate pool utilization.
 *
 * It is the sum of the worker nodes run time and wait time aggregates
 * divided by the pool time capacity, i.e. the time elapsed since the pool
 * start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
475
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * It is reported in the pool information, used to name the pool event
 * emitter and checked when validating the number of workers.
 */
protected abstract get type (): PoolType
482
/**
 * The worker type.
 *
 * It is reported in the pool information and used to name the pool event emitter.
 */
protected abstract get worker (): WorkerType
487
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
494
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
501
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
529
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the default ones.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const tasksQueueGetsDisabled =
    this.opts.enableTasksQueue === true && !enable
  if (tasksQueueGetsDisabled) {
    // Stop tasks stealing and flush the tasks queues before switching off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without tasks queue, no tasks queue options are kept.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  // Listeners are always removed first, then put back if the option is on.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
612
/**
 * Builds the tasks queue options: the given options on top of the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The complete tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    // Default queue size: the square of the pool maximum size.
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
626
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
632
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
641
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
650
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
659
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
668
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
677
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
684
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the tasks queue concurrency; without it, while
 * it executes no task at all.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const isAvailable =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  // Busy means that not a single worker node is available.
  return !this.workerNodes.some(isAvailable)
}
708
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an error built from the worker response on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Only a status response coming from the targeted worker is handled.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
749
/**
 * Sends a task function operation message to every worker node and waits
 * until each of them has answered with an operation status.
 *
 * The same listener is registered on every worker node. It is deregistered
 * from a worker node as soon as that worker node has answered, so that no
 * listener is left behind once the operation is settled.
 *
 * NOTE(review): if the pool has no worker node when this is called, no
 * response can ever arrive and the returned promise never settles.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if every worker node succeeded and rejected with an error built from the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Messages without an operation status are not for this listener.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      // This worker node has answered: stop listening on it right away
      // instead of only cleaning up the last responder.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationsListener
      )
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
        return
      }
      reject(
        new Error(
          `Task function operation '${
            response.taskFunctionOperation as string
          }' failed on worker ${
            errorResponse.workerId as number
          } with error: '${
            errorResponse.workerError?.message as string
          }'`
        )
      )
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
804
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // One worker node listing the task function name is enough.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
817
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function travels to the workers as its source text.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Only recorded on the pool side once the workers operation has succeeded.
  this.taskFunctions.set(name, fn)
  return operationStatus
}
840
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Forget the task function usages first, then the task function itself.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
856
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty names list provides the answer.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
869
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
877
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
883
/**
 * Whether a task shall be executed right away on the given worker node
 * rather than enqueued: its tasks queue must be empty and it must execute
 * fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return this.workerNodes[workerNodeKey].usage.tasks.executing < concurrency
}
891
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Returns the error to reject with, if the pool state or an argument is invalid.
    const findRejectionReason = (): Error | undefined => {
      if (!this.started) {
        return new Error('Cannot execute a task on not started pool')
      }
      if (this.destroying) {
        return new Error('Cannot execute a task on destroying pool')
      }
      if (name != null) {
        if (typeof name !== 'string') {
          return new TypeError('name argument must be a string')
        }
        if (name.trim().length === 0) {
          return new TypeError('name argument must not be an empty string')
        }
      }
      if (transferList != null && !Array.isArray(transferList)) {
        return new TypeError('transferList argument must be an array')
      }
      return undefined
    }
    const rejectionReason = findRejectionReason()
    if (rejectionReason != null) {
      reject(rejectionReason)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue } = this.opts
    const executeRightAway =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeRightAway) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
949
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Counts the worker nodes that are not dynamic ones.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
974
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  const emitter = this.emitter
  if (emitter != null) {
    emitter.emit(PoolEvents.destroy, this.info)
    emitter.emitDestroy()
    emitter.removeAllListeners()
  }
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
999
/**
 * Sends a kill message to the given worker node and waits for its kill response.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on a `'success'` kill response and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1023
/**
 * Terminates the worker node given its worker node key.
 *
 * It is awaited for every worker node when the pool is destroyed.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1030
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1040
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
1045
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time on the
 * worker node usage and, when it applies, on its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Accounts for the task about to run on the given usage.
  const registerTaskStart = (workerUsage: WorkerUsage): void => {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    registerTaskStart(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      registerTaskStart(taskFunctionWorkerUsage)
    }
  }
}
1075
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task statistics, the run time and the ELU on the worker node
 * usage and, when it applies, on its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the received message to the given usage.
  const registerTaskEnd = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    registerTaskEnd(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      registerTaskEnd(taskFunctionWorkerUsage)
    }
  }
}
1109
/**
 * Tells whether task function level usage shall be maintained for a worker node.
 *
 * That is the case when the worker information exists and lists more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1124
/**
 * Updates the task counters of a usage after a task execution response.
 *
 * The `executing` counter is decremented without going below zero, then
 * either `executed` or `failed` is incremented depending on the presence of
 * a worker error in the message.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1142
/**
 * Feeds the task run time into the run time statistics of a usage.
 *
 * Nothing is recorded when the message carries a worker error. A missing
 * run time is recorded as `0`.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const { runTime: runTimeRequirements } =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1156
/**
 * Feeds the task wait time into the wait time statistics of a usage.
 *
 * The wait time is the delay between `task.timestamp` and now, or `0` when
 * the task carries no timestamp.
 *
 * @param workerUsage - The usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = task.timestamp != null ? now - task.timestamp : 0
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1169
/**
 * Feeds the task event loop utilization (ELU) into the statistics of a usage.
 *
 * Nothing is recorded when the message carries a worker error. Missing
 * active or idle values are recorded as `0`. When the ELU requirements ask
 * for aggregation and the message carries ELU data, the stored utilization
 * becomes the mean of the previous value and the reported one, or the
 * reported one if none was stored yet.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1202
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first, and its key
 * is returned right away if the strategy policy sets `dynamicWorkerUsage`.
 * In every other case the choice is delegated to the worker choice strategy
 * context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1221
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1230
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (workerNodeKey: number, message: MessageValue<Data>, transferList?: TransferListItem[]): void
1243
/**
 * Creates a new worker.
 * Must be implemented by concrete pools.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1250
/**
 * Creates a new, completely set up worker node.
 *
 * The worker gets the user supplied `online`, `message`, `error` and `exit`
 * handlers (or a no-op), plus two internal handlers: one reacting to worker
 * errors and one removing the worker node once the worker exits. The worker
 * is then added to the pool worker nodes and `afterWorkerNodeSetup()` is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Internal reaction to a worker error: flag the node as not ready, notify,
  // close its channel, optionally replace it and hand over its queued tasks.
  const handleWorkerError = (error: Error): void => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(failedWorkerNodeKey)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    const shallRestart =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestart) {
      // The replacement is of the same kind (dynamic or not) as the failed one.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  }

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1295
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener is registered that
 * destroys the worker node upon a kill message, the worker is asked to check
 * its activity, the registered task functions are sent to it and the worker
 * information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only when nothing is executing and, with the
    // tasks queue enabled, nothing is queued either.
    const hasNoPendingWork = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (shallDestroy) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Failures while adding a task function are reported as pool error events.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1351
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1364
/**
 * Registers once a message listener callback on the worker identified by
 * its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1377
/**
 * Deregisters a message listener callback from the worker identified by its
 * worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1390
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, subscribes
 * to the worker node events used for tasks stealing according to the tasks
 * queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
1422
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (
  workerNodeKey: number
): void
1429
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU
 * task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(workerNodeKey, { statistics })
}
1446
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task the destination is the least queued worker node
 * other than the source one. Ready worker nodes are preferred; a not ready
 * one is used only when no other worker node is ready. The source worker
 * node is never picked as destination: doing so could send tasks back to the
 * worker being drained and, if they were queued there again, the loop would
 * never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (bestWorkerNodeKey, candidate, candidateKey, workerNodes) => {
        if (candidateKey === workerNodeKey) {
          return bestWorkerNodeKey
        }
        if (bestWorkerNodeKey === -1) {
          return candidateKey
        }
        const best = workerNodes[bestWorkerNodeKey]
        // Readiness takes precedence over the queue size.
        if (candidate.info.ready !== best.info.ready) {
          return candidate.info.ready ? candidateKey : bestWorkerNodeKey
        }
        // On equal queue sizes the lowest key is kept.
        return candidate.usage.tasks.queued < best.usage.tasks.queued
          ? candidateKey
          : bestWorkerNodeKey
      },
      -1
    )
    if (destinationWorkerNodeKey === -1) {
      // No worker node other than the source one: nothing can be moved.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1470
/**
 * Increments the `stolen` tasks counter of a worker node and, when task
 * function usages are tracked, of the given task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1489
/**
 * Increments the `sequentiallyStolen` tasks counter of a worker node usage,
 * if that usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage != null) {
    ++usage.tasks.sequentiallyStolen
  }
}
1498
/**
 * Increments the `sequentiallyStolen` tasks counter of a task function usage
 * on a worker node, when task function usages are tracked and that usage
 * exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1514
/**
 * Resets to zero the `sequentiallyStolen` tasks counter of a worker node
 * usage, if that usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage != null) {
    usage.tasks.sequentiallyStolen = 0
  }
}
1523
/**
 * Resets to zero the `sequentiallyStolen` tasks counter of a task function
 * usage on a worker node, when task function usages are tracked and that
 * usage exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1539
/**
 * Handler of the `idleWorkerNode` worker node event: lets the idle worker
 * node steal tasks from the other worker nodes.
 *
 * Each call attempts one steal and then schedules itself again after a delay
 * computed by `exponentialDelay()` from the worker node `sequentiallyStolen`
 * counter. The chain stops, after resetting the sequentially stolen
 * counters, once a previous steal happened and the worker node has work
 * again (executing or queued tasks).
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous call, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const workerNodeTasksUsage = workerNode.usage.tasks
  const stealingShallStop =
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  if (stealingShallStop) {
    const taskFunctionNames = workerNode.info.taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    stolenTask != null &&
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      stolenTaskName
    )?.tasks as TaskStatistics
    // The task function streak goes on when it is starting, or when the
    // previously stolen task had the same name.
    const streakGoesOn =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (!streakGoesOn) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  const nextAttemptDelay = exponentialDelay(
    workerNodeTasksUsage.sequentiallyStolen
  )
  sleep(nextAttemptDelay)
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1603
/**
 * Makes a worker node steal one task from the most queued ready worker node.
 *
 * The source worker node is looked up in a copy of the worker nodes sorted
 * by decreasing number of queued tasks. The stealing worker node is excluded
 * by identity: positions in the sorted copy are not worker node keys and
 * must not be compared with `workerNodeKey`.
 *
 * The stolen task is executed right away on the stealing worker node when
 * `shallExecuteTask()` allows it, otherwise it is queued there. The stolen
 * tasks statistics of the stealing worker node are then updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidateWorkerNode =>
        candidateWorkerNode.info.ready &&
        candidateWorkerNode !== destinationWorkerNode &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
1634
/**
 * Handler of the `backPressure` worker node event: moves tasks queued on the
 * worker node under back pressure to the other worker nodes.
 *
 * Destination candidates are visited by increasing number of queued tasks,
 * each one receiving at most one task. A candidate is eligible when it is
 * ready, is not the source worker node and has fewer queued tasks than the
 * configured tasks queue size minus one. Each candidate keeps its real
 * worker node key while being sorted, so that tasks are dispatched to, and
 * accounted on, the intended worker node.
 *
 * Nothing is done when the pool has at most one worker node, when the
 * configured tasks queue size is not greater than one or when the source
 * worker node cannot be found.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node is not, or no longer, part of the pool.
    return
  }
  const destinationCandidates = Array.from(this.workerNodes.entries()).sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of destinationCandidates) {
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      // Nothing left to hand over.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1675
/**
 * This method is the message listener registered on each worker.
 *
 * After checking the message worker id, the message is dispatched according
 * to its content: worker ready response, task execution response or task
 * function names update. Any other message is ignored.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
  }
}
1697
/**
 * Handles a worker ready response: stores the readiness and the task
 * function names in the worker information, then emits the pool `ready`
 * event the first time the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1713
/**
 * Handles a task execution response.
 *
 * The promise registered for the task id is settled (rejected with the
 * worker error message, after emitting a `taskError` event, or resolved with
 * the response data), the statistics and the worker choice strategy are
 * updated and the promise response is forgotten. Messages whose task id has
 * no registered promise response are ignored.
 *
 * With the tasks queue enabled, the next queued task of the worker node is
 * then executed if the configured concurrency allows it, and an
 * `idleWorkerNode` event is emitted on the worker node when it has nothing
 * executing, nothing queued and a zero `sequentiallyStolen` counter.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = promiseResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing < (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  // Evaluated after the possible execution above.
  const isIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
1753
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1759
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1765
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1773
/**
 * Gets the worker information given its worker node key.
 *
 * The lookup uses optional chaining, so a key matching no worker node yields
 * `undefined` at runtime.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1783
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, or the
 * square of the pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const queueSizeOption =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(worker, queueSizeOption)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1807
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1820
/**
 * Flags the worker node as not ready in its worker information.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1824
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1832
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1841
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Statistics are updated before the task is handed to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1853
/**
 * Queues the given task on a worker node, then emits the task queuing
 * events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1859
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1863
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1867
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1877
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1883}