feat: fire tasks stealing at worker node idling
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import type { TaskFunction } from '../worker/task-functions'
24 import {
25 type IPool,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerNodeEventDetail,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51 import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
54 checkValidWorkerChoiceStrategy,
55 updateMeasurementStatistics
56 } from './utils'
57
58 /**
59 * Base class that implements some shared logic for all poolifier pools.
60 *
61 * @typeParam Worker - Type of worker which manages this pool.
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 */
65 export abstract class AbstractPool<
66 Worker extends IWorker,
67 Data = unknown,
68 Response = unknown
69 > implements IPool<Worker, Data, Response> {
70 /** @inheritDoc */
71 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
72
73 /** @inheritDoc */
74 public emitter?: EventEmitterAsyncResource
75
76 /**
77 * Dynamic pool maximum size property placeholder.
78 */
79 protected readonly max?: number
80
81 /**
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
85 *
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
87 */
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
90
91 /**
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
95 Worker,
96 Data,
97 Response
98 >
99
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
128 /**
129 * Constructs a new poolifier pool.
130 *
131 * @param numberOfWorkers - Number of workers that this pool should manage.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
134 */
135 public constructor (
136 protected readonly numberOfWorkers: number,
137 protected readonly filePath: string,
138 protected readonly opts: PoolOptions<Worker>
139 ) {
140 if (!this.isMain()) {
141 throw new Error(
142 'Cannot start a pool from a worker with the same type as the pool'
143 )
144 }
145 checkFilePath(this.filePath)
146 this.checkNumberOfWorkers(this.numberOfWorkers)
147 this.checkPoolOptions(this.opts)
148
149 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
150 this.executeTask = this.executeTask.bind(this)
151 this.enqueueTask = this.enqueueTask.bind(this)
152
153 if (this.opts.enableEvents === true) {
154 this.initializeEventEmitter()
155 }
156 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
157 Worker,
158 Data,
159 Response
160 >(
161 this,
162 this.opts.workerChoiceStrategy,
163 this.opts.workerChoiceStrategyOptions
164 )
165
166 this.setupHook()
167
168 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
169
170 this.started = false
171 this.starting = false
172 this.destroying = false
173 this.readyEventEmitted = false
174 if (this.opts.startWorkers === true) {
175 this.start()
176 }
177
178 this.startTimestamp = performance.now()
179 }
180
181 private checkNumberOfWorkers (numberOfWorkers: number): void {
182 if (numberOfWorkers == null) {
183 throw new Error(
184 'Cannot instantiate a pool without specifying the number of workers'
185 )
186 } else if (!Number.isSafeInteger(numberOfWorkers)) {
187 throw new TypeError(
188 'Cannot instantiate a pool with a non safe integer number of workers'
189 )
190 } else if (numberOfWorkers < 0) {
191 throw new RangeError(
192 'Cannot instantiate a pool with a negative number of workers'
193 )
194 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
195 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
196 }
197 }
198
199 private checkPoolOptions (opts: PoolOptions<Worker>): void {
200 if (isPlainObject(opts)) {
201 this.opts.startWorkers = opts.startWorkers ?? true
202 checkValidWorkerChoiceStrategy(
203 opts.workerChoiceStrategy as WorkerChoiceStrategy
204 )
205 this.opts.workerChoiceStrategy =
206 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
207 this.checkValidWorkerChoiceStrategyOptions(
208 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
209 )
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
214 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
215 this.opts.enableEvents = opts.enableEvents ?? true
216 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
217 if (this.opts.enableTasksQueue) {
218 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
219 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
220 opts.tasksQueueOptions as TasksQueueOptions
221 )
222 }
223 } else {
224 throw new TypeError('Invalid pool options: must be a plain object')
225 }
226 }
227
228 private checkValidWorkerChoiceStrategyOptions (
229 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
230 ): void {
231 if (
232 workerChoiceStrategyOptions != null &&
233 !isPlainObject(workerChoiceStrategyOptions)
234 ) {
235 throw new TypeError(
236 'Invalid worker choice strategy options: must be a plain object'
237 )
238 }
239 if (
240 workerChoiceStrategyOptions?.retries != null &&
241 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
242 ) {
243 throw new TypeError(
244 'Invalid worker choice strategy options: retries must be an integer'
245 )
246 }
247 if (
248 workerChoiceStrategyOptions?.retries != null &&
249 workerChoiceStrategyOptions.retries < 0
250 ) {
251 throw new RangeError(
252 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
253 )
254 }
255 if (
256 workerChoiceStrategyOptions?.weights != null &&
257 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
258 ) {
259 throw new Error(
260 'Invalid worker choice strategy options: must have a weight for each worker node'
261 )
262 }
263 if (
264 workerChoiceStrategyOptions?.measurement != null &&
265 !Object.values(Measurements).includes(
266 workerChoiceStrategyOptions.measurement
267 )
268 ) {
269 throw new Error(
270 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
271 )
272 }
273 }
274
275 private initializeEventEmitter (): void {
276 this.emitter = new EventEmitterAsyncResource({
277 name: `poolifier:${this.type}-${this.worker}-pool`
278 })
279 }
280
281 /** @inheritDoc */
282 public get info (): PoolInfo {
283 return {
284 version,
285 type: this.type,
286 worker: this.worker,
287 started: this.started,
288 ready: this.ready,
289 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
290 minSize: this.minSize,
291 maxSize: this.maxSize,
292 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
293 .runTime.aggregate &&
294 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
295 .waitTime.aggregate && { utilization: round(this.utilization) }),
296 workerNodes: this.workerNodes.length,
297 idleWorkerNodes: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 workerNode.usage.tasks.executing === 0
300 ? accumulator + 1
301 : accumulator,
302 0
303 ),
304 busyWorkerNodes: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
306 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
307 0
308 ),
309 executedTasks: this.workerNodes.reduce(
310 (accumulator, workerNode) =>
311 accumulator + workerNode.usage.tasks.executed,
312 0
313 ),
314 executingTasks: this.workerNodes.reduce(
315 (accumulator, workerNode) =>
316 accumulator + workerNode.usage.tasks.executing,
317 0
318 ),
319 ...(this.opts.enableTasksQueue === true && {
320 queuedTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 accumulator + workerNode.usage.tasks.queued,
323 0
324 )
325 }),
326 ...(this.opts.enableTasksQueue === true && {
327 maxQueuedTasks: this.workerNodes.reduce(
328 (accumulator, workerNode) =>
329 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
330 0
331 )
332 }),
333 ...(this.opts.enableTasksQueue === true && {
334 backPressure: this.hasBackPressure()
335 }),
336 ...(this.opts.enableTasksQueue === true && {
337 stolenTasks: this.workerNodes.reduce(
338 (accumulator, workerNode) =>
339 accumulator + workerNode.usage.tasks.stolen,
340 0
341 )
342 }),
343 failedTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
345 accumulator + workerNode.usage.tasks.failed,
346 0
347 ),
348 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
349 .runTime.aggregate && {
350 runTime: {
351 minimum: round(
352 min(
353 ...this.workerNodes.map(
354 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
355 )
356 )
357 ),
358 maximum: round(
359 max(
360 ...this.workerNodes.map(
361 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
362 )
363 )
364 ),
365 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
366 .runTime.average && {
367 average: round(
368 average(
369 this.workerNodes.reduce<number[]>(
370 (accumulator, workerNode) =>
371 accumulator.concat(workerNode.usage.runTime.history),
372 []
373 )
374 )
375 )
376 }),
377 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
378 .runTime.median && {
379 median: round(
380 median(
381 this.workerNodes.reduce<number[]>(
382 (accumulator, workerNode) =>
383 accumulator.concat(workerNode.usage.runTime.history),
384 []
385 )
386 )
387 )
388 })
389 }
390 }),
391 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
392 .waitTime.aggregate && {
393 waitTime: {
394 minimum: round(
395 min(
396 ...this.workerNodes.map(
397 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
398 )
399 )
400 ),
401 maximum: round(
402 max(
403 ...this.workerNodes.map(
404 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
405 )
406 )
407 ),
408 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
409 .waitTime.average && {
410 average: round(
411 average(
412 this.workerNodes.reduce<number[]>(
413 (accumulator, workerNode) =>
414 accumulator.concat(workerNode.usage.waitTime.history),
415 []
416 )
417 )
418 )
419 }),
420 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
421 .waitTime.median && {
422 median: round(
423 median(
424 this.workerNodes.reduce<number[]>(
425 (accumulator, workerNode) =>
426 accumulator.concat(workerNode.usage.waitTime.history),
427 []
428 )
429 )
430 )
431 })
432 }
433 })
434 }
435 }
436
437 /**
438 * The pool readiness boolean status.
439 */
440 private get ready (): boolean {
441 return (
442 this.workerNodes.reduce(
443 (accumulator, workerNode) =>
444 !workerNode.info.dynamic && workerNode.info.ready
445 ? accumulator + 1
446 : accumulator,
447 0
448 ) >= this.minSize
449 )
450 }
451
452 /**
453 * The approximate pool utilization.
454 *
455 * @returns The pool utilization.
456 */
457 private get utilization (): number {
458 const poolTimeCapacity =
459 (performance.now() - this.startTimestamp) * this.maxSize
460 const totalTasksRunTime = this.workerNodes.reduce(
461 (accumulator, workerNode) =>
462 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
463 0
464 )
465 const totalTasksWaitTime = this.workerNodes.reduce(
466 (accumulator, workerNode) =>
467 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
468 0
469 )
470 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
471 }
472
473 /**
474 * The pool type.
475 *
476 * If it is `'dynamic'`, it provides the `max` property.
477 */
478 protected abstract get type (): PoolType
479
480 /**
481 * The worker type.
482 */
483 protected abstract get worker (): WorkerType
484
485 /**
486 * The pool minimum size.
487 */
488 protected get minSize (): number {
489 return this.numberOfWorkers
490 }
491
492 /**
493 * The pool maximum size.
494 */
495 protected get maxSize (): number {
496 return this.max ?? this.numberOfWorkers
497 }
498
499 /**
500 * Checks if the worker id sent in the received message from a worker is valid.
501 *
502 * @param message - The received message.
503 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
504 */
505 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
506 if (message.workerId == null) {
507 throw new Error('Worker message received without worker id')
508 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
509 throw new Error(
510 `Worker message received from unknown worker '${message.workerId}'`
511 )
512 }
513 }
514
515 /**
516 * Gets the given worker its worker node key.
517 *
518 * @param worker - The worker.
519 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
520 */
521 private getWorkerNodeKeyByWorker (worker: Worker): number {
522 return this.workerNodes.findIndex(
523 workerNode => workerNode.worker === worker
524 )
525 }
526
527 /**
528 * Gets the worker node key given its worker id.
529 *
530 * @param workerId - The worker id.
531 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
532 */
533 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
534 return this.workerNodes.findIndex(
535 workerNode => workerNode.info.id === workerId
536 )
537 }
538
539 /** @inheritDoc */
540 public setWorkerChoiceStrategy (
541 workerChoiceStrategy: WorkerChoiceStrategy,
542 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
543 ): void {
544 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
545 this.opts.workerChoiceStrategy = workerChoiceStrategy
546 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
547 this.opts.workerChoiceStrategy
548 )
549 if (workerChoiceStrategyOptions != null) {
550 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
551 }
552 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
553 workerNode.resetUsage()
554 this.sendStatisticsMessageToWorker(workerNodeKey)
555 }
556 }
557
558 /** @inheritDoc */
559 public setWorkerChoiceStrategyOptions (
560 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
561 ): void {
562 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
563 this.opts.workerChoiceStrategyOptions = {
564 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
565 ...workerChoiceStrategyOptions
566 }
567 this.workerChoiceStrategyContext.setOptions(
568 this.opts.workerChoiceStrategyOptions
569 )
570 }
571
572 /** @inheritDoc */
573 public enableTasksQueue (
574 enable: boolean,
575 tasksQueueOptions?: TasksQueueOptions
576 ): void {
577 if (this.opts.enableTasksQueue === true && !enable) {
578 this.unsetTaskStealing()
579 this.unsetTasksStealingOnBackPressure()
580 this.flushTasksQueues()
581 }
582 this.opts.enableTasksQueue = enable
583 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
584 }
585
586 /** @inheritDoc */
587 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
588 if (this.opts.enableTasksQueue === true) {
589 checkValidTasksQueueOptions(tasksQueueOptions)
590 this.opts.tasksQueueOptions =
591 this.buildTasksQueueOptions(tasksQueueOptions)
592 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
593 if (this.opts.tasksQueueOptions.taskStealing === true) {
594 this.setTaskStealing()
595 } else {
596 this.unsetTaskStealing()
597 }
598 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
599 this.setTasksStealingOnBackPressure()
600 } else {
601 this.unsetTasksStealingOnBackPressure()
602 }
603 } else if (this.opts.tasksQueueOptions != null) {
604 delete this.opts.tasksQueueOptions
605 }
606 }
607
608 private buildTasksQueueOptions (
609 tasksQueueOptions: TasksQueueOptions
610 ): TasksQueueOptions {
611 return {
612 ...{
613 size: Math.pow(this.maxSize, 2),
614 concurrency: 1,
615 taskStealing: true,
616 tasksStealingOnBackPressure: true
617 },
618 ...tasksQueueOptions
619 }
620 }
621
622 private setTasksQueueSize (size: number): void {
623 for (const workerNode of this.workerNodes) {
624 workerNode.tasksQueueBackPressureSize = size
625 }
626 }
627
628 private setTaskStealing (): void {
629 for (const [workerNodeKey] of this.workerNodes.entries()) {
630 this.workerNodes[workerNodeKey].addEventListener(
631 'idleWorkerNode',
632 this.handleIdleWorkerNodeEvent as EventListener
633 )
634 }
635 }
636
637 private unsetTaskStealing (): void {
638 for (const [workerNodeKey] of this.workerNodes.entries()) {
639 this.workerNodes[workerNodeKey].removeEventListener(
640 'idleWorkerNode',
641 this.handleIdleWorkerNodeEvent as EventListener
642 )
643 }
644 }
645
646 private setTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey] of this.workerNodes.entries()) {
648 this.workerNodes[workerNodeKey].addEventListener(
649 'backPressure',
650 this.handleBackPressureEvent as EventListener
651 )
652 }
653 }
654
655 private unsetTasksStealingOnBackPressure (): void {
656 for (const [workerNodeKey] of this.workerNodes.entries()) {
657 this.workerNodes[workerNodeKey].removeEventListener(
658 'backPressure',
659 this.handleBackPressureEvent as EventListener
660 )
661 }
662 }
663
664 /**
665 * Whether the pool is full or not.
666 *
667 * The pool filling boolean status.
668 */
669 protected get full (): boolean {
670 return this.workerNodes.length >= this.maxSize
671 }
672
673 /**
674 * Whether the pool is busy or not.
675 *
676 * The pool busyness boolean status.
677 */
678 protected abstract get busy (): boolean
679
680 /**
681 * Whether worker nodes are executing concurrently their tasks quota or not.
682 *
683 * @returns Worker nodes busyness boolean status.
684 */
685 protected internalBusy (): boolean {
686 if (this.opts.enableTasksQueue === true) {
687 return (
688 this.workerNodes.findIndex(
689 workerNode =>
690 workerNode.info.ready &&
691 workerNode.usage.tasks.executing <
692 (this.opts.tasksQueueOptions?.concurrency as number)
693 ) === -1
694 )
695 }
696 return (
697 this.workerNodes.findIndex(
698 workerNode =>
699 workerNode.info.ready && workerNode.usage.tasks.executing === 0
700 ) === -1
701 )
702 }
703
704 private async sendTaskFunctionOperationToWorker (
705 workerNodeKey: number,
706 message: MessageValue<Data>
707 ): Promise<boolean> {
708 return await new Promise<boolean>((resolve, reject) => {
709 const taskFunctionOperationListener = (
710 message: MessageValue<Response>
711 ): void => {
712 this.checkMessageWorkerId(message)
713 const workerId = this.getWorkerInfo(workerNodeKey).id as number
714 if (
715 message.taskFunctionOperationStatus != null &&
716 message.workerId === workerId
717 ) {
718 if (message.taskFunctionOperationStatus) {
719 resolve(true)
720 } else if (!message.taskFunctionOperationStatus) {
721 reject(
722 new Error(
723 `Task function operation '${
724 message.taskFunctionOperation as string
725 }' failed on worker ${message.workerId} with error: '${
726 message.workerError?.message as string
727 }'`
728 )
729 )
730 }
731 this.deregisterWorkerMessageListener(
732 this.getWorkerNodeKeyByWorkerId(message.workerId),
733 taskFunctionOperationListener
734 )
735 }
736 }
737 this.registerWorkerMessageListener(
738 workerNodeKey,
739 taskFunctionOperationListener
740 )
741 this.sendToWorker(workerNodeKey, message)
742 })
743 }
744
745 private async sendTaskFunctionOperationToWorkers (
746 message: MessageValue<Data>
747 ): Promise<boolean> {
748 return await new Promise<boolean>((resolve, reject) => {
749 const responsesReceived = new Array<MessageValue<Response>>()
750 const taskFunctionOperationsListener = (
751 message: MessageValue<Response>
752 ): void => {
753 this.checkMessageWorkerId(message)
754 if (message.taskFunctionOperationStatus != null) {
755 responsesReceived.push(message)
756 if (responsesReceived.length === this.workerNodes.length) {
757 if (
758 responsesReceived.every(
759 message => message.taskFunctionOperationStatus === true
760 )
761 ) {
762 resolve(true)
763 } else if (
764 responsesReceived.some(
765 message => message.taskFunctionOperationStatus === false
766 )
767 ) {
768 const errorResponse = responsesReceived.find(
769 response => response.taskFunctionOperationStatus === false
770 )
771 reject(
772 new Error(
773 `Task function operation '${
774 message.taskFunctionOperation as string
775 }' failed on worker ${
776 errorResponse?.workerId as number
777 } with error: '${
778 errorResponse?.workerError?.message as string
779 }'`
780 )
781 )
782 }
783 this.deregisterWorkerMessageListener(
784 this.getWorkerNodeKeyByWorkerId(message.workerId),
785 taskFunctionOperationsListener
786 )
787 }
788 }
789 }
790 for (const [workerNodeKey] of this.workerNodes.entries()) {
791 this.registerWorkerMessageListener(
792 workerNodeKey,
793 taskFunctionOperationsListener
794 )
795 this.sendToWorker(workerNodeKey, message)
796 }
797 })
798 }
799
800 /** @inheritDoc */
801 public hasTaskFunction (name: string): boolean {
802 for (const workerNode of this.workerNodes) {
803 if (
804 Array.isArray(workerNode.info.taskFunctionNames) &&
805 workerNode.info.taskFunctionNames.includes(name)
806 ) {
807 return true
808 }
809 }
810 return false
811 }
812
813 /** @inheritDoc */
814 public async addTaskFunction (
815 name: string,
816 fn: TaskFunction<Data, Response>
817 ): Promise<boolean> {
818 if (typeof name !== 'string') {
819 throw new TypeError('name argument must be a string')
820 }
821 if (typeof name === 'string' && name.trim().length === 0) {
822 throw new TypeError('name argument must not be an empty string')
823 }
824 if (typeof fn !== 'function') {
825 throw new TypeError('fn argument must be a function')
826 }
827 const opResult = await this.sendTaskFunctionOperationToWorkers({
828 taskFunctionOperation: 'add',
829 taskFunctionName: name,
830 taskFunction: fn.toString()
831 })
832 this.taskFunctions.set(name, fn)
833 return opResult
834 }
835
836 /** @inheritDoc */
837 public async removeTaskFunction (name: string): Promise<boolean> {
838 if (!this.taskFunctions.has(name)) {
839 throw new Error(
840 'Cannot remove a task function not handled on the pool side'
841 )
842 }
843 const opResult = await this.sendTaskFunctionOperationToWorkers({
844 taskFunctionOperation: 'remove',
845 taskFunctionName: name
846 })
847 this.deleteTaskFunctionWorkerUsages(name)
848 this.taskFunctions.delete(name)
849 return opResult
850 }
851
852 /** @inheritDoc */
853 public listTaskFunctionNames (): string[] {
854 for (const workerNode of this.workerNodes) {
855 if (
856 Array.isArray(workerNode.info.taskFunctionNames) &&
857 workerNode.info.taskFunctionNames.length > 0
858 ) {
859 return workerNode.info.taskFunctionNames
860 }
861 }
862 return []
863 }
864
865 /** @inheritDoc */
866 public async setDefaultTaskFunction (name: string): Promise<boolean> {
867 return await this.sendTaskFunctionOperationToWorkers({
868 taskFunctionOperation: 'default',
869 taskFunctionName: name
870 })
871 }
872
873 private deleteTaskFunctionWorkerUsages (name: string): void {
874 for (const workerNode of this.workerNodes) {
875 workerNode.deleteTaskFunctionWorkerUsage(name)
876 }
877 }
878
879 private shallExecuteTask (workerNodeKey: number): boolean {
880 return (
881 this.tasksQueueSize(workerNodeKey) === 0 &&
882 this.workerNodes[workerNodeKey].usage.tasks.executing <
883 (this.opts.tasksQueueOptions?.concurrency as number)
884 )
885 }
886
887 /** @inheritDoc */
888 public async execute (
889 data?: Data,
890 name?: string,
891 transferList?: TransferListItem[]
892 ): Promise<Response> {
893 return await new Promise<Response>((resolve, reject) => {
894 if (!this.started) {
895 reject(new Error('Cannot execute a task on not started pool'))
896 return
897 }
898 if (this.destroying) {
899 reject(new Error('Cannot execute a task on destroying pool'))
900 return
901 }
902 if (name != null && typeof name !== 'string') {
903 reject(new TypeError('name argument must be a string'))
904 return
905 }
906 if (
907 name != null &&
908 typeof name === 'string' &&
909 name.trim().length === 0
910 ) {
911 reject(new TypeError('name argument must not be an empty string'))
912 return
913 }
914 if (transferList != null && !Array.isArray(transferList)) {
915 reject(new TypeError('transferList argument must be an array'))
916 return
917 }
918 const timestamp = performance.now()
919 const workerNodeKey = this.chooseWorkerNode()
920 const task: Task<Data> = {
921 name: name ?? DEFAULT_TASK_NAME,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data: data ?? ({} as Data),
924 transferList,
925 timestamp,
926 taskId: randomUUID()
927 }
928 this.promiseResponseMap.set(task.taskId as string, {
929 resolve,
930 reject,
931 workerNodeKey
932 })
933 if (
934 this.opts.enableTasksQueue === false ||
935 (this.opts.enableTasksQueue === true &&
936 this.shallExecuteTask(workerNodeKey))
937 ) {
938 this.executeTask(workerNodeKey, task)
939 } else {
940 this.enqueueTask(workerNodeKey, task)
941 }
942 })
943 }
944
945 /** @inheritdoc */
946 public start (): void {
947 if (this.started) {
948 throw new Error('Cannot start an already started pool')
949 }
950 if (this.starting) {
951 throw new Error('Cannot start an already starting pool')
952 }
953 if (this.destroying) {
954 throw new Error('Cannot start a destroying pool')
955 }
956 this.starting = true
957 while (
958 this.workerNodes.reduce(
959 (accumulator, workerNode) =>
960 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
961 0
962 ) < this.numberOfWorkers
963 ) {
964 this.createAndSetupWorkerNode()
965 }
966 this.starting = false
967 this.started = true
968 }
969
970 /** @inheritDoc */
971 public async destroy (): Promise<void> {
972 if (!this.started) {
973 throw new Error('Cannot destroy an already destroyed pool')
974 }
975 if (this.starting) {
976 throw new Error('Cannot destroy an starting pool')
977 }
978 if (this.destroying) {
979 throw new Error('Cannot destroy an already destroying pool')
980 }
981 this.destroying = true
982 await Promise.all(
983 this.workerNodes.map(async (_, workerNodeKey) => {
984 await this.destroyWorkerNode(workerNodeKey)
985 })
986 )
987 this.emitter?.emit(PoolEvents.destroy, this.info)
988 this.emitter?.emitDestroy()
989 this.readyEventEmitted = false
990 this.destroying = false
991 this.started = false
992 }
993
994 protected async sendKillMessageToWorker (
995 workerNodeKey: number
996 ): Promise<void> {
997 await new Promise<void>((resolve, reject) => {
998 const killMessageListener = (message: MessageValue<Response>): void => {
999 this.checkMessageWorkerId(message)
1000 if (message.kill === 'success') {
1001 resolve()
1002 } else if (message.kill === 'failure') {
1003 reject(
1004 new Error(
1005 `Kill message handling failed on worker ${
1006 message.workerId as number
1007 }`
1008 )
1009 )
1010 }
1011 }
1012 // FIXME: should be registered only once
1013 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1014 this.sendToWorker(workerNodeKey, { kill: true })
1015 })
1016 }
1017
1018 /**
1019 * Terminates the worker node given its worker node key.
1020 *
1021 * @param workerNodeKey - The worker node key.
1022 */
1023 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1024
1025 /**
1026 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1027 * Can be overridden.
1028 *
1029 * @virtual
1030 */
1031 protected setupHook (): void {
1032 /* Intentionally empty */
1033 }
1034
1035 /**
1036 * Should return whether the worker is the main worker or not.
1037 */
1038 protected abstract isMain (): boolean
1039
1040 /**
1041 * Hook executed before the worker task execution.
1042 * Can be overridden.
1043 *
1044 * @param workerNodeKey - The worker node key.
1045 * @param task - The task to execute.
1046 */
1047 protected beforeTaskExecutionHook (
1048 workerNodeKey: number,
1049 task: Task<Data>
1050 ): void {
1051 if (this.workerNodes[workerNodeKey]?.usage != null) {
1052 const workerUsage = this.workerNodes[workerNodeKey].usage
1053 ++workerUsage.tasks.executing
1054 this.updateWaitTimeWorkerUsage(workerUsage, task)
1055 }
1056 if (
1057 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1058 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1059 task.name as string
1060 ) != null
1061 ) {
1062 const taskFunctionWorkerUsage = this.workerNodes[
1063 workerNodeKey
1064 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
1065 ++taskFunctionWorkerUsage.tasks.executing
1066 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
1067 }
1068 }
1069
1070 /**
1071 * Hook executed after the worker task execution.
1072 * Can be overridden.
1073 *
1074 * @param workerNodeKey - The worker node key.
1075 * @param message - The received message.
1076 */
1077 protected afterTaskExecutionHook (
1078 workerNodeKey: number,
1079 message: MessageValue<Response>
1080 ): void {
1081 if (this.workerNodes[workerNodeKey]?.usage != null) {
1082 const workerUsage = this.workerNodes[workerNodeKey].usage
1083 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1084 this.updateRunTimeWorkerUsage(workerUsage, message)
1085 this.updateEluWorkerUsage(workerUsage, message)
1086 }
1087 if (
1088 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1089 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1090 message.taskPerformance?.name as string
1091 ) != null
1092 ) {
1093 const taskFunctionWorkerUsage = this.workerNodes[
1094 workerNodeKey
1095 ].getTaskFunctionWorkerUsage(
1096 message.taskPerformance?.name as string
1097 ) as WorkerUsage
1098 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1099 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1100 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
1101 }
1102 }
1103
1104 /**
1105 * Whether the worker node shall update its task function worker usage or not.
1106 *
1107 * @param workerNodeKey - The worker node key.
1108 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1109 */
1110 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1111 const workerInfo = this.getWorkerInfo(workerNodeKey)
1112 return (
1113 workerInfo != null &&
1114 Array.isArray(workerInfo.taskFunctionNames) &&
1115 workerInfo.taskFunctionNames.length > 2
1116 )
1117 }
1118
1119 private updateTaskStatisticsWorkerUsage (
1120 workerUsage: WorkerUsage,
1121 message: MessageValue<Response>
1122 ): void {
1123 const workerTaskStatistics = workerUsage.tasks
1124 if (
1125 workerTaskStatistics.executing != null &&
1126 workerTaskStatistics.executing > 0
1127 ) {
1128 --workerTaskStatistics.executing
1129 }
1130 if (message.workerError == null) {
1131 ++workerTaskStatistics.executed
1132 } else {
1133 ++workerTaskStatistics.failed
1134 }
1135 }
1136
1137 private updateRunTimeWorkerUsage (
1138 workerUsage: WorkerUsage,
1139 message: MessageValue<Response>
1140 ): void {
1141 if (message.workerError != null) {
1142 return
1143 }
1144 updateMeasurementStatistics(
1145 workerUsage.runTime,
1146 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1147 message.taskPerformance?.runTime ?? 0
1148 )
1149 }
1150
1151 private updateWaitTimeWorkerUsage (
1152 workerUsage: WorkerUsage,
1153 task: Task<Data>
1154 ): void {
1155 const timestamp = performance.now()
1156 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1157 updateMeasurementStatistics(
1158 workerUsage.waitTime,
1159 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1160 taskWaitTime
1161 )
1162 }
1163
1164 private updateEluWorkerUsage (
1165 workerUsage: WorkerUsage,
1166 message: MessageValue<Response>
1167 ): void {
1168 if (message.workerError != null) {
1169 return
1170 }
1171 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1172 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1173 updateMeasurementStatistics(
1174 workerUsage.elu.active,
1175 eluTaskStatisticsRequirements,
1176 message.taskPerformance?.elu?.active ?? 0
1177 )
1178 updateMeasurementStatistics(
1179 workerUsage.elu.idle,
1180 eluTaskStatisticsRequirements,
1181 message.taskPerformance?.elu?.idle ?? 0
1182 )
1183 if (eluTaskStatisticsRequirements.aggregate) {
1184 if (message.taskPerformance?.elu != null) {
1185 if (workerUsage.elu.utilization != null) {
1186 workerUsage.elu.utilization =
1187 (workerUsage.elu.utilization +
1188 message.taskPerformance.elu.utilization) /
1189 2
1190 } else {
1191 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1192 }
1193 }
1194 }
1195 }
1196
1197 /**
1198 * Chooses a worker node for the next task.
1199 *
1200 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1201 *
1202 * @returns The chosen worker node key
1203 */
1204 private chooseWorkerNode (): number {
1205 if (this.shallCreateDynamicWorker()) {
1206 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1207 if (
1208 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1209 ) {
1210 return workerNodeKey
1211 }
1212 }
1213 return this.workerChoiceStrategyContext.execute()
1214 }
1215
1216 /**
1217 * Conditions for dynamic worker creation.
1218 *
1219 * @returns Whether to create a dynamic worker or not.
1220 */
1221 private shallCreateDynamicWorker (): boolean {
1222 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1223 }
1224
1225 /**
1226 * Sends a message to worker given its worker node key.
1227 *
1228 * @param workerNodeKey - The worker node key.
1229 * @param message - The message.
1230 * @param transferList - The optional array of transferable objects.
1231 */
1232 protected abstract sendToWorker (
1233 workerNodeKey: number,
1234 message: MessageValue<Data>,
1235 transferList?: TransferListItem[]
1236 ): void
1237
1238 /**
1239 * Creates a new worker.
1240 *
1241 * @returns Newly created worker.
1242 */
1243 protected abstract createWorker (): Worker
1244
1245 /**
1246 * Creates a new, completely set up worker node.
1247 *
1248 * @returns New, completely set up worker node key.
1249 */
1250 protected createAndSetupWorkerNode (): number {
1251 const worker = this.createWorker()
1252
1253 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1254 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1255 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1256 worker.on('error', error => {
1257 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1258 this.flagWorkerNodeAsNotReady(workerNodeKey)
1259 const workerInfo = this.getWorkerInfo(workerNodeKey)
1260 this.emitter?.emit(PoolEvents.error, error)
1261 this.workerNodes[workerNodeKey].closeChannel()
1262 if (
1263 this.started &&
1264 !this.starting &&
1265 !this.destroying &&
1266 this.opts.restartWorkerOnError === true
1267 ) {
1268 if (workerInfo.dynamic) {
1269 this.createAndSetupDynamicWorkerNode()
1270 } else {
1271 this.createAndSetupWorkerNode()
1272 }
1273 }
1274 if (this.started && this.opts.enableTasksQueue === true) {
1275 this.redistributeQueuedTasks(workerNodeKey)
1276 }
1277 })
1278 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1279 worker.once('exit', () => {
1280 this.removeWorkerNode(worker)
1281 })
1282
1283 const workerNodeKey = this.addWorkerNode(worker)
1284
1285 this.afterWorkerNodeSetup(workerNodeKey)
1286
1287 return workerNodeKey
1288 }
1289
1290 /**
1291 * Creates a new, completely set up dynamic worker node.
1292 *
1293 * @returns New, completely set up dynamic worker node key.
1294 */
1295 protected createAndSetupDynamicWorkerNode (): number {
1296 const workerNodeKey = this.createAndSetupWorkerNode()
1297 this.registerWorkerMessageListener(workerNodeKey, message => {
1298 this.checkMessageWorkerId(message)
1299 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1300 message.workerId
1301 )
1302 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1303 // Kill message received from worker
1304 if (
1305 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1306 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1307 ((this.opts.enableTasksQueue === false &&
1308 workerUsage.tasks.executing === 0) ||
1309 (this.opts.enableTasksQueue === true &&
1310 workerUsage.tasks.executing === 0 &&
1311 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1312 ) {
1313 // Flag the worker node as not ready immediately
1314 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1315 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1316 this.emitter?.emit(PoolEvents.error, error)
1317 })
1318 }
1319 })
1320 const workerInfo = this.getWorkerInfo(workerNodeKey)
1321 this.sendToWorker(workerNodeKey, {
1322 checkActive: true
1323 })
1324 if (this.taskFunctions.size > 0) {
1325 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1326 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1327 taskFunctionOperation: 'add',
1328 taskFunctionName,
1329 taskFunction: taskFunction.toString()
1330 }).catch(error => {
1331 this.emitter?.emit(PoolEvents.error, error)
1332 })
1333 }
1334 }
1335 workerInfo.dynamic = true
1336 if (
1337 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1338 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1339 ) {
1340 workerInfo.ready = true
1341 }
1342 this.checkAndEmitDynamicWorkerCreationEvents()
1343 return workerNodeKey
1344 }
1345
1346 /**
1347 * Registers a listener callback on the worker given its worker node key.
1348 *
1349 * @param workerNodeKey - The worker node key.
1350 * @param listener - The message listener callback.
1351 */
1352 protected abstract registerWorkerMessageListener<
1353 Message extends Data | Response
1354 >(
1355 workerNodeKey: number,
1356 listener: (message: MessageValue<Message>) => void
1357 ): void
1358
1359 /**
1360 * Registers once a listener callback on the worker given its worker node key.
1361 *
1362 * @param workerNodeKey - The worker node key.
1363 * @param listener - The message listener callback.
1364 */
1365 protected abstract registerOnceWorkerMessageListener<
1366 Message extends Data | Response
1367 >(
1368 workerNodeKey: number,
1369 listener: (message: MessageValue<Message>) => void
1370 ): void
1371
1372 /**
1373 * Deregisters a listener callback on the worker given its worker node key.
1374 *
1375 * @param workerNodeKey - The worker node key.
1376 * @param listener - The message listener callback.
1377 */
1378 protected abstract deregisterWorkerMessageListener<
1379 Message extends Data | Response
1380 >(
1381 workerNodeKey: number,
1382 listener: (message: MessageValue<Message>) => void
1383 ): void
1384
1385 /**
1386 * Method hooked up after a worker node has been newly created.
1387 * Can be overridden.
1388 *
1389 * @param workerNodeKey - The newly created worker node key.
1390 */
1391 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1392 // Listen to worker messages.
1393 this.registerWorkerMessageListener(
1394 workerNodeKey,
1395 this.workerMessageListener.bind(this)
1396 )
1397 // Send the startup message to worker.
1398 this.sendStartupMessageToWorker(workerNodeKey)
1399 // Send the statistics message to worker.
1400 this.sendStatisticsMessageToWorker(workerNodeKey)
1401 if (this.opts.enableTasksQueue === true) {
1402 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1403 this.workerNodes[workerNodeKey].addEventListener(
1404 'idleWorkerNode',
1405 this.handleIdleWorkerNodeEvent as EventListener
1406 )
1407 }
1408 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1409 this.workerNodes[workerNodeKey].addEventListener(
1410 'backPressure',
1411 this.handleBackPressureEvent as EventListener
1412 )
1413 }
1414 }
1415 }
1416
1417 /**
1418 * Sends the startup message to worker given its worker node key.
1419 *
1420 * @param workerNodeKey - The worker node key.
1421 */
1422 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1423
1424 /**
1425 * Sends the statistics message to worker given its worker node key.
1426 *
1427 * @param workerNodeKey - The worker node key.
1428 */
1429 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1430 this.sendToWorker(workerNodeKey, {
1431 statistics: {
1432 runTime:
1433 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1434 .runTime.aggregate,
1435 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1436 .elu.aggregate
1437 }
1438 })
1439 }
1440
1441 private redistributeQueuedTasks (workerNodeKey: number): void {
1442 while (this.tasksQueueSize(workerNodeKey) > 0) {
1443 const destinationWorkerNodeKey = this.workerNodes.reduce(
1444 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1445 return workerNode.info.ready &&
1446 workerNode.usage.tasks.queued <
1447 workerNodes[minWorkerNodeKey].usage.tasks.queued
1448 ? workerNodeKey
1449 : minWorkerNodeKey
1450 },
1451 0
1452 )
1453 const task = this.dequeueTask(workerNodeKey) as Task<Data>
1454 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1455 this.executeTask(destinationWorkerNodeKey, task)
1456 } else {
1457 this.enqueueTask(destinationWorkerNodeKey, task)
1458 }
1459 }
1460 }
1461
1462 private updateTaskStolenStatisticsWorkerUsage (
1463 workerNodeKey: number,
1464 taskName: string
1465 ): void {
1466 const workerNode = this.workerNodes[workerNodeKey]
1467 if (workerNode?.usage != null) {
1468 ++workerNode.usage.tasks.stolen
1469 }
1470 if (
1471 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1472 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1473 ) {
1474 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1475 taskName
1476 ) as WorkerUsage
1477 ++taskFunctionWorkerUsage.tasks.stolen
1478 }
1479 }
1480
1481 private readonly handleIdleWorkerNodeEvent = (
1482 event: CustomEvent<WorkerNodeEventDetail>
1483 ): void => {
1484 const { workerId } = event.detail
1485 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1486 const workerNodes = this.workerNodes
1487 .slice()
1488 .sort(
1489 (workerNodeA, workerNodeB) =>
1490 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1491 )
1492 const sourceWorkerNode = workerNodes.find(
1493 workerNode =>
1494 workerNode.info.ready &&
1495 workerNode.info.id !== workerId &&
1496 workerNode.usage.tasks.queued > 0
1497 )
1498 if (sourceWorkerNode != null) {
1499 const task = sourceWorkerNode.popTask() as Task<Data>
1500 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1501 this.executeTask(destinationWorkerNodeKey, task)
1502 } else {
1503 this.enqueueTask(destinationWorkerNodeKey, task)
1504 }
1505 this.updateTaskStolenStatisticsWorkerUsage(
1506 destinationWorkerNodeKey,
1507 task.name as string
1508 )
1509 }
1510 }
1511
1512 private readonly handleBackPressureEvent = (
1513 event: CustomEvent<WorkerNodeEventDetail>
1514 ): void => {
1515 const { workerId } = event.detail
1516 const sizeOffset = 1
1517 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1518 return
1519 }
1520 const sourceWorkerNode =
1521 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1522 const workerNodes = this.workerNodes
1523 .slice()
1524 .sort(
1525 (workerNodeA, workerNodeB) =>
1526 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1527 )
1528 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1529 if (
1530 sourceWorkerNode.usage.tasks.queued > 0 &&
1531 workerNode.info.ready &&
1532 workerNode.info.id !== workerId &&
1533 workerNode.usage.tasks.queued <
1534 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1535 ) {
1536 const task = sourceWorkerNode.popTask() as Task<Data>
1537 if (this.shallExecuteTask(workerNodeKey)) {
1538 this.executeTask(workerNodeKey, task)
1539 } else {
1540 this.enqueueTask(workerNodeKey, task)
1541 }
1542 this.updateTaskStolenStatisticsWorkerUsage(
1543 workerNodeKey,
1544 task.name as string
1545 )
1546 }
1547 }
1548 }
1549
1550 /**
1551 * This method is the message listener registered on each worker.
1552 */
1553 protected workerMessageListener (message: MessageValue<Response>): void {
1554 this.checkMessageWorkerId(message)
1555 if (message.ready != null && message.taskFunctionNames != null) {
1556 // Worker ready response received from worker
1557 this.handleWorkerReadyResponse(message)
1558 } else if (message.taskId != null) {
1559 // Task execution response received from worker
1560 this.handleTaskExecutionResponse(message)
1561 } else if (message.taskFunctionNames != null) {
1562 // Task function names message received from worker
1563 this.getWorkerInfo(
1564 this.getWorkerNodeKeyByWorkerId(message.workerId)
1565 ).taskFunctionNames = message.taskFunctionNames
1566 }
1567 }
1568
1569 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1570 if (message.ready === false) {
1571 throw new Error(
1572 `Worker ${message.workerId as number} failed to initialize`
1573 )
1574 }
1575 const workerInfo = this.getWorkerInfo(
1576 this.getWorkerNodeKeyByWorkerId(message.workerId)
1577 )
1578 workerInfo.ready = message.ready as boolean
1579 workerInfo.taskFunctionNames = message.taskFunctionNames
1580 if (!this.readyEventEmitted && this.ready) {
1581 this.readyEventEmitted = true
1582 this.emitter?.emit(PoolEvents.ready, this.info)
1583 }
1584 }
1585
1586 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1587 const { taskId, workerError, data } = message
1588 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1589 if (promiseResponse != null) {
1590 const { resolve, reject, workerNodeKey } = promiseResponse
1591 if (workerError != null) {
1592 this.emitter?.emit(PoolEvents.taskError, workerError)
1593 reject(workerError.message)
1594 } else {
1595 resolve(data as Response)
1596 }
1597 this.afterTaskExecutionHook(workerNodeKey, message)
1598 this.workerChoiceStrategyContext.update(workerNodeKey)
1599 this.promiseResponseMap.delete(taskId as string)
1600 if (
1601 this.opts.enableTasksQueue === true &&
1602 this.tasksQueueSize(workerNodeKey) > 0 &&
1603 this.workerNodes[workerNodeKey].usage.tasks.executing <
1604 (this.opts.tasksQueueOptions?.concurrency as number)
1605 ) {
1606 this.executeTask(
1607 workerNodeKey,
1608 this.dequeueTask(workerNodeKey) as Task<Data>
1609 )
1610 }
1611 }
1612 }
1613
1614 private checkAndEmitTaskExecutionEvents (): void {
1615 if (this.busy) {
1616 this.emitter?.emit(PoolEvents.busy, this.info)
1617 }
1618 }
1619
1620 private checkAndEmitTaskQueuingEvents (): void {
1621 if (this.hasBackPressure()) {
1622 this.emitter?.emit(PoolEvents.backPressure, this.info)
1623 }
1624 }
1625
1626 private checkAndEmitDynamicWorkerCreationEvents (): void {
1627 if (this.type === PoolTypes.dynamic) {
1628 if (this.full) {
1629 this.emitter?.emit(PoolEvents.full, this.info)
1630 }
1631 }
1632 }
1633
1634 /**
1635 * Gets the worker information given its worker node key.
1636 *
1637 * @param workerNodeKey - The worker node key.
1638 * @returns The worker information.
1639 */
1640 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1641 return this.workerNodes[workerNodeKey]?.info
1642 }
1643
1644 /**
1645 * Adds the given worker in the pool worker nodes.
1646 *
1647 * @param worker - The worker.
1648 * @returns The added worker node key.
1649 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1650 */
1651 private addWorkerNode (worker: Worker): number {
1652 const workerNode = new WorkerNode<Worker, Data>(
1653 worker,
1654 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1655 )
1656 // Flag the worker node as ready at pool startup.
1657 if (this.starting) {
1658 workerNode.info.ready = true
1659 }
1660 this.workerNodes.push(workerNode)
1661 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1662 if (workerNodeKey === -1) {
1663 throw new Error('Worker added not found in worker nodes')
1664 }
1665 return workerNodeKey
1666 }
1667
1668 /**
1669 * Removes the given worker from the pool worker nodes.
1670 *
1671 * @param worker - The worker.
1672 */
1673 private removeWorkerNode (worker: Worker): void {
1674 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1675 if (workerNodeKey !== -1) {
1676 this.workerNodes.splice(workerNodeKey, 1)
1677 this.workerChoiceStrategyContext.remove(workerNodeKey)
1678 }
1679 }
1680
1681 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1682 this.getWorkerInfo(workerNodeKey).ready = false
1683 }
1684
1685 /** @inheritDoc */
1686 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1687 return (
1688 this.opts.enableTasksQueue === true &&
1689 this.workerNodes[workerNodeKey].hasBackPressure()
1690 )
1691 }
1692
1693 private hasBackPressure (): boolean {
1694 return (
1695 this.opts.enableTasksQueue === true &&
1696 this.workerNodes.findIndex(
1697 workerNode => !workerNode.hasBackPressure()
1698 ) === -1
1699 )
1700 }
1701
1702 /**
1703 * Executes the given task on the worker given its worker node key.
1704 *
1705 * @param workerNodeKey - The worker node key.
1706 * @param task - The task to execute.
1707 */
1708 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1709 this.beforeTaskExecutionHook(workerNodeKey, task)
1710 this.sendToWorker(workerNodeKey, task, task.transferList)
1711 this.checkAndEmitTaskExecutionEvents()
1712 }
1713
1714 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1715 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1716 this.checkAndEmitTaskQueuingEvents()
1717 return tasksQueueSize
1718 }
1719
1720 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1721 return this.workerNodes[workerNodeKey].dequeueTask()
1722 }
1723
1724 private tasksQueueSize (workerNodeKey: number): number {
1725 return this.workerNodes[workerNodeKey].tasksQueueSize()
1726 }
1727
1728 protected flushTasksQueue (workerNodeKey: number): void {
1729 while (this.tasksQueueSize(workerNodeKey) > 0) {
1730 this.executeTask(
1731 workerNodeKey,
1732 this.dequeueTask(workerNodeKey) as Task<Data>
1733 )
1734 }
1735 this.workerNodes[workerNodeKey].clearTasksQueue()
1736 }
1737
1738 private flushTasksQueues (): void {
1739 for (const [workerNodeKey] of this.workerNodes.entries()) {
1740 this.flushTasksQueue(workerNodeKey)
1741 }
1742 }
1743 }