]> Piment Noir Git Repositories - poolifier.git/blob - src/pools/abstract-pool.ts
639a967e7defa0dd984cd206e40a4d02f4301609
[poolifier.git] / src / pools / abstract-pool.ts
1 import type { Transferable } from 'node:worker_threads'
2
3 import { AsyncResource } from 'node:async_hooks'
4 import { randomUUID } from 'node:crypto'
5 import { EventEmitterAsyncResource } from 'node:events'
6 import { performance } from 'node:perf_hooks'
7
8 import type {
9 MessageValue,
10 PromiseResponseWrapper,
11 Task,
12 TaskFunctionProperties,
13 WorkerError,
14 } from '../utility-types.js'
15 import type {
16 TaskFunction,
17 TaskFunctionObject,
18 } from '../worker/task-functions.js'
19 import type {
20 IWorker,
21 IWorkerNode,
22 WorkerInfo,
23 WorkerNodeEventDetail,
24 WorkerType,
25 } from './worker.js'
26
27 import { defaultBucketSize } from '../queues/queue-types.js'
28 import {
29 average,
30 buildTaskFunctionProperties,
31 DEFAULT_TASK_NAME,
32 EMPTY_FUNCTION,
33 exponentialDelay,
34 isKillBehavior,
35 isPlainObject,
36 max,
37 median,
38 min,
39 round,
40 sleep,
41 } from '../utils.js'
42 import { KillBehaviors } from '../worker/worker-options.js'
43 import {
44 type IPool,
45 PoolEvents,
46 type PoolInfo,
47 type PoolOptions,
48 type PoolType,
49 PoolTypes,
50 type TasksQueueOptions,
51 } from './pool.js'
52 import {
53 Measurements,
54 WorkerChoiceStrategies,
55 type WorkerChoiceStrategy,
56 type WorkerChoiceStrategyOptions,
57 } from './selection-strategies/selection-strategies-types.js'
58 import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
59 import {
60 checkFilePath,
61 checkValidPriority,
62 checkValidTasksQueueOptions,
63 checkValidWorkerChoiceStrategy,
64 getDefaultTasksQueueOptions,
65 updateEluWorkerUsage,
66 updateRunTimeWorkerUsage,
67 updateTaskStatisticsWorkerUsage,
68 updateWaitTimeWorkerUsage,
69 waitWorkerNodeEvents,
70 } from './utils.js'
71 import { version } from './version.js'
72 import { WorkerNode } from './worker-node.js'
73
74 /**
75 * Base class that implements some shared logic for all poolifier pools.
76 * @typeParam Worker - Type of worker which manages this pool.
77 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
78 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
79 */
80 export abstract class AbstractPool<
81 Worker extends IWorker,
82 Data = unknown,
83 Response = unknown
84 > implements IPool<Worker, Data, Response> {
85 /** @inheritDoc */
86 public emitter?: EventEmitterAsyncResource
87
88 /** @inheritDoc */
89 public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
90
91 /** @inheritDoc */
public get info (): PoolInfo {
  // Read the task statistics requirements once; undefined when no worker
  // choice strategies context exists, in which case no measurement is reported.
  const statisticsRequirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()

  // Counts the worker nodes whose key satisfies the given predicate.
  const countWorkerNodes = (
    predicate: (workerNodeKey: number) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, _, workerNodeKey) =>
        predicate(workerNodeKey) ? count + 1 : count,
      0
    )

  // Sums a numeric value picked from every worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Builds the pool-wide maximum/minimum (and optionally average/median) of
  // one measurement, aggregated over every worker node.
  const buildMeasurementStatistics = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => {
      history: { toArray: () => number[] }
      maximum?: number
      minimum?: number
    },
    withAverage: boolean,
    withMedian: boolean
  ) => {
    // Evaluated lazily and freshly per use: histories are only gathered when
    // the average or the median is actually required.
    const gatherHistory = (): number[] =>
      this.workerNodes.flatMap(workerNode =>
        pick(workerNode).history.toArray()
      )
    return {
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => pick(workerNode).maximum ?? Number.NEGATIVE_INFINITY
          )
        )
      ),
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => pick(workerNode).minimum ?? Number.POSITIVE_INFINITY
          )
        )
      ),
      ...(withAverage && { average: round(average(gatherHistory())) }),
      ...(withMedian && { median: round(median(gatherHistory())) }),
    }
  }

  // Event loop utilization of every worker node, defaulting to 0 when unset.
  const gatherEluUtilizations = (): number[] =>
    this.workerNodes.map(workerNode => workerNode.usage.elu.utilization ?? 0)

  return {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    defaultStrategy: this.opts.workerChoiceStrategy!,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    minSize: this.minimumNumberOfWorkers,
    ready: this.ready,
    started: this.started,
    strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
    type: this.type,
    version,
    worker: this.worker,
    // Utilization is derived from run time and wait time aggregates, so it
    // is only reported when both are collected.
    ...(statisticsRequirements?.runTime.aggregate === true &&
      statisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization),
    }),
    busyWorkerNodes: countWorkerNodes(workerNodeKey =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    idleWorkerNodes: countWorkerNodes(workerNodeKey =>
      this.isWorkerNodeIdle(workerNodeKey)
    ),
    workerNodes: this.workerNodes.length,
    ...(this.type === PoolTypes.dynamic && {
      dynamicWorkerNodes: this.workerNodes.reduce(
        (count, workerNode) => (workerNode.info.dynamic ? count + 1 : count),
        0
      ),
    }),
    ...(this.opts.enableTasksQueue === true && {
      backPressure: this.backPressure,
      backPressureWorkerNodes: countWorkerNodes(workerNodeKey =>
        this.isWorkerNodeBackPressured(workerNodeKey)
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      stealingWorkerNodes: countWorkerNodes(workerNodeKey =>
        this.isWorkerNodeStealing(workerNodeKey)
      ),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      ),
    }),
    ...(statisticsRequirements?.runTime.aggregate === true && {
      runTime: buildMeasurementStatistics(
        workerNode => workerNode.usage.runTime,
        statisticsRequirements.runTime.average,
        statisticsRequirements.runTime.median
      ),
    }),
    ...(statisticsRequirements?.waitTime.aggregate === true && {
      waitTime: buildMeasurementStatistics(
        workerNode => workerNode.usage.waitTime,
        statisticsRequirements.waitTime.average,
        statisticsRequirements.waitTime.median
      ),
    }),
    ...(statisticsRequirements?.elu.aggregate === true && {
      elu: {
        active: buildMeasurementStatistics(
          workerNode => workerNode.usage.elu.active,
          statisticsRequirements.elu.average,
          statisticsRequirements.elu.median
        ),
        idle: buildMeasurementStatistics(
          workerNode => workerNode.usage.elu.idle,
          statisticsRequirements.elu.average,
          statisticsRequirements.elu.median
        ),
        utilization: {
          average: round(average(gatherEluUtilizations())),
          median: round(median(gatherEluUtilizations())),
        },
      },
    }),
  }
}
392
393 /**
394 * The task execution response promise map:
395 * - `key`: The message id of each submitted task.
396 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
397 *
398 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
399 */
400 protected promiseResponseMap: Map<
401 `${string}-${string}-${string}-${string}-${string}`,
402 PromiseResponseWrapper<Response>
403 > = new Map<
404 `${string}-${string}-${string}-${string}-${string}`,
405 PromiseResponseWrapper<Response>
406 >()
407
408 /**
409 * Worker choice strategies context referencing worker choice algorithms implementation.
410 */
411 protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
412 Worker,
413 Data,
414 Response
415 >
416
417 /**
418 * Whether the pool is back pressured or not.
419 * @returns The pool back pressure boolean status.
420 */
421 protected abstract get backPressure (): boolean
422
423 /**
424 * Whether the pool is busy or not.
425 * @returns The pool busyness boolean status.
426 */
427 protected abstract get busy (): boolean
428
429 /**
430 * The pool type.
431 *
432 * If it is `'dynamic'`, it provides the `max` property.
433 */
434 protected abstract get type (): PoolType
435
436 /**
437 * The worker type.
438 */
439 protected abstract get worker (): WorkerType
440
441 /**
442 * Whether the pool back pressure event has been emitted or not.
443 */
444 private backPressureEventEmitted: boolean
445
446 /**
447 * Whether the pool busy event has been emitted or not.
448 */
449 private busyEventEmitted: boolean
450
451 /**
452 * Whether the pool is destroying or not.
453 */
454 private destroying: boolean
455
456 /**
457 * Whether the pool ready event has been emitted or not.
458 */
459 private readyEventEmitted: boolean
460
461 /**
462 * Whether the pool is started or not.
463 */
464 private started: boolean
465
466 /**
467 * Whether the pool is starting or not.
468 */
469 private starting: boolean
470
471 /**
472 * Whether the minimum number of workers is starting or not.
473 */
474 private startingMinimumNumberOfWorkers: boolean
475
476 /**
477 * The start timestamp of the pool.
478 */
479 private startTimestamp?: number
480
481 /**
482 * The task functions added at runtime map:
483 * - `key`: The task function name.
484 * - `value`: The task function object.
485 */
486 private readonly taskFunctions: Map<
487 string,
488 TaskFunctionObject<Data, Response>
489 >
490
/**
 * Whether the pool is ready or not.
 *
 * A pool that is not started is never ready. Otherwise it is ready once the
 * number of non-dynamic worker nodes flagged as ready reaches the minimum
 * number of workers.
 * @returns The pool readiness boolean status.
 */
private get ready (): boolean {
  if (!this.started) {
    return false
  }
  // Dynamic worker nodes do not count towards pool readiness.
  const readyStaticWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
509
/**
 * The approximate pool utilization.
 *
 * Computed as the cumulated tasks run time and wait time of all worker nodes
 * divided by the pool time capacity (elapsed time since the pool start
 * multiplied by the maximum, or else minimum, number of workers).
 * @returns The pool utilization, `0` if the pool has no start timestamp.
 */
private get utilization (): number {
  if (this.startTimestamp == null) {
    return 0
  }
  const elapsedTime = performance.now() - this.startTimestamp
  const workersCapacity =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity = elapsedTime * workersCapacity
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
533
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategies context, runs
 * the setup hook, initializes the pool state flags and, when the
 * `startWorkers` option is `true`, starts the pool.
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws Error if the pool is not created from the main thread/process.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation.
  this.checkPoolType()
  checkFilePath(filePath)
  this.checkMinimumNumberOfWorkers(minimumNumberOfWorkers)
  this.checkPoolOptions(opts)

  // Bind the methods that get passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.initEventEmitter()
  }

  this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
    Worker,
    Data,
    Response
  >(
    this,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    [opts.workerChoiceStrategy!],
    opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()

  // Pool lifecycle and event emission state.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  this.busyEventEmitted = false
  this.backPressureEventEmitted = false
  this.startingMinimumNumberOfWorkers = false

  if (opts.startWorkers === true) {
    this.start()
  }
}
590
591 /** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // Normalize a bare function into a task function object.
  const taskFunctionObject: TaskFunctionObject<Data, Response> =
    typeof fn === 'function' ? { taskFunction: fn } : fn
  if (typeof taskFunctionObject.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  checkValidPriority(taskFunctionObject.priority)
  checkValidWorkerChoiceStrategy(taskFunctionObject.strategy)
  // The task function is sent to the workers as its source code string.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunction: taskFunctionObject.taskFunction.toString(),
    taskFunctionOperation: 'add',
    taskFunctionProperties: buildTaskFunctionProperties(
      name,
      taskFunctionObject
    ),
  })
  this.taskFunctions.set(name, taskFunctionObject)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies()
  )
  this.workerNodes.forEach((_, workerNodeKey) => {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
  return opResult
}
624
625 /** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy a starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently.
  await Promise.all(
    this.workerNodes.map(async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    })
  )
  if (this.emitter != null) {
    this.emitter.emit(PoolEvents.destroy, this.info)
    this.emitter.emitDestroy()
    this.readyEventEmitted = false
  }
  // The pool has no start timestamp anymore once destroyed.
  delete this.startTimestamp
  this.destroying = false
  this.started = false
}
651
652 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the tasks queue off: stop tasks stealing and flush the queues.
  const disablingTasksQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingTasksQueue) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
665
666 /** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  abortSignal?: AbortSignal,
  transferList?: readonly Transferable[]
): Promise<Response> {
  // Pool state checks.
  if (!this.started) {
    throw new Error('Cannot execute a task on not started pool')
  }
  if (this.destroying) {
    throw new Error('Cannot execute a task on destroying pool')
  }
  // Arguments checks.
  if (name != null) {
    if (typeof name !== 'string') {
      throw new TypeError('name argument must be a string')
    }
    if (name.trim().length === 0) {
      throw new TypeError('name argument must not be an empty string')
    }
  }
  if (abortSignal != null && !(abortSignal instanceof AbortSignal)) {
    throw new TypeError('abortSignal argument must be an AbortSignal')
  }
  if (transferList != null && !Array.isArray(transferList)) {
    throw new TypeError('transferList argument must be an array')
  }
  return await this.internalExecute(data, name, abortSignal, transferList)
}
693
694 /** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // Look the name up among the task functions properties known to the pool.
  for (const taskFunctionProperties of this.listTaskFunctionsProperties()) {
    if (taskFunctionProperties.name === name) {
      return true
    }
  }
  return false
}
700
701 /** @inheritDoc */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  // Use the first worker node exposing a non-empty task functions properties list.
  const workerNodeWithProperties = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionsProperties) &&
      info.taskFunctionsProperties.length > 0
  )
  return workerNodeWithProperties?.info.taskFunctionsProperties ?? []
}
713
714 /** @inheritDoc */
public async mapExecute (
  data: Iterable<Data>,
  name?: string,
  abortSignals?: Iterable<AbortSignal>,
  transferList?: readonly Transferable[]
): Promise<Response[]> {
  // Pool state checks.
  if (!this.started) {
    throw new Error('Cannot execute task(s) on not started pool')
  }
  if (this.destroying) {
    throw new Error('Cannot execute task(s) on destroying pool')
  }
  // Arguments checks.
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (data == null) {
    throw new TypeError('data argument must be a defined iterable')
  }
  if (typeof data[Symbol.iterator] !== 'function') {
    throw new TypeError('data argument must be an iterable')
  }
  if (name != null && typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name != null && typeof name === 'string' && name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  const dataList: Data[] = Array.isArray(data) ? data : [...data]
  let abortSignalList: AbortSignal[] | undefined
  if (abortSignals != null) {
    if (typeof abortSignals[Symbol.iterator] !== 'function') {
      throw new TypeError('abortSignals argument must be an iterable')
    }
    // Materialize the iterable BEFORE validating its elements: a one-shot
    // iterable (e.g. a generator) can only be consumed once, so validating
    // first would leave nothing to collect afterwards.
    abortSignalList = Array.isArray(abortSignals)
      ? abortSignals
      : [...abortSignals]
    for (const abortSignal of abortSignalList) {
      if (!(abortSignal instanceof AbortSignal)) {
        throw new TypeError(
          'abortSignals argument must be an iterable of AbortSignal'
        )
      }
    }
    if (dataList.length !== abortSignalList.length) {
      throw new Error(
        'data and abortSignals arguments must have the same length'
      )
    }
  }
  if (transferList != null && !Array.isArray(transferList)) {
    throw new TypeError('transferList argument must be an array')
  }
  // One task per data element, paired with the abort signal at the same index.
  // Array.from() is used rather than map() so that holes in a sparse data
  // array still produce a task (with undefined data).
  return await Promise.all(
    Array.from(dataList, (taskData, index) =>
      this.internalExecute(
        taskData,
        name,
        abortSignalList?.[index],
        transferList
      )
    )
  )
}
779
780 /** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions registered on the pool side can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionProperties,
  })
  // Drop the usage collected for the removed task function on every worker node.
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
  this.taskFunctions.delete(name)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies()
  )
  this.workerNodes.forEach((_, workerNodeKey) => {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
  return opResult
}
806
807 /** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // The task function object is undefined when the name is not registered on the pool side.
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties,
  })
}
817
818 /** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  // Tasks queue disabled: make sure no stale tasks queue options remain.
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Always unset first, then set again only if the feature is enabled.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
844
845 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // A sync is pending when options were given but setting them returned false.
  let syncPending =
    workerChoiceStrategyOptions != null &&
    !this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const strategyChanged =
    workerChoiceStrategy !== this.opts.workerChoiceStrategy
  if (strategyChanged) {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
    syncPending = true
  }
  if (!syncPending) {
    return
  }
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  this.workerNodes.forEach((_, workerNodeKey) => {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
875
876 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: undefined | WorkerChoiceStrategyOptions
): boolean {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Nothing to apply.
  if (workerChoiceStrategyOptions == null) {
    return false
  }
  // Merge the given options over the current ones.
  const mergedOptions = {
    ...this.opts.workerChoiceStrategyOptions,
    ...workerChoiceStrategyOptions,
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategiesContext?.setOptions(mergedOptions)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies(),
    mergedOptions
  )
  this.workerNodes.forEach((_, workerNodeKey) => {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
  return true
}
900
/** @inheritDoc */
public start (): void {
  const { destroying, started, starting } = this
  if (started) {
    throw new Error('Cannot start an already started pool')
  }
  if (starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  this.startMinimumNumberOfWorkers()
  // The start timestamp is the time origin of the pool utilization.
  this.startTimestamp = performance.now()
  this.starting = false
  this.started = true
}
918
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the worker node usage and, when applicable, the task function
 * worker usage with the received message, then updates the worker choice
 * strategies if any usage has been updated.
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  let usageUpdated = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      usage,
      message
    )
    updateEluWorkerUsage(this.workerChoiceStrategiesContext, usage, message)
    usageUpdated = true
  }
  // Per task function usage, looked up by the executed task function name.
  const taskFunctionName = message.taskPerformance?.name
  const taskFunctionWorkerUsage =
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    taskFunctionName != null
      ? workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      : undefined
  if (taskFunctionWorkerUsage != null) {
    updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      message
    )
    usageUpdated = true
  }
  if (usageUpdated) {
    this.workerChoiceStrategiesContext?.update(workerNodeKey)
  }
}
974
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the worker message listener, sends the startup and statistics
 * messages to the worker and subscribes to the worker node events used for
 * tasks stealing (when enabled) and task abortion.
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  const { enableTasksQueue, tasksQueueOptions } = this.opts
  // Tasks stealing handlers only apply when the tasks queue is enabled.
  if (enableTasksQueue === true && tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (
    enableTasksQueue === true &&
    tasksQueueOptions?.tasksStealingOnBackPressure === true
  ) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
  workerNode.on('abortTask', this.abortTask)
}
1006
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the
 * worker node usage and, when applicable, of the task function worker usage.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    ++usage.tasks.executing
    updateWaitTimeWorkerUsage(this.workerChoiceStrategiesContext, usage, task)
  }
  // Per task function usage, looked up by the task name.
  const taskFunctionWorkerUsage = this.shallUpdateTaskFunctionWorkerUsage(
    workerNodeKey
  )
    ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(task.name!)
    : undefined
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
1046
1047 /**
1048 * Emits dynamic worker creation events.
1049 */
1050 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1051
1052 /**
1053 * Emits dynamic worker destruction events.
1054 */
1055 protected abstract checkAndEmitDynamicWorkerDestructionEvents (): void
1056
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, it listens to the kill messages
 * sent by the worker, asks the worker to check its activity, replays the
 * task functions added at runtime and flags the worker node as dynamic.
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // Resolve the key from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    // Kill message received from worker
    if (!isKillBehavior(KillBehaviors.HARD, message.kill)) {
      // A soft kill is only honored on an idle and not stealing worker node.
      const softKillAllowed =
        isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        this.isWorkerNodeIdle(localWorkerNodeKey) &&
        !this.isWorkerNodeStealing(localWorkerNodeKey)
      if (!softKillAllowed) {
        return
      }
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
  })
  // Replay the task functions added at runtime on the new worker.
  this.taskFunctions.forEach((taskFunctionObject, taskFunctionName) => {
    const taskFunctionProperties = buildTaskFunctionProperties(
      taskFunctionName,
      taskFunctionObject
    )
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunction: taskFunctionObject.taskFunction.toString(),
      taskFunctionOperation: 'add',
      taskFunctionProperties,
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  const dynamicWorkerReady =
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady
  if (dynamicWorkerReady === true) {
    dynamicWorkerNode.info.ready = true
  }
  this.initWorkerNodeUsage(dynamicWorkerNode)
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1111
/**
 * Creates a worker node, registers its event handlers and adds it to the pool.
 *
 * For the 'error' and 'exit' events, the handler taken from the pool options is
 * registered before the pool internal one-shot handler.
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()

  // Pool internal reaction to the first worker 'error' event
  const onWorkerNodeError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers(true)
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      const erroredWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
    // eslint-disable-next-line promise/no-promise-in-callback
    workerNode.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }

  // Pool internal reaction to the first worker 'exit' event
  const onWorkerNodeExit = (): void => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers(true)
    }
  }

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', onWorkerNodeError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onWorkerNodeExit)

  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1174
1175 /**
1176 * Deregisters a message listener callback from the worker identified by its worker node key. Abstract: the implementation is provided by the concrete pool.
1177 * @param workerNodeKey - The worker node key.
1178 * @param listener - The message listener callback to deregister.
1179 */
1180 protected abstract deregisterWorkerMessageListener<
1181 Message extends Data | Response
1182 >(
1183 workerNodeKey: number,
1184 listener: (message: MessageValue<Message>) => void
1185 ): void
1186
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed. The
 * method then waits for the flushed tasks 'taskFinished' events, bounded by the
 * tasks finished timeout, before sending the kill message and terminating.
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasksCount = this.flushTasksQueue(workerNodeKey)
  const workerNodeToDestroy = this.workerNodes[workerNodeKey]
  // Fall back to the default timeout when none is configured
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNodeToDestroy,
    'taskFinished',
    flushedTasksCount,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNodeToDestroy.terminate()
}
1207
/**
 * Flags the worker node as not ready given its worker node key.
 * Does nothing if no worker information is found for the key.
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return
  }
  info.ready = false
}
1214
/**
 * Flushes the tasks queue of the worker node given its worker node key:
 * every queued task is dequeued and executed, then the queue is cleared.
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    const queuedTask = this.dequeueTask(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, queuedTask!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1225
/**
 * Gets the worker information given its worker node key.
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): undefined | WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1234
/**
 * Whether the worker nodes are back pressured or not.
 * The pool is back pressured when every worker node is back pressured
 * (vacuously `true` when the pool has no worker node).
 * @returns Worker nodes back pressure boolean status.
 */
protected internalBackPressure (): boolean {
  return this.workerNodes.every((_, workerNodeKey) =>
    this.isWorkerNodeBackPressured(workerNodeKey)
  )
}
1250
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 * The pool is busy when every worker node is busy
 * (vacuously `true` when the pool has no worker node).
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every((_, workerNodeKey) =>
    this.isWorkerNodeBusy(workerNodeKey)
  )
}
1264
1265 /**
1266 * Returns whether the worker is the main worker or not. Abstract: the implementation is provided by the concrete pool.
1267 * @returns `true` if the worker is the main worker, `false` otherwise.
1268 */
1269 protected abstract isMain (): boolean
1270
1271 /**
1272 * Registers once a message listener callback on the worker identified by its worker node key. Abstract: the implementation is provided by the concrete pool.
1273 * @param workerNodeKey - The worker node key.
1274 * @param listener - The message listener callback to register once.
1275 */
1276 protected abstract registerOnceWorkerMessageListener<
1277 Message extends Data | Response
1278 >(
1279 workerNodeKey: number,
1280 listener: (message: MessageValue<Message>) => void
1281 ): void
1282
1283 /**
1284 * Registers a message listener callback on the worker identified by its worker node key. Used, for instance, to listen to kill messages sent by dynamic workers. Abstract: the implementation is provided by the concrete pool.
1285 * @param workerNodeKey - The worker node key.
1286 * @param listener - The message listener callback to register.
1287 */
1288 protected abstract registerWorkerMessageListener<
1289 Message extends Data | Response
1290 >(
1291 workerNodeKey: number,
1292 listener: (message: MessageValue<Message>) => void
1293 ): void
1294
1295 /**
1296 * Sends the startup message to the worker identified by its worker node key. Abstract: the implementation is provided by the concrete pool.
1297 * @param workerNodeKey - The worker node key.
1298 */
1299 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1300
1301 /**
1302 * Sends a message to the worker identified by its worker node key. Used to send tasks to execute as well as control messages (e.g. `checkActive`, task abortion). Abstract: the implementation is provided by the concrete pool.
1303 * @param workerNodeKey - The worker node key.
1304 * @param message - The message to send.
1305 * @param transferList - The optional array of transferable objects.
1306 */
1307 protected abstract sendToWorker (
1308 workerNodeKey: number,
1309 message: MessageValue<Data>,
1310 transferList?: readonly Transferable[]
1311 ): void
1312
1313 /**
1314 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1315 * Can be overridden. The default implementation does nothing.
1316 */
1317 protected setupHook (): void {
1318 /* Intentionally empty */
1319 }
1320
1321 /**
1322 * Conditions for dynamic worker creation. Checked when choosing a worker node for a task and after a task execution response has been handled. Abstract: the implementation is provided by the concrete pool.
1323 * @returns Whether to create a dynamic worker or not.
1324 */
1325 protected abstract shallCreateDynamicWorker (): boolean
1326
/**
 * This method is the message listener registered on each worker.
 *
 * It validates the message worker id and then dispatches on the message content:
 * worker ready response, task functions properties update or task execution response.
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { ready, taskFunctionsProperties, taskId, workerId } = message
  if (ready != null && taskFunctionsProperties != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskFunctionsProperties != null) {
    // Task function properties message received from worker
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
    if (senderWorkerInfo == null) {
      return
    }
    senderWorkerInfo.taskFunctionsProperties = taskFunctionsProperties
    this.sendStatisticsMessageToWorker(senderWorkerNodeKey)
    this.setTasksQueuePriority(senderWorkerNodeKey)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
1353
/**
 * Aborts the task described by the given worker node event detail.
 *
 * Nothing is done when the pool is not started or is being destroyed, when the
 * task has no registered promise response, when its abort signal is not
 * aborted or when the worker node is not ready. A still queued abortable task
 * is removed from the worker node tasks queue and its promise is rejected with
 * the abort error; otherwise an abort task operation is sent to the worker.
 * @param eventDetail - The worker node event detail carrying the task id and the worker id.
 */
private readonly abortTask = (eventDetail: WorkerNodeEventDetail): void => {
  if (!this.started || this.destroying) {
    return
  }
  const { taskId, workerId } = eventDetail
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { abortSignal, reject } = promiseResponse
  if (abortSignal?.aborted === false) {
    return
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerNode = this.workerNodes[workerNodeKey]
  if (!workerNode.info.ready) {
    return
  }
  if (this.opts.enableTasksQueue === true) {
    for (const task of workerNode.tasksQueue) {
      const { abortable, name } = task
      if (taskId === task.taskId && abortable === true) {
        // The abort error is built from the abort signal reason stored in the
        // promise response map: build it before the entry is deleted, otherwise
        // the reason is lost and a generic error is always used.
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        const abortError = this.getAbortError(name!, taskId!)
        workerNode.info.queuedTaskAbortion = true
        workerNode.deleteTask(task)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.promiseResponseMap.delete(taskId!)
        workerNode.info.queuedTaskAbortion = false
        reject(abortError)
        return
      }
    }
  }
  // The task is not queued (anymore): ask the worker to abort it
  this.sendToWorker(workerNodeKey, { taskId, taskOperation: 'abort' })
}
1390
/**
 * Appends the given worker node to the pool worker nodes.
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1405
/**
 * Builds the tasks queue options by layering, in increasing precedence, the
 * default tasks queue options, the pool current tasks queue options and the
 * given tasks queue options.
 * @param tasksQueueOptions - The tasks queue options taking precedence.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return {
    ...defaultTasksQueueOptions,
    ...this.opts.tasksQueueOptions,
    ...tasksQueueOptions,
  }
}
1417
/**
 * Whether task stealing is impossible: the pool has at most one worker node
 * or no task is queued.
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1421
/**
 * Emits the pool 'ready' event once, when the pool is ready and events are enabled.
 */
private checkAndEmitReadyEvent (): void {
  if (this.emitter == null || this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1428
/**
 * Emits the pool 'backPressureEnd' event when a back pressure event was
 * previously emitted and the pool is no longer back pressured.
 */
private checkAndEmitTaskDequeuingEvents (): void {
  if (
    this.emitter == null ||
    !this.backPressureEventEmitted ||
    this.backPressure
  ) {
    return
  }
  this.emitter.emit(PoolEvents.backPressureEnd, this.info)
  this.backPressureEventEmitted = false
}
1439
/**
 * Emits the pool 'busy' event when the pool is busy and the event has not
 * been emitted yet.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (this.emitter == null || this.busyEventEmitted || !this.busy) {
    return
  }
  this.emitter.emit(PoolEvents.busy, this.info)
  this.busyEventEmitted = true
}
1446
/**
 * Emits the pool 'busyEnd' event when a busy event was previously emitted
 * and the pool is no longer busy.
 */
private checkAndEmitTaskExecutionFinishedEvents (): void {
  if (this.emitter == null || !this.busyEventEmitted || this.busy) {
    return
  }
  this.emitter.emit(PoolEvents.busyEnd, this.info)
  this.busyEventEmitted = false
}
1453
/**
 * Emits the pool 'backPressure' event when the pool is back pressured and
 * the event has not been emitted yet.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (
    this.emitter == null ||
    this.backPressureEventEmitted ||
    !this.backPressure
  ) {
    return
  }
  this.emitter.emit(PoolEvents.backPressure, this.info)
  this.backPressureEventEmitted = true
}
1464
/**
 * Checks that the worker id carried by a message received from a worker is
 * defined and belongs to a worker node of this pool.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId.toString()}'`
    )
  }
}
1480
/**
 * Validates the minimum number of workers the pool is instantiated with.
 * @param minimumNumberOfWorkers - The minimum number of workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
1503
/**
 * Validates the given pool options and copies them, with their default
 * values applied, into the pool options.
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy and its options
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  // Tasks queue options are only validated and built when the queue is enabled
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
1529
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a fixed pool has a maximum number of workers specified.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
1537
/**
 * Validates the given worker choice strategy options, if any.
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights do not cover each worker node or the measurement is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: undefined | WorkerChoiceStrategyOptions
): void {
  // Nothing to validate without options
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const expectedWeightsCount =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (
    workerChoiceStrategyOptions.weights != null &&
    Object.keys(workerChoiceStrategyOptions.weights).length !==
      expectedWeightsCount
  ) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    workerChoiceStrategyOptions.measurement != null &&
    !Object.values(Measurements).includes(
      workerChoiceStrategyOptions.measurement
    )
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
1569
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the pool conditions require it;
 * it is chosen directly when the worker choice strategies policy says so,
 * otherwise the choice is delegated to the worker choice strategies context.
 * @param name - The task function name.
 * @returns The chosen worker node key.
 */
private chooseWorkerNode (name?: string): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(
    this.getTaskFunctionWorkerChoiceStrategy(name)
  )
}
1590
/**
 * Creates a worker node from the pool worker type, file path and options.
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  // Fall back to the default tasks queue size when none is configured
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      tasksQueueBackPressureSize,
      tasksQueueBucketSize: defaultBucketSize,
      tasksQueuePriority: this.getTasksQueuePriority(),
      workerOptions: this.opts.workerOptions,
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1617
/**
 * Dequeues a task from the worker node tasks queue, then checks whether
 * task dequeuing events shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, `undefined` if none was dequeued.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const dequeuedTask = this.workerNodes[workerNodeKey].dequeueTask()
  this.checkAndEmitTaskDequeuingEvents()
  return dequeuedTask
}
1623
/**
 * Enqueues a task in the worker node tasks queue, then checks whether
 * task queuing events shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const queueSizeAfterEnqueue =
    this.workerNodes[workerNodeKey].enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1629
/**
 * Executes the given task on the worker given its worker node key:
 * runs the before task execution hook, sends the task to the worker and
 * checks whether task execution events shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  const transferables = task.transferList
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task, transferables)
  this.checkAndEmitTaskExecutionEvents()
}
1641
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1647
/**
 * Builds the error used to reject an aborted task.
 *
 * The abort signal reason registered for the task is used when it is an error
 * or a string, a generic abort error is built otherwise.
 * @param taskName - The task function name.
 * @param taskId - The task id.
 * @returns The abort error.
 */
private readonly getAbortError = (
  taskName: string,
  taskId: `${string}-${string}-${string}-${string}-${string}`
): Error => {
  const reason = this.promiseResponseMap.get(taskId)?.abortSignal?.reason as
    | Error
    | string
  if (reason instanceof Error) {
    return reason
  }
  if (typeof reason === 'string') {
    return new Error(reason)
  }
  return new Error(`Task '${taskName}' id '${taskId}' aborted`)
}
1660
/**
 * Gets task function worker choice strategy, if any.
 *
 * The default task name is resolved to the name of the second entry of the
 * pool task functions properties list.
 * @param name - The task function name.
 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
 */
private readonly getTaskFunctionWorkerChoiceStrategy = (
  name?: string
): undefined | WorkerChoiceStrategy => {
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  const resolvedName =
    name == null || name === DEFAULT_TASK_NAME
      ? taskFunctionsProperties[1]?.name
      : name
  const matchingProperties = taskFunctionsProperties.find(
    (taskFunctionProperties: TaskFunctionProperties) =>
      taskFunctionProperties.name === resolvedName
  )
  return matchingProperties?.strategy
}
1679
/**
 * Whether at least one pool task function defines a priority.
 * @returns `true` if a task function priority is defined, `false` otherwise.
 */
private getTasksQueuePriority (): boolean {
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  return taskFunctionsProperties.some(({ priority }) => priority != null)
}
1685
/**
 * Gets the worker choice strategies registered in this pool: the pool
 * worker choice strategy followed by the task functions defined ones.
 * @returns The worker choice strategies.
 */
private readonly getWorkerChoiceStrategies =
  (): Set<WorkerChoiceStrategy> => {
    const workerChoiceStrategies = new Set<WorkerChoiceStrategy>([
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.workerChoiceStrategy!,
    ])
    for (const { strategy } of this.listTaskFunctionsProperties()) {
      if (strategy != null) {
        workerChoiceStrategies.add(strategy)
      }
    }
    return workerChoiceStrategies
  }
1705
/**
 * Looks up the key of the worker node whose worker has the given id.
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
1716
/**
 * Gets worker node task function priority, if any.
 *
 * The default task name is resolved to the name of the second entry of the
 * worker node task functions properties list.
 * @param workerNodeKey - The worker node key.
 * @param name - The task function name.
 * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
 */
private readonly getWorkerNodeTaskFunctionPriority = (
  workerNodeKey: number,
  name?: string
): number | undefined => {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return undefined
  }
  const resolvedName =
    name == null || name === DEFAULT_TASK_NAME
      ? workerInfo.taskFunctionsProperties?.[1]?.name
      : name
  const matchingProperties = workerInfo.taskFunctionsProperties?.find(
    (taskFunctionProperties: TaskFunctionProperties) =>
      taskFunctionProperties.name === resolvedName
  )
  return matchingProperties?.priority
}
1740
/**
 * Gets worker node task function worker choice strategy, if any.
 *
 * The default task name is resolved to the name of the second entry of the
 * worker node task functions properties list.
 * @param workerNodeKey - The worker node key.
 * @param name - The task function name.
 * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
 */
private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
  workerNodeKey: number,
  name?: string
): undefined | WorkerChoiceStrategy => {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return undefined
  }
  const resolvedName =
    name == null || name === DEFAULT_TASK_NAME
      ? workerInfo.taskFunctionsProperties?.[1]?.name
      : name
  const matchingProperties = workerInfo.taskFunctionsProperties?.find(
    (taskFunctionProperties: TaskFunctionProperties) =>
      taskFunctionProperties.name === resolvedName
  )
  return matchingProperties?.strategy
}
1764
/**
 * Executes the task right away when the worker node shall execute it,
 * enqueues it in the worker node tasks queue otherwise.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1772
/**
 * Handles a task execution response received from a worker.
 *
 * The task promise is settled, inside its async resource scope when there is
 * one, and the after task execution hook is run. The follow-up work is
 * deferred to a microtask: events emission, promise response cleanup, next
 * queued task execution and dynamic worker creation.
 * Messages whose task id has no registered promise response are ignored.
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { data, taskId, workerError } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { asyncResource, reject, resolve, workerNodeKey } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const error = this.handleWorkerError(taskId!, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, error)
    } else {
      reject(error)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  queueMicrotask(() => {
    this.checkAndEmitTaskExecutionFinishedEvents()
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.emit('taskFinished', taskId)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.delete(taskId!)
    const tasksQueueActive =
      this.opts.enableTasksQueue === true && !this.destroying
    if (tasksQueueActive) {
      // Feed the worker node with its next queued task, if any
      if (
        !this.isWorkerNodeBusy(workerNodeKey) &&
        this.tasksQueueSize(workerNodeKey) > 0
      ) {
        const nextTask = this.dequeueTask(workerNodeKey)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.executeTask(workerNodeKey, nextTask!)
      }
      if (this.isWorkerNodeIdle(workerNodeKey)) {
        workerNode.emit('idle', {
          workerNodeKey,
        })
      }
    }
    if (this.shallCreateDynamicWorker()) {
      this.createAndSetupDynamicWorkerNode()
    }
  })
}
1820
/**
 * Converts a worker error into the error used to reject the task promise.
 *
 * An aborted task yields the abort error, an error instance carried by the
 * worker error is returned as is, otherwise a new error is built from the
 * worker error message and stack.
 * @param taskId - The task id.
 * @param workerError - The worker error.
 * @returns The error to reject the task promise with.
 */
private readonly handleWorkerError = (
  taskId: `${string}-${string}-${string}-${string}-${string}`,
  workerError: WorkerError
): Error => {
  if (workerError.aborted) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return this.getAbortError(workerError.name!, taskId)
  }
  if (workerError.error != null) {
    return workerError.error
  }
  const rebuiltError = new Error(workerError.message)
  rebuiltError.stack = workerError.stack
  return rebuiltError
}
1837
/**
 * Handles a worker node back pressure event by letting the least loaded
 * worker nodes steal tasks from the back pressured worker node.
 *
 * Nothing is done when no task can be stolen, when the whole pool is back
 * pressured, when the stealing ratio is reached, when the tasks queue size is
 * too small or when the source worker node cannot be found.
 * @param eventDetail - The worker node event detail carrying the back pressured worker id.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    this.backPressure ||
    this.isStealingRatioReached()
  ) {
    return
  }
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const tasksQueueSize = this.opts.tasksQueueOptions!.size!
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const { workerId } = eventDetail
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The worker id may not match any worker node (key -1)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (sourceWorkerNode == null) {
    return
  }
  // Sort [key, worker node] pairs so that each worker node keeps its pool
  // worker node key: the position in a sorted copy is not a valid key.
  const stealingCandidates = Array.from(this.workerNodes.entries()).sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of stealingCandidates) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      break
    }
    if (
      workerNode.info.id !== workerId &&
      !workerNode.info.backPressureStealing &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      workerNode.info.backPressureStealing = true
      this.stealTask(sourceWorkerNode, workerNodeKey)
      workerNode.info.backPressureStealing = false
    }
  }
}
1879
/**
 * Handles a worker node idle event by making the idle worker node steal tasks.
 *
 * The handler re-schedules itself after an exponential delay computed from the
 * number of sequentially stolen tasks. Continuous stealing stops, and the
 * sequentially stolen statistics are reset, once the worker node is no longer idle.
 * @param eventDetail - The worker node event detail carrying the worker node key.
 * @param previousStolenTask - The task stolen during the previous run, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const idleWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (idleWorkerNode == null) {
    return
  }
  // Do not start stealing when it is impossible or when the ratio is reached
  if (
    !idleWorkerNode.info.continuousStealing &&
    (this.cannotStealTask() || this.isStealingRatioReached())
  ) {
    return
  }
  const tasksUsage = idleWorkerNode.usage.tasks
  // Stop the ongoing continuous stealing once the worker node has work again
  if (
    idleWorkerNode.info.continuousStealing &&
    !this.isWorkerNodeIdle(workerNodeKey)
  ) {
    idleWorkerNode.info.continuousStealing = false
    if (tasksUsage.sequentiallyStolen > 0) {
      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
        workerNodeKey,
        previousStolenTask?.name
      )
    }
    return
  }
  idleWorkerNode.info.continuousStealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
    workerNodeKey,
    stolenTask?.name,
    previousStolenTask?.name
  )
  const nextRunDelay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(nextRunDelay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1931
/**
 * Handles a worker ready response: stores the ready status and the task
 * functions properties in the worker node information, sends the statistics
 * message to the worker, sets the tasks queue priority and checks whether the
 * pool ready event shall be emitted.
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionsProperties, workerId } = message
  if (!(ready ?? false)) {
    // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
    throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info } = this.workerNodes[readyWorkerNodeKey]
  info.ready = ready as boolean
  info.taskFunctionsProperties = taskFunctionsProperties
  this.sendStatisticsMessageToWorker(readyWorkerNodeKey)
  this.setTasksQueuePriority(readyWorkerNodeKey)
  this.checkAndEmitReadyEvent()
}
1946
/**
 * Initializes the pool event emitter, named after the pool type and worker type.
 */
private initEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
1952
/**
 * Initializes the worker node usage with sensible default values gathered during runtime.
 *
 * For each measurement whose aggregate is required by the worker choice
 * strategies (run time, wait time, ELU), the worker node aggregate is set to
 * the minimum aggregate among the pool worker nodes.
 * @param workerNode - The worker node.
 */
private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
  // Whether the worker choice strategies require the given measurement aggregate
  const isAggregateRequired = (
    measurement: 'elu' | 'runTime' | 'waitTime'
  ): boolean =>
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()[
      measurement
    ].aggregate === true
  // Minimum aggregate among the pool worker nodes, undefined aggregates count as infinite
  const minimumAggregate = (
    selectAggregate: (node: IWorkerNode<Worker, Data>) => number | undefined
  ): number =>
    min(
      ...this.workerNodes.map(
        node => selectAggregate(node) ?? Number.POSITIVE_INFINITY
      )
    )
  if (isAggregateRequired('runTime')) {
    workerNode.usage.runTime.aggregate = minimumAggregate(
      node => node.usage.runTime.aggregate
    )
  }
  if (isAggregateRequired('waitTime')) {
    workerNode.usage.waitTime.aggregate = minimumAggregate(
      node => node.usage.waitTime.aggregate
    )
  }
  if (isAggregateRequired('elu')) {
    workerNode.usage.elu.active.aggregate = minimumAggregate(
      node => node.usage.elu.active.aggregate
    )
  }
}
1992
/**
 * Executes a task on a chosen worker node.
 *
 * The task is built and its promise response registered; when an abort signal
 * is given, its 'abort' event is forwarded to the worker node as an
 * 'abortTask' event. The task is then executed right away or enqueued.
 * @param data - The optional task input data, an empty object is used when omitted.
 * @param name - The optional task function name, the default task name is used when omitted.
 * @param abortSignal - The optional abort signal making the task abortable.
 * @param transferList - The optional array of transferable objects.
 * @returns Promise settled with the task execution response or error.
 */
private async internalExecute (
  data?: Data,
  name?: string,
  abortSignal?: AbortSignal,
  transferList?: readonly Transferable[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode(name)
    const task: Task<Data> = {
      abortable: abortSignal != null,
      data: data ?? ({} as Data),
      name: name ?? DEFAULT_TASK_NAME,
      priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
      strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
        workerNodeKey,
        name
      ),
      taskId: randomUUID(),
      timestamp,
      transferList,
    }
    // Forward the abort signal to the worker node the task was assigned to
    const forwardAbort = (): void => {
      this.workerNodes[workerNodeKey]?.emit('abortTask', {
        taskId: task.taskId,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        workerId: this.getWorkerInfo(workerNodeKey)!.id!,
      })
    }
    abortSignal?.addEventListener('abort', forwardAbort, { once: true })
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      reject,
      resolve,
      workerNodeKey,
      // The async resource only exists when events are enabled
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          requireManualDestroy: true,
          triggerAsyncId: this.emitter.asyncId,
        }),
      }),
      abortSignal,
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
2050
/**
 * Whether the tasks stealing ratio is reached.
 * @returns `true` if the configured tasks stealing ratio is `0`, or if the number of stealing worker nodes exceeds the rounded up product of the worker nodes count and the ratio, `false` otherwise.
 */
private readonly isStealingRatioReached = (): boolean => {
  if (this.opts.tasksQueueOptions?.tasksStealingRatio === 0) {
    return true
  }
  const stealingWorkerNodes = this.info.stealingWorkerNodes ?? 0
  const maxStealingWorkerNodes = Math.ceil(
    this.workerNodes.length *
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.tasksStealingRatio!
  )
  return stealingWorkerNodes > maxStealingWorkerNodes
}
2062
/**
 * Whether the worker node is ready and flagged as back pressured.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and back pressured, `false` otherwise.
 */
private isWorkerNodeBackPressured (workerNodeKey: number): boolean {
  const { info } = this.workerNodes[workerNodeKey]
  return info.ready && info.backPressure
}
2067
/**
 * Whether the worker node is ready and busy.
 * With the tasks queue enabled, busy means the number of executing tasks has
 * reached the configured concurrency; otherwise it means at least one task
 * is executing.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { info, usage } = this.workerNodes[workerNodeKey]
  if (!info.ready) {
    return false
  }
  if (this.opts.enableTasksQueue === true) {
    return (
      usage.tasks.executing >=
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
    )
  }
  return usage.tasks.executing > 0
}
2080
/**
 * Whether the worker node is ready and idle.
 * Idle means no executing task and, with the tasks queue enabled, an empty
 * tasks queue as well.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and idle, `false` otherwise.
 */
private isWorkerNodeIdle (workerNodeKey: number): boolean {
  const { info, usage } = this.workerNodes[workerNodeKey]
  if (!info.ready || usage.tasks.executing !== 0) {
    return false
  }
  if (this.opts.enableTasksQueue === true) {
    return this.tasksQueueSize(workerNodeKey) === 0
  }
  return true
}
2092
/**
 * Whether the worker node is ready and flagged as stealing, either
 * continuously or on back pressure.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and stealing, `false` otherwise.
 */
private isWorkerNodeStealing (workerNodeKey: number): boolean {
  const { info } = this.workerNodes[workerNodeKey]
  if (!info.ready) {
    return false
  }
  return info.continuousStealing || info.backPressureStealing
}
2101
/**
 * Moves every queued task of the source worker node to the other ready
 * worker nodes, each time picking the one with the fewest queued tasks.
 *
 * The source worker node is never selected as destination: handing a task
 * back to the node being drained would either loop forever (the task gets
 * enqueued again) or run it on the very node the tasks are moved away from.
 * @param sourceWorkerNodeKey - The key of the worker node to drain. Nothing is done when it is `-1` or when task stealing is not possible.
 */
private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
  if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
      if (workerNodeKey === sourceWorkerNodeKey || !workerNode.info.ready) {
        continue
      }
      // Strict comparison keeps the lowest key on ties.
      if (
        destinationWorkerNodeKey === -1 ||
        workerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = workerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node can take the remaining queued tasks.
      break
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(sourceWorkerNodeKey)!
    )
  }
}
2125
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategies context. Nothing is done if the worker node is not part
 * of the pool.
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategiesContext?.remove(workerNodeKey)
  if (workerNode.info.dynamic) {
    this.checkAndEmitDynamicWorkerDestructionEvents()
  }
}
2139
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node
 * and, when applicable, of the given task function worker usage.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The optional task function name.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName?: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.sequentiallyStolen = 0
  }
  if (
    taskName == null ||
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
2160
/**
 * Sends the kill message to the worker given its worker node key and waits
 * for its kill status message. Resolves immediately when there is no worker
 * node at the given key.
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on a 'success' kill status and rejected on a 'failure' one.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              `Kill message handling failed on worker ${message.workerId?.toString()}`
            )
          )
          break
        default:
          // Not a kill status message: ignored.
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
2186
/**
 * Sends the statistics message to worker given its worker node key.
 * The `elu` and `runTime` flags mirror the `aggregate` task statistics
 * requirements of the worker choice strategies context and default to
 * `false` when there is no such context.
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const elu =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  const runTime =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
      .runTime.aggregate ?? false
  this.sendToWorker(workerNodeKey, { statistics: { elu, runTime } })
}
2203
/**
 * Sends a task function operation message to the worker given its worker
 * node key and waits for the matching operation status response.
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports a successful operation and rejected with an error otherwise.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Only an operation status coming from the targeted worker is handled.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
            `Task function operation '${response.taskFunctionOperation?.toString()}' failed on worker ${response.workerId?.toString()} with error: '${
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              response.workerError?.message
            }'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
2244
/**
 * Sends a task function operation message to every worker of the pool and
 * waits until as many operation status responses as worker nodes have been
 * received.
 *
 * The listener is registered on every worker node, so it is deregistered
 * from every worker node once the operation is settled; deregistering it
 * only from the last responding worker would leak it on all the others.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports a successful operation (vacuously so when the pool has no worker node) and rejected with an error when at least one worker reports a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    if (this.workerNodes.length === 0) {
      // No worker to notify: settle now instead of waiting forever for
      // responses that cannot come.
      resolve(true)
      return
    }
    const responsesReceived: MessageValue<Response>[] = []
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
            }' failed on worker ${errorResponse.workerId?.toString()} with error: '${
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              errorResponse.workerError?.message
            }'`
          )
        )
      }
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const workerNodeKey of this.workerNodes.keys()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
2299
/**
 * Applies the pool tasks queue priority setting to the worker node given
 * its worker node key.
 * @param workerNodeKey - The worker node key.
 */
private setTasksQueuePriority (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.setTasksQueuePriority(this.getTasksQueuePriority())
}
2305
/**
 * Sets the tasks queue back pressure size of every worker node.
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
2311
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
2320
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
2326
/**
 * Whether a task shall be executed right away on the worker node instead
 * of being enqueued.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its number of executing tasks is below the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
2335
/**
 * Whether the worker node shall update its task function worker usage or not.
 * It shall when its worker info lists more than two task functions properties.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
2349
/**
 * Starts the minimum number of workers.
 * Worker nodes are created until the number of non dynamic worker nodes
 * reaches the minimum number of workers.
 * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
 */
private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
  if (this.minimumNumberOfWorkers === 0) {
    return
  }
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.startingMinimumNumberOfWorkers = true
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    const workerNodeKey = this.createAndSetupWorkerNode()
    if (initWorkerNodeUsage) {
      this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
    }
  }
  this.startingMinimumNumberOfWorkers = false
}
2372
/**
 * Steals the last prioritized queued task of the source worker node and
 * hands it to the destination worker node.
 * @param sourceWorkerNode - The worker node to steal a task from.
 * @param destinationWorkerNodeKey - The key of the worker node receiving the stolen task.
 * @returns The stolen task, or `undefined` when there is no destination worker node, when either worker node is not ready or is already involved in a stealing or queued task abortion operation, or when the source worker node has no task to dequeue.
 */
private readonly stealTask = (
  sourceWorkerNode: IWorkerNode<Worker, Data>,
  destinationWorkerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (destinationWorkerNode == null) {
    return
  }
  // Avoid cross and cascade task stealing. Could be smarter by checking stealing/stolen worker ids pair.
  if (
    !sourceWorkerNode.info.ready ||
    sourceWorkerNode.info.stolen ||
    sourceWorkerNode.info.stealing ||
    sourceWorkerNode.info.queuedTaskAbortion ||
    !destinationWorkerNode.info.ready ||
    destinationWorkerNode.info.stolen ||
    destinationWorkerNode.info.stealing ||
    destinationWorkerNode.info.queuedTaskAbortion
  ) {
    return
  }
  destinationWorkerNode.info.stealing = true
  sourceWorkerNode.info.stolen = true
  let stolenTask: Task<Data> | undefined
  try {
    stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()
  } finally {
    // Always clear the flags, otherwise a throwing dequeue would leave both
    // worker nodes permanently excluded from task stealing.
    sourceWorkerNode.info.stolen = false
    destinationWorkerNode.info.stealing = false
  }
  if (stolenTask == null) {
    // The source tasks queue was empty: nothing to hand over.
    return
  }
  this.handleTask(destinationWorkerNodeKey, stolenTask)
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    stolenTask.name!
  )
  return stolenTask
}
2409
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
2413
/**
 * Deregisters the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
2422
/**
 * Deregisters the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
2431
/**
 * Updates the sequentially stolen tasks counters after a task stealing.
 * The worker node counter is incremented. The task function counter is
 * incremented when it is zero or when the previous stolen task has the same
 * name, and reset to zero otherwise. Nothing is done without a task name.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The optional stolen task function name.
 * @param previousTaskName - The optional previously stolen task function name.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName?: string,
  previousTaskName?: string
): void {
  if (taskName == null) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.sequentiallyStolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage == null) {
    return
  }
  const { tasks } = taskFunctionWorkerUsage
  const sameTaskAsPrevious =
    previousTaskName != null && previousTaskName === taskName
  if (
    tasks.sequentiallyStolen === 0 ||
    (sameTaskAsPrevious && tasks.sequentiallyStolen > 0)
  ) {
    ++tasks.sequentiallyStolen
  } else if (tasks.sequentiallyStolen > 0) {
    tasks.sequentiallyStolen = 0
  }
}
2462
/**
 * Increments the stolen tasks counter of the worker node and, when
 * applicable, of the given task function worker usage.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task function name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
2480
/**
 * Makes the worker node given its worker node key steal a task from the
 * other worker node having the most queued tasks.
 *
 * Candidates are scanned by their real worker node key. Searching a sorted
 * copy by index would compare positions in that copy with `workerNodeKey`,
 * excluding the wrong worker node and possibly selecting the stealing
 * worker node as its own source.
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` when no other worker node has queued tasks or when the stealing did not happen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  let sourceWorkerNode: IWorkerNode<Worker, Data> | undefined
  let sourceQueuedTasks = 0
  for (const [candidateWorkerNodeKey, candidateWorkerNode] of this.workerNodes.entries()) {
    if (candidateWorkerNodeKey === workerNodeKey) {
      continue
    }
    const queuedTasks = candidateWorkerNode.usage.tasks.queued
    // Strict comparison keeps the lowest key among the most loaded nodes.
    if (queuedTasks > sourceQueuedTasks) {
      sourceWorkerNode = candidateWorkerNode
      sourceQueuedTasks = queuedTasks
    }
  }
  if (sourceWorkerNode != null) {
    return this.stealTask(sourceWorkerNode, workerNodeKey)
  }
}
2499 }