chore(deps-dev): bump tatami-ng to 0.6.0
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import type { TransferListItem } from 'node:worker_threads'
2
3import { AsyncResource } from 'node:async_hooks'
4import { randomUUID } from 'node:crypto'
5import { EventEmitterAsyncResource } from 'node:events'
6import { performance } from 'node:perf_hooks'
7
8import type {
9 MessageValue,
10 PromiseResponseWrapper,
11 Task,
12 TaskFunctionProperties,
13} from '../utility-types.js'
14import type {
15 TaskFunction,
16 TaskFunctionObject,
17} from '../worker/task-functions.js'
18import type {
19 IWorker,
20 IWorkerNode,
21 WorkerInfo,
22 WorkerNodeEventDetail,
23 WorkerType,
24} from './worker.js'
25
26import { defaultBucketSize } from '../queues/queue-types.js'
27import {
28 average,
29 buildTaskFunctionProperties,
30 DEFAULT_TASK_NAME,
31 EMPTY_FUNCTION,
32 exponentialDelay,
33 isKillBehavior,
34 isPlainObject,
35 max,
36 median,
37 min,
38 round,
39 sleep,
40} from '../utils.js'
41import { KillBehaviors } from '../worker/worker-options.js'
42import {
43 type IPool,
44 PoolEvents,
45 type PoolInfo,
46 type PoolOptions,
47 type PoolType,
48 PoolTypes,
49 type TasksQueueOptions,
50} from './pool.js'
51import {
52 Measurements,
53 WorkerChoiceStrategies,
54 type WorkerChoiceStrategy,
55 type WorkerChoiceStrategyOptions,
56} from './selection-strategies/selection-strategies-types.js'
57import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
58import {
59 checkFilePath,
60 checkValidPriority,
61 checkValidTasksQueueOptions,
62 checkValidWorkerChoiceStrategy,
63 getDefaultTasksQueueOptions,
64 updateEluWorkerUsage,
65 updateRunTimeWorkerUsage,
66 updateTaskStatisticsWorkerUsage,
67 updateWaitTimeWorkerUsage,
68 waitWorkerNodeEvents,
69} from './utils.js'
70import { version } from './version.js'
71import { WorkerNode } from './worker-node.js'
72
73/**
74 * Base class that implements some shared logic for all poolifier pools.
75 * @typeParam Worker - Type of worker which manages this pool.
76 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
77 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
78 */
79export abstract class AbstractPool<
80 Worker extends IWorker,
81 Data = unknown,
82 Response = unknown
83> implements IPool<Worker, Data, Response> {
/**
 * Pending task execution responses, indexed by task message id:
 * - `key`: The message id of a submitted task.
 * - `value`: The wrapper holding the task's worker node key, the execution response promise resolve and reject callbacks and the async resource.
 *
 * The entry matching the message id is looked up when a worker answers, in order to settle the associated promise.
 */
protected promiseResponseMap: Map<
  `${string}-${string}-${string}-${string}-${string}`,
  PromiseResponseWrapper<Response>
> = new Map()
98
99 /**
100 * Worker choice strategies context referencing worker choice algorithms implementation.
101 */
102 protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
103 Worker,
104 Data,
105 Response
106 >
107
/**
 * Message listener registered on each worker. Validates the sender then dispatches
 * the message to the matching handler.
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { ready, taskFunctionsProperties, taskId, workerId } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
      this.sendStatisticsMessageToWorker(workerNodeKey)
      this.setTasksQueuePriority(workerNodeKey)
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
134
135 /**
136 * Whether the pool back pressure event has been emitted or not.
137 */
138 private backPressureEventEmitted: boolean
139
140 /**
141 * Whether the pool busy event has been emitted or not.
142 */
143 private busyEventEmitted: boolean
144
145 /**
146 * Whether the pool is destroying or not.
147 */
148 private destroying: boolean
149
/**
 * Looks up the worker choice strategy attached to a task function, if any.
 * @param name - The task function name. When omitted or equal to the default task name, the name of the second listed task function is used instead.
 * @returns The task function worker choice strategy if it is defined, `undefined` otherwise.
 */
private readonly getTaskFunctionWorkerChoiceStrategy = (
  name?: string
): undefined | WorkerChoiceStrategy => {
  const requestedName = name ?? DEFAULT_TASK_NAME
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  // NOTE(review): index 1 presumably holds the task function the default name resolves to - confirm
  const resolvedName =
    requestedName === DEFAULT_TASK_NAME
      ? taskFunctionsProperties[1]?.name
      : requestedName
  for (const taskFunctionProperties of taskFunctionsProperties) {
    if (taskFunctionProperties.name === resolvedName) {
      return taskFunctionProperties.strategy
    }
  }
  return undefined
}
168
/**
 * Collects the distinct worker choice strategies used by this pool: the pool
 * level one plus every strategy defined on a listed task function.
 * @returns The worker choice strategies.
 */
private readonly getWorkerChoiceStrategies =
  (): Set<WorkerChoiceStrategy> => {
    const strategies = new Set<WorkerChoiceStrategy>([
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.workerChoiceStrategy!,
    ])
    for (const { strategy } of this.listTaskFunctionsProperties()) {
      if (strategy != null) {
        strategies.add(strategy)
      }
    }
    return strategies
  }
188
/**
 * Looks up the priority of a task function as known by a given worker node, if any.
 * @param workerNodeKey - The worker node key.
 * @param name - The task function name. When omitted or equal to the default task name, the name of the worker's second listed task function is used instead.
 * @returns The worker node task function priority if it is defined, `undefined` otherwise.
 */
private readonly getWorkerNodeTaskFunctionPriority = (
  workerNodeKey: number,
  name?: string
): number | undefined => {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  if (taskFunctionsProperties == null) {
    // Unknown worker node or no task function properties received yet
    return undefined
  }
  const resolvedName =
    name == null || name === DEFAULT_TASK_NAME
      ? taskFunctionsProperties[1]?.name
      : name
  const matchingProperties = taskFunctionsProperties.find(
    (taskFunctionProperties: TaskFunctionProperties) =>
      taskFunctionProperties.name === resolvedName
  )
  return matchingProperties?.priority
}
212
/**
 * Looks up the worker choice strategy of a task function as known by a given worker node, if any.
 * @param workerNodeKey - The worker node key.
 * @param name - The task function name. When omitted or equal to the default task name, the name of the worker's second listed task function is used instead.
 * @returns The worker node task function worker choice strategy if it is defined, `undefined` otherwise.
 */
private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
  workerNodeKey: number,
  name?: string
): undefined | WorkerChoiceStrategy => {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  if (taskFunctionsProperties == null) {
    // Unknown worker node or no task function properties received yet
    return undefined
  }
  const resolvedName =
    name == null || name === DEFAULT_TASK_NAME
      ? taskFunctionsProperties[1]?.name
      : name
  const matchingProperties = taskFunctionsProperties.find(
    (taskFunctionProperties: TaskFunctionProperties) =>
      taskFunctionProperties.name === resolvedName
  )
  return matchingProperties?.strategy
}
236
/**
 * Handles a worker node 'backPressure' event: tasks queued on the worker node that
 * emitted the event are stolen by the other worker nodes, least queued first, until
 * the source queue is empty or no eligible worker node remains.
 *
 * Nothing is done when task stealing is not possible, when the pool itself is back
 * pressured, when the stealing ratio is reached, when the tasks queue size is too
 * small or when the emitting worker node is unknown.
 * @param eventDetail - The worker node event detail. Its `workerId` identifies the back pressured worker node.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    this.backPressure ||
    this.isStealingRatioReached()
  ) {
    return
  }
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const { workerId } = eventDetail
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The worker id lookup yields -1 for an unknown worker, hence no worker node
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (sourceWorkerNode == null) {
    return
  }
  // Sorted copy: least queued worker nodes are served first
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      break
    }
    if (
      workerNode.info.id !== workerId &&
      !workerNode.info.backPressureStealing &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key in the pool worker nodes so the task goes to the checked worker node
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      workerNode.info.backPressureStealing = true
      this.stealTask(sourceWorkerNode, workerNodeKey)
      workerNode.info.backPressureStealing = false
    }
  }
}
278
/**
 * Handles a worker node 'idle' event: the idle worker node steals a task from
 * another worker node, then re-runs itself after a delay for as long as the worker
 * node stays idle (continuous stealing).
 * @param eventDetail - The worker node event detail. Its `workerNodeKey` identifies the idle worker node.
 * @param previousStolenTask - The task stolen during the previous run, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode == null) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (workerNode.info.continuousStealing) {
    if (!this.isWorkerNodeIdle(workerNodeKey)) {
      // The worker node got work to do: stop the continuous stealing
      workerNode.info.continuousStealing = false
      if (workerNodeTasksUsage.sequentiallyStolen > 0) {
        this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
          workerNodeKey,
          previousStolenTask?.name
        )
      }
      return
    }
  } else if (this.cannotStealTask() || this.isStealingRatioReached()) {
    // Continuous stealing is not started when stealing is not allowed
    return
  }
  workerNode.info.continuousStealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
    workerNodeKey,
    stolenTask?.name,
    previousStolenTask?.name
  )
  // Next run is scheduled with a delay derived from the sequentially stolen tasks count
  const delay = exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)
  sleep(delay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
330
/**
 * Whether no more worker node is allowed to steal tasks.
 * @returns `true` if the tasks stealing ratio is zero or if the number of stealing worker nodes is greater than the number of worker nodes times the tasks stealing ratio (rounded up), `false` otherwise.
 */
private readonly isStealingRatioReached = (): boolean => {
  if (this.opts.tasksQueueOptions?.tasksStealingRatio === 0) {
    return true
  }
  const stealingWorkerNodes = this.info.stealingWorkerNodes ?? 0
  const maxStealingWorkerNodes = Math.ceil(
    this.workerNodes.length *
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.tasksStealingRatio!
  )
  return stealingWorkerNodes > maxStealingWorkerNodes
}
342
343 /**
344 * Whether the pool ready event has been emitted or not.
345 */
346 private readyEventEmitted: boolean
347
348 /**
349 * Whether the pool is started or not.
350 */
351 private started: boolean
352
353 /**
354 * Whether the pool is starting or not.
355 */
356 private starting: boolean
357
358 /**
359 * Whether the minimum number of workers is starting or not.
360 */
361 private startingMinimumNumberOfWorkers: boolean
362
363 /**
364 * The start timestamp of the pool.
365 */
366 private startTimestamp?: number
367
/**
 * Moves the last prioritized queued task of a source worker node to a destination
 * worker node and updates the destination stolen tasks statistics.
 * @param sourceWorkerNode - The worker node the task is stolen from.
 * @param destinationWorkerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, or `undefined` if the destination worker node does not exist, if one of the two worker nodes is not ready or is already involved in a steal, or if the source worker node had no task to dequeue.
 */
private readonly stealTask = (
  sourceWorkerNode: IWorkerNode<Worker, Data>,
  destinationWorkerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (destinationWorkerNode == null) {
    return
  }
  // Avoid cross and cascade task stealing. Could be smarter by checking stealing/stolen worker ids pair.
  if (
    !sourceWorkerNode.info.ready ||
    sourceWorkerNode.info.stolen ||
    sourceWorkerNode.info.stealing ||
    !destinationWorkerNode.info.ready ||
    destinationWorkerNode.info.stolen ||
    destinationWorkerNode.info.stealing
  ) {
    return
  }
  destinationWorkerNode.info.stealing = true
  sourceWorkerNode.info.stolen = true
  const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()
  sourceWorkerNode.info.stolen = false
  destinationWorkerNode.info.stealing = false
  // Nothing was dequeued: do not hand an undefined task over to the destination
  if (stolenTask == null) {
    return
  }
  this.handleTask(destinationWorkerNodeKey, stolenTask)
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    stolenTask.name!
  )
  return stolenTask
}
402
403 /**
404 * The task functions added at runtime map:
405 * - `key`: The task function name.
406 * - `value`: The task function object.
407 */
408 private readonly taskFunctions: Map<
409 string,
410 TaskFunctionObject<Data, Response>
411 >
412
/**
 * Makes a worker node steal a task from the most queued other worker node.
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, or `undefined` if no other worker node has queued tasks or if the steal did not happen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sorted copy: most queued worker nodes come first
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Positions in the sorted copy are not worker node keys: the stealing worker
  // node is excluded by identity
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode !== destinationWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    return this.stealTask(sourceWorkerNode, workerNodeKey)
  }
  return undefined
}
431
432 /** @inheritDoc */
433 public emitter?: EventEmitterAsyncResource
434
435 /** @inheritDoc */
436 public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
437
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and the
 * worker choice strategies context are created, the setup hook is run and the pool
 * is started when the `startWorkers` option is `true`.
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Arguments validation
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.initEventEmitter()
  }
  this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
    Worker,
    Data,
    Response
  >(
    this,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    [this.opts.workerChoiceStrategy!],
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()

  // Lifecycle flags
  this.starting = false
  this.startingMinimumNumberOfWorkers = false
  this.started = false
  this.destroying = false
  // Emitted events flags
  this.readyEventEmitted = false
  this.busyEventEmitted = false
  this.backPressureEventEmitted = false

  if (this.opts.startWorkers === true) {
    this.start()
  }
}
494
/**
 * Hook executed after the worker task execution.
 * Updates the worker node usage and, when applicable, the task function worker
 * usage, then refreshes the worker choice strategies if anything was updated.
 * Can be overridden.
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let strategiesUpdateNeeded = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      message
    )
    strategiesUpdateNeeded = true
  }
  const taskFunctionName = message.taskPerformance?.name
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    taskFunctionName != null &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      taskFunctionName
    ) != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionWorkerUsage =
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      )!
    updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      message
    )
    strategiesUpdateNeeded = true
  }
  if (strategiesUpdateNeeded) {
    this.workerChoiceStrategiesContext?.update(workerNodeKey)
  }
}
550
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the message listener, sends the startup and statistics messages and,
 * when the tasks queue is enabled, subscribes to the task stealing events.
 * Can be overridden.
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
581
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time on the worker
 * node usage and, when applicable, on the task function worker usage.
 * Can be overridden.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      task
    )
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
      null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionWorkerUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!)!
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
621
622 /**
623 * Emits dynamic worker creation events.
624 */
625 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
626
627 /**
628 * Emits dynamic worker destruction events.
629 */
630 protected abstract checkAndEmitDynamicWorkerDestructionEvents (): void
631
/**
 * Creates a new, completely set up dynamic worker node.
 * On top of the regular setup, a kill message listener is registered, the worker
 * activity check is requested and the task functions added at runtime are sent to
 * the worker.
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    // Kill message received from worker: a soft kill is only honored when the
    // worker node is idle and not stealing
    const killRequested =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        this.isWorkerNodeIdle(senderWorkerNodeKey) &&
        !this.isWorkerNodeStealing(senderWorkerNodeKey))
    if (!killRequested) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
    this.destroyWorkerNode(senderWorkerNodeKey).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
  })
  // Replay the task functions added at runtime on the new worker
  for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunction: taskFunctionObject.taskFunction.toString(),
      taskFunctionOperation: 'add',
      taskFunctionProperties: buildTaskFunctionProperties(
        taskFunctionName,
        taskFunctionObject
      ),
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  if (
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
      true ||
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
  ) {
    dynamicWorkerNode.info.ready = true
  }
  this.initWorkerNodeUsage(dynamicWorkerNode)
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
688
/**
 * Creates a new, completely set up worker node.
 * The user provided event handlers are registered first, followed by the pool
 * internal 'error' and 'exit' handlers, then the worker node is added to the pool.
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Evaluated each time it is needed since the pool state may change
  const isPoolOperational = (): boolean => this.started && !this.destroying

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failing worker
    if (isPoolOperational() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers(true)
      }
    }
    // Hand the failing worker queued tasks over
    if (isPoolOperational() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition, promise/no-promise-in-callback
    workerNode?.terminate().catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers(true)
    }
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
751
752 /**
753 * Deregisters a listener callback on the worker given its worker node key.
754 * @param workerNodeKey - The worker node key.
755 * @param listener - The message listener callback.
756 */
757 protected abstract deregisterWorkerMessageListener<
758 Message extends Data | Response
759 >(
760 workerNodeKey: number,
761 listener: (message: MessageValue<Message>) => void
762 ): void
763
/**
 * Terminates the worker node given its worker node key.
 * The worker node is flagged as not ready, its tasks queue is flushed and the
 * flushed tasks are waited for (up to the tasks finished timeout) before the kill
 * message is sent and the worker node is terminated.
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
784
/**
 * Flags the worker node as not ready given its worker node key.
 * Does nothing if no worker information is found for the key.
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
791
/**
 * Executes every task queued on the worker node given its worker node key, then clears its tasks queue.
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
802
/**
 * Gets the worker information given its worker node key.
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): undefined | WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
811
/**
 * Whether the worker nodes are back pressured or not.
 * @returns `true` if every worker node is back pressured (also the case when there is no worker node), `false` otherwise.
 */
protected internalBackPressure (): boolean {
  const backPressuredWorkerNodes = this.workerNodes.filter(
    (_, workerNodeKey) => this.isWorkerNodeBackPressured(workerNodeKey)
  )
  return backPressuredWorkerNodes.length === this.workerNodes.length
}
827
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 * @returns `true` if every worker node is busy (also the case when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  const busyWorkerNodes = this.workerNodes.filter((_, workerNodeKey) =>
    this.isWorkerNodeBusy(workerNodeKey)
  )
  return busyWorkerNodes.length === this.workerNodes.length
}
841
842 /**
843 * Returns whether the worker is the main worker or not.
844 * @returns `true` if the worker is the main worker, `false` otherwise.
845 */
846 protected abstract isMain (): boolean
847
848 /**
849 * Registers once a listener callback on the worker given its worker node key.
850 * @param workerNodeKey - The worker node key.
851 * @param listener - The message listener callback.
852 */
853 protected abstract registerOnceWorkerMessageListener<
854 Message extends Data | Response
855 >(
856 workerNodeKey: number,
857 listener: (message: MessageValue<Message>) => void
858 ): void
859
860 /**
861 * Registers a listener callback on the worker given its worker node key.
862 * @param workerNodeKey - The worker node key.
863 * @param listener - The message listener callback.
864 */
865 protected abstract registerWorkerMessageListener<
866 Message extends Data | Response
867 >(
868 workerNodeKey: number,
869 listener: (message: MessageValue<Message>) => void
870 ): void
871
872 /**
873 * Sends the startup message to worker given its worker node key.
874 * @param workerNodeKey - The worker node key.
875 */
876 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
877
878 /**
879 * Sends a message to worker given its worker node key.
880 * @param workerNodeKey - The worker node key.
881 * @param message - The message.
882 * @param transferList - The optional array of transferable objects.
883 */
884 protected abstract sendToWorker (
885 workerNodeKey: number,
886 message: MessageValue<Data>,
887 transferList?: readonly TransferListItem[]
888 ): void
889
890 /**
891 * Setup hook to execute code before worker nodes are created in the abstract constructor.
892 * Can be overridden.
893 */
894 protected setupHook (): void {
895 /* Intentionally empty */
896 }
897
898 /**
899 * Conditions for dynamic worker creation.
900 * @returns Whether to create a dynamic worker or not.
901 */
902 protected abstract shallCreateDynamicWorker (): boolean
903
/**
 * Appends the given worker node to the pool worker nodes.
 * @param workerNode - The worker node.
 * @returns The added worker node key, i.e. its index in the pool worker nodes.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
918
/**
 * Builds the effective tasks queue options by layering, from lowest to highest
 * precedence: the defaults, the current pool tasks queue options and the given ones.
 * @param tasksQueueOptions - The tasks queue options taking precedence, if any.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return {
    ...defaultTasksQueueOptions,
    ...this.opts.tasksQueueOptions,
    ...tasksQueueOptions,
  }
}
930
/**
 * Whether task stealing is impossible.
 * @returns `true` if the pool has at most one worker node or no queued task, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
934
/**
 * Emits the pool ready event if events are enabled, the pool is ready and the event has not been emitted yet.
 */
private checkAndEmitReadyEvent (): void {
  if (this.emitter == null || this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
941
/**
 * Emits the pool back pressure end event if events are enabled, the back pressure
 * event was emitted and the pool is not back pressured anymore.
 */
private checkAndEmitTaskDequeuingEvents (): void {
  if (
    this.emitter == null ||
    !this.backPressureEventEmitted ||
    this.backPressure
  ) {
    return
  }
  this.emitter.emit(PoolEvents.backPressureEnd, this.info)
  this.backPressureEventEmitted = false
}
952
/**
 * Emits the pool busy event if events are enabled, the pool is busy and the event has not been emitted yet.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (this.emitter == null || this.busyEventEmitted || !this.busy) {
    return
  }
  this.emitter.emit(PoolEvents.busy, this.info)
  this.busyEventEmitted = true
}
959
/**
 * Emits the pool busy end event if events are enabled, the busy event was emitted and the pool is not busy anymore.
 */
private checkAndEmitTaskExecutionFinishedEvents (): void {
  if (this.emitter == null || !this.busyEventEmitted || this.busy) {
    return
  }
  this.emitter.emit(PoolEvents.busyEnd, this.info)
  this.busyEventEmitted = false
}
966
/**
 * Emits the pool back pressure event if events are enabled, the pool is back
 * pressured and the event has not been emitted yet.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (
    this.emitter == null ||
    this.backPressureEventEmitted ||
    !this.backPressure
  ) {
    return
  }
  this.emitter.emit(PoolEvents.backPressure, this.info)
  this.backPressureEventEmitted = true
}
977
/**
 * Checks that the worker id carried by a message received from a worker is
 * defined and matches a worker node of this pool.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId.toString()}'`
    )
  }
}
992
/**
 * Checks that the minimum number of workers is a valid value for this pool type.
 * @param minimumNumberOfWorkers - The minimum number of workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
1012
/**
 * Validates the given pool options and stores them, with default values applied, in `this.opts`.
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy: validated first, then defaulted to round robin.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  const { workerChoiceStrategyOptions } = opts
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
1038
/**
 * Ensures that a fixed pool is not instantiated with a maximum number of workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
1046
/**
 * Validates the given worker choice strategy options. Undefined options are accepted as is.
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights does not match the number of workers or if the measurement is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: undefined | WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  // One weight is expected per worker node: the maximum number of workers if defined, the minimum otherwise.
  const expectedWeightsCount =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (
    workerChoiceStrategyOptions.weights != null &&
    Object.keys(workerChoiceStrategyOptions.weights).length !==
      expectedWeightsCount
  ) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const { measurement } = workerChoiceStrategyOptions
  if (measurement != null && !Object.values(Measurements).includes(measurement)) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
1078
/**
 * Selects the worker node that will handle the next task.
 * A dynamic worker node is created first when `shallCreateDynamicWorker()` allows it.
 * @param name - The task function name.
 * @returns The chosen worker node key.
 */
private chooseWorkerNode (name?: string): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
    // The strategy policy decides whether the freshly created dynamic worker is used right away.
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(
    this.getTaskFunctionWorkerChoiceStrategy(name)
  )
}
1099
/**
 * Instantiates a worker node from the pool worker type, file path and options.
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  // Fall back to the default tasks queue size when none is configured.
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      tasksQueueBackPressureSize,
      tasksQueueBucketSize: defaultBucketSize,
      tasksQueuePriority: this.getTasksQueuePriority(),
      workerOptions: this.opts.workerOptions,
    }
  )
  // Worker nodes created while the pool is starting are flagged as ready immediately.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1126
/**
 * Dequeues a task from the tasks queue of the given worker node, then checks
 * whether a task dequeuing event shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, as returned by the worker node.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const dequeuedTask = this.workerNodes[workerNodeKey].dequeueTask()
  this.checkAndEmitTaskDequeuingEvents()
  return dequeuedTask
}
1132
/**
 * Enqueues a task in the tasks queue of the given worker node, then checks
 * whether a task queuing event shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const queueSizeAfterEnqueue = this.workerNodes[workerNodeKey].enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1138
/**
 * Runs the given task on the worker identified by its worker node key:
 * invokes the before task execution hook, sends the task to the worker,
 * then checks whether a task execution event shall be emitted.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task transfer list is forwarded alongside the task itself.
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1149
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  // The length is re-read at each iteration, like the array keys iterator does.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1155
/**
 * Tells whether at least one listed task function defines a priority.
 * @returns `true` if any task function properties have a priority, `false` otherwise.
 */
private getTasksQueuePriority (): boolean {
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  return taskFunctionsProperties.some(({ priority }) => priority != null)
}
1161
/**
 * Looks up the worker node key matching the given worker id.
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
1172
/**
 * Executes the task on the given worker node if it can run right away, enqueues it otherwise.
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1180
/**
 * Handles a task execution response received from a worker: settles the promise
 * registered for the task, runs the after task execution hook, and, when the
 * tasks queue is enabled, executes the next queued task on the same worker node.
 * Responses whose task id has no registered promise are ignored.
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { data, taskId, workerError } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { asyncResource, reject, resolve, workerNodeKey } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  // Settle the task promise, within the task async resource scope when there is one.
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.checkAndEmitTaskExecutionFinishedEvents()
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  if (this.opts.enableTasksQueue === true && !this.destroying) {
    const hasQueuedTaskToRun =
      !this.isWorkerNodeBusy(workerNodeKey) &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTaskToRun) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
    }
    // Notify 'idle' listeners once the worker node has nothing left to run.
    if (this.isWorkerNodeIdle(workerNodeKey)) {
      workerNode.emit('idle', {
        workerNodeKey,
      })
    }
  }
  // FIXME: cannot be theoretically undefined. Schedule in the next tick to avoid race conditions?
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
}
1226
/**
 * Handles the ready response received from a worker: flags the worker node as ready,
 * stores its task functions properties, sends it the statistics message,
 * sets its tasks queue priority and checks whether the pool ready event shall be emitted.
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionsProperties, workerId } = message
  if (ready == null || !ready) {
    // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
    throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info } = this.workerNodes[readyWorkerNodeKey]
  info.ready = ready
  info.taskFunctionsProperties = taskFunctionsProperties
  this.sendStatisticsMessageToWorker(readyWorkerNodeKey)
  this.setTasksQueuePriority(readyWorkerNodeKey)
  this.checkAndEmitReadyEvent()
}
1241
/**
 * Creates the pool event emitter, an async resource named after the pool type and worker type.
 */
private initEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
1247
/**
 * Initializes the usage of the given worker node from the values gathered at runtime on the pool worker nodes.
 * For each statistic whose aggregate is required by the worker choice strategies (run time, wait time, ELU),
 * the worker node aggregate is set to the minimum aggregate found among the pool worker nodes,
 * undefined aggregates being counted as positive infinity.
 * @param workerNode - The worker node.
 */
private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
  const { elu, runTime, waitTime } = workerNode.usage
  if (
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
      .runTime.aggregate === true
  ) {
    const runTimeAggregates = this.workerNodes.map(
      ({ usage }) => usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
    )
    runTime.aggregate = min(...runTimeAggregates)
  }
  if (
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
      .waitTime.aggregate === true
  ) {
    const waitTimeAggregates = this.workerNodes.map(
      ({ usage }) => usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
    )
    waitTime.aggregate = min(...waitTimeAggregates)
  }
  if (
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
      .aggregate === true
  ) {
    const eluActiveAggregates = this.workerNodes.map(
      ({ usage }) => usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
    )
    elu.active.aggregate = min(...eluActiveAggregates)
  }
}
1287
/**
 * Builds a task from the given arguments, registers the promise settlement callbacks
 * under the task id, then executes the task on the chosen worker node or enqueues it.
 * @param data - The task input data. Defaults to an empty object.
 * @param name - The task function name. Defaults to `DEFAULT_TASK_NAME`.
 * @param transferList - The transfer list forwarded with the task.
 * @returns A promise settled when the task execution response is handled.
 */
private async internalExecute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode(name)
    const taskId = randomUUID()
    const task: Task<Data> = {
      data: data ?? ({} as Data),
      name: name ?? DEFAULT_TASK_NAME,
      priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
      strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
        workerNodeKey,
        name
      ),
      taskId,
      timestamp,
      transferList,
    }
    // An async resource is only attached to the task when the pool has an event emitter.
    const asyncResource =
      this.emitter != null
        ? new AsyncResource('poolifier:task', {
          requireManualDestroy: true,
          triggerAsyncId: this.emitter.asyncId,
        })
        : undefined
    this.promiseResponseMap.set(taskId, {
      reject,
      resolve,
      workerNodeKey,
      ...(asyncResource != null && { asyncResource }),
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
1331
/**
 * Tells whether the given worker node is ready and flagged as back pressured.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and back pressured, `false` otherwise.
 */
private isWorkerNodeBackPressured (workerNodeKey: number): boolean {
  const { info } = this.workerNodes[workerNodeKey]
  return info.ready && info.backPressure
}
1336
/**
 * Tells whether the given worker node is busy.
 * With the tasks queue enabled, a ready worker node is busy when its number of executing tasks
 * reaches the tasks queue concurrency; otherwise it is busy as soon as it executes a task.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { info, usage } = this.workerNodes[workerNodeKey]
  return this.opts.enableTasksQueue === true
    ? info.ready &&
        usage.tasks.executing >=
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
    : info.ready && usage.tasks.executing > 0
}
1349
/**
 * Tells whether the given worker node is idle.
 * A ready worker node is idle when it executes no task and, with the tasks queue enabled,
 * when its tasks queue is also empty.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and idle, `false` otherwise.
 */
private isWorkerNodeIdle (workerNodeKey: number): boolean {
  const { info, usage } = this.workerNodes[workerNodeKey]
  return this.opts.enableTasksQueue === true
    ? info.ready &&
        usage.tasks.executing === 0 &&
        this.tasksQueueSize(workerNodeKey) === 0
    : info.ready && usage.tasks.executing === 0
}
1361
/**
 * Tells whether the given worker node is ready and flagged as stealing tasks,
 * either continuously or on back pressure.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is ready and stealing, `false` otherwise.
 */
private isWorkerNodeStealing (workerNodeKey: number): boolean {
  const { info } = this.workerNodes[workerNodeKey]
  return info.ready && (info.continuousStealing || info.backPressureStealing)
}
1370
/**
 * Moves every task queued on the source worker node to the other worker nodes.
 * Each task is handed to the ready worker node, other than the source, having the fewest queued tasks.
 * Does nothing if the source worker node key is `-1`, if `cannotStealTask()` returns `true`
 * or if the pool has no other worker node.
 * @param sourceWorkerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
  if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  // Without another worker node there is no valid destination.
  if (this.workerNodes.length < 2) {
    return
  }
  // The initial candidate must never be the source worker node: otherwise, when the source
  // is the worker node 0 and no other ready worker node has fewer queued tasks, tasks would be
  // handed back to the source itself and the loop below could never drain its queue.
  const initialDestinationWorkerNodeKey = sourceWorkerNodeKey === 0 ? 1 : 0
  while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
    // Recomputed for each task since queue sizes change as tasks are moved.
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
        return sourceWorkerNodeKey !== workerNodeKey &&
          workerNode.info.ready &&
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? workerNodeKey
          : minWorkerNodeKey
      },
      initialDestinationWorkerNodeKey
    )
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(sourceWorkerNodeKey)!
    )
  }
}
1394
/**
 * Removes the given worker node from the pool worker nodes and from the worker choice strategies context.
 * Does nothing if the worker node does not belong to the pool.
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategiesContext?.remove(removedWorkerNodeKey)
  // Only the removal of a dynamic worker node triggers the dynamic worker destruction events check.
  if (workerNode.info.dynamic) {
    this.checkAndEmitDynamicWorkerDestructionEvents()
  }
}
1408
/**
 * Resets to zero the sequentially stolen tasks counter of the given worker node and,
 * when a task name is given and task function usage is tracked, the one of that task function.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName?: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.sequentiallyStolen = 0
  }
  if (
    taskName == null ||
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1429
/**
 * Sends the kill message to the worker of the given worker node and waits for its kill response.
 * Resolves immediately if there is no worker node at the given key.
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on kill success and rejected on kill failure.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const onKillResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      // Messages without a kill status are left unhandled.
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              `Kill message handling failed on worker ${response.workerId?.toString()}`
            )
          )
          break
        default:
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1455
/**
 * Sends to the worker of the given worker node the statistics message telling
 * whether the ELU and run time aggregates are required by the worker choice strategies.
 * Both flags default to `false` when there is no worker choice strategies context.
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const elu =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  const runTime =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
      .runTime.aggregate ?? false
  this.sendToWorker(workerNodeKey, { statistics: { elu, runTime } })
}
1472
/**
 * Sends a task function operation message to the worker of the given worker node
 * and waits for its operation status response.
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success, rejected with an error when it reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not an operation status sent by the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
            `Task function operation '${response.taskFunctionOperation?.toString()}' failed on worker ${response.workerId?.toString()} with error: '${
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              response.workerError?.message
            }'`
          )
        )
      }
      // The operation is settled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
1513
/**
 * Sends a task function operation message to every worker of the pool and waits
 * for as many operation status responses as there are worker nodes.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports success (or when the pool has no worker node),
 * rejected with an error when at least one worker reports failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without any worker node no response can ever be received: settle right away
    // instead of returning a promise that never settles.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived: MessageValue<Response>[] = []
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // Every recorded status is a boolean: no failure means all workers succeeded.
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
            }' failed on worker ${errorResponse.workerId?.toString()} with error: '${
              // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
              errorResponse.workerError?.message
            }'`
          )
        )
      }
      // The listener was registered on every worker node: deregister it from all of them,
      // not only from the worker that sent the last response.
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const workerNodeKey of this.workerNodes.keys()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
1568
/**
 * Applies the pool tasks queue priority, as computed by `getTasksQueuePriority()`, to the given worker node.
 * @param workerNodeKey - The worker node key.
 */
private setTasksQueuePriority (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.setTasksQueuePriority(this.getTasksQueuePriority())
}
1574
/**
 * Sets the tasks queue back pressure size of every worker node.
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
1580
/**
 * Registers the worker node back pressure event handler on the 'backPressure' event of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
1589
/**
 * Registers the worker node idle event handler on the 'idle' event of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
1595
/**
 * Tells whether a task shall be executed right away on the given worker node:
 * its tasks queue must be empty and its number of executing tasks below the tasks queue concurrency.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` if it shall be enqueued.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  // Already queued tasks must run first.
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.opts.tasksQueueOptions!.concurrency!
  )
}
1604
/**
 * Whether the worker node shall update its task function worker usage or not.
 * That is the case when the worker information lists more than two task functions properties.
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
1618
/**
 * Creates worker nodes until the pool holds the minimum number of non-dynamic worker nodes.
 * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
 */
private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
  if (this.minimumNumberOfWorkers === 0) {
    return
  }
  // Dynamic worker nodes do not count toward the minimum.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(({ info }) => !info.dynamic).length
  this.startingMinimumNumberOfWorkers = true
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    const createdWorkerNodeKey = this.createAndSetupWorkerNode()
    if (initWorkerNodeUsage) {
      this.initWorkerNodeUsage(this.workerNodes[createdWorkerNodeKey])
    }
  }
  this.startingMinimumNumberOfWorkers = false
}
1641
/**
 * Gets the tasks queue size of the given worker node.
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1645
/**
 * Removes the worker node back pressure event handler from the 'backPressure' event of every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
1654
/**
 * Removes the worker node idle event handler from the 'idle' event of every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
1663
/**
 * Updates the sequentially stolen tasks counters of the given worker node.
 * The worker node counter is incremented when a task name is given. The task function counter
 * is incremented when it is zero or when the previous stolen task had the same name, and reset to zero otherwise.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task function.
 * @param previousTaskName - The name of the previously stolen task function.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName?: string,
  previousTaskName?: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null && taskName != null) {
    ++workerNode.usage.tasks.sequentiallyStolen
  }
  if (
    taskName == null ||
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage == null) {
    return
  }
  const { tasks } = taskFunctionWorkerUsage
  const sameTaskAsPrevious =
    previousTaskName != null && previousTaskName === taskName
  if (
    tasks.sequentiallyStolen === 0 ||
    (sameTaskAsPrevious && tasks.sequentiallyStolen > 0)
  ) {
    ++tasks.sequentiallyStolen
  } else if (tasks.sequentiallyStolen > 0) {
    // A different task function was stolen: the sequence is broken.
    tasks.sequentiallyStolen = 0
  }
}
1694
/**
 * Increments the stolen tasks counter of the given worker node and,
 * when task function usage is tracked, the one of the given task function.
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task function.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1712
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // Normalize a bare task function into a task function object.
  const taskFunctionObject: TaskFunctionObject<Data, Response> =
    typeof fn === 'function' ? { taskFunction: fn } : fn
  if (typeof taskFunctionObject.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  checkValidPriority(taskFunctionObject.priority)
  checkValidWorkerChoiceStrategy(taskFunctionObject.strategy)
  // The task function is sent to the workers as its source code string.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunction: taskFunctionObject.taskFunction.toString(),
    taskFunctionOperation: 'add',
    taskFunctionProperties: buildTaskFunctionProperties(
      name,
      taskFunctionObject
    ),
  })
  this.taskFunctions.set(name, taskFunctionObject)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies()
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
  return opResult
}
1746
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy a starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently.
  await Promise.all(
    this.workerNodes.map(async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    })
  )
  if (this.emitter != null) {
    this.emitter.emit(PoolEvents.destroy, this.info)
    this.emitter.emitDestroy()
    // Allow the ready event to be emitted again if the pool is restarted.
    this.readyEventEmitted = false
  }
  delete this.startTimestamp
  this.destroying = false
  this.started = false
}
1773
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingTasksQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingTasksQueue) {
    // Stop tasks stealing and flush what is still queued before turning the queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
1787
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  if (!this.started) {
    throw new Error('Cannot execute a task on not started pool')
  }
  if (this.destroying) {
    throw new Error('Cannot execute a task on destroying pool')
  }
  if (name != null) {
    if (typeof name !== 'string') {
      throw new TypeError('name argument must be a string')
    }
    if (name.trim().length === 0) {
      throw new TypeError('name argument must not be an empty string')
    }
  }
  if (transferList != null && !Array.isArray(transferList)) {
    throw new TypeError('transferList argument must be an array')
  }
  return await this.internalExecute(data, name, transferList)
}
1811
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  return taskFunctionsProperties.some(
    ({ name: taskFunctionName }) => taskFunctionName === name
  )
}
1818
/** @inheritDoc */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  // The first worker node holding a non-empty task functions properties list is used.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionsProperties) &&
      info.taskFunctionsProperties.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctionsProperties ?? []
}
1831
/** @inheritDoc */
public async mapExecute (
  data: Iterable<Data>,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response[]> {
  if (!this.started) {
    throw new Error('Cannot execute task(s) on not started pool')
  }
  if (this.destroying) {
    throw new Error('Cannot execute task(s) on destroying pool')
  }
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (data == null) {
    throw new TypeError('data argument must be a defined iterable')
  }
  if (typeof data[Symbol.iterator] !== 'function') {
    throw new TypeError('data argument must be an iterable')
  }
  if (name != null) {
    if (typeof name !== 'string') {
      throw new TypeError('name argument must be a string')
    }
    if (name.trim().length === 0) {
      throw new TypeError('name argument must not be an empty string')
    }
  }
  if (transferList != null && !Array.isArray(transferList)) {
    throw new TypeError('transferList argument must be an array')
  }
  // Non-array iterables are materialized so that one task is submitted per element.
  const tasksData: Data[] = Array.isArray(data) ? (data as Data[]) : [...data]
  return await Promise.all(
    tasksData.map(taskData =>
      this.internalExecute(taskData, name, transferList)
    )
  )
}
1869
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionProperties,
  })
  // Drop the usage statistics gathered for the removed task function.
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
  this.taskFunctions.delete(name)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies()
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
  return opResult
}
1896
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties,
  })
}
1907
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue options are meaningless when the tasks queue is disabled.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Handlers are always removed first so that they are never registered twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
1934
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Setting the options already synchronizes the strategies when it returns `true`.
  let requireSync =
    workerChoiceStrategyOptions != null
      ? !this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
      : false
  if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
    requireSync = true
  }
  if (!requireSync) {
    return
  }
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
1965
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: undefined | WorkerChoiceStrategyOptions
): boolean {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions == null) {
    // Nothing to apply.
    return false
  }
  // Merge the given options over the ones currently stored on the pool.
  const mergedOptions = {
    ...this.opts.workerChoiceStrategyOptions,
    ...workerChoiceStrategyOptions,
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategiesContext?.setOptions(mergedOptions)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerChoiceStrategies(),
    mergedOptions
  )
  // Send a statistics message to every worker node after the sync.
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
  return true
}
1990
/** @inheritDoc */
public start (): void {
  // Refuse to start from any state other than "stopped and idle".
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    this.startMinimumNumberOfWorkers()
    // Reference point used to compute the pool utilization.
    this.startTimestamp = performance.now()
  } finally {
    // Always clear the transient flag: an error thrown while starting the
    // workers must not leave the pool reported as starting forever, which
    // would make every later start() call fail.
    this.starting = false
  }
  // Only reached when the workers were started without throwing.
  this.started = true
}
2008
/**
 * Whether the pool is back pressured or not.
 *
 * Abstract: concrete pool classes must provide the implementation. The value
 * is reported as `backPressure` in the pool information when the tasks queue
 * is enabled.
 * @returns The pool back pressure boolean status.
 */
protected abstract get backPressure (): boolean
2014
/**
 * Whether the pool is busy or not.
 *
 * Abstract: concrete pool classes must provide the implementation.
 * @returns The pool busyness boolean status.
 */
protected abstract get busy (): boolean
2020
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once up front: every optional statistics section below is gated
  // on these requirements (assumed stable while this synchronous getter runs).
  const taskStatisticsRequirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()

  /**
   * Aggregates one measurement over all worker nodes.
   * @param statistics - The per worker node statistics of the measurement.
   * @param requirements - Whether the average and/or the median are required.
   * @returns The rounded maximum and minimum, plus the rounded average and median of the merged histories when required.
   */
  const aggregateMeasurement = (
    statistics: ReadonlyArray<{
      readonly history: { toArray: () => readonly number[] }
      readonly maximum?: number
      readonly minimum?: number
    }>,
    requirements: { readonly average: boolean, readonly median: boolean }
  ) => {
    // Lazily merges every worker node history into one flat array.
    const mergedHistory = (): number[] =>
      statistics.flatMap(statistic => statistic.history.toArray())
    return {
      maximum: round(
        max(
          ...statistics.map(
            statistic => statistic.maximum ?? Number.NEGATIVE_INFINITY
          )
        )
      ),
      minimum: round(
        min(
          ...statistics.map(
            statistic => statistic.minimum ?? Number.POSITIVE_INFINITY
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(mergedHistory())),
      }),
      ...(requirements.median && {
        median: round(median(mergedHistory())),
      }),
    }
  }

  return {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    defaultStrategy: this.opts.workerChoiceStrategy!,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    minSize: this.minimumNumberOfWorkers,
    ready: this.ready,
    started: this.started,
    strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
    type: this.type,
    version,
    worker: this.worker,
    // Utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization),
    }),
    busyWorkerNodes: this.workerNodes.reduce(
      (accumulator, _, workerNodeKey) =>
        this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
      0
    ),
    executedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executing,
      0
    ),
    failedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.failed,
      0
    ),
    idleWorkerNodes: this.workerNodes.reduce(
      (accumulator, _, workerNodeKey) =>
        this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
      0
    ),
    workerNodes: this.workerNodes.length,
    // Only dynamic pools report their dynamic worker nodes count.
    ...(this.type === PoolTypes.dynamic && {
      dynamicWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ),
    }),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      backPressure: this.backPressure,
      backPressureWorkerNodes: this.workerNodes.reduce(
        (accumulator, _, workerNodeKey) =>
          this.isWorkerNodeBackPressured(workerNodeKey)
            ? accumulator + 1
            : accumulator,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
        0
      ),
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.queued,
        0
      ),
      stealingWorkerNodes: this.workerNodes.reduce(
        (accumulator, _, workerNodeKey) =>
          this.isWorkerNodeStealing(workerNodeKey)
            ? accumulator + 1
            : accumulator,
        0
      ),
      stolenTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.stolen,
        0
      ),
    }),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: aggregateMeasurement(
        this.workerNodes.map(workerNode => workerNode.usage.runTime),
        taskStatisticsRequirements.runTime
      ),
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: aggregateMeasurement(
        this.workerNodes.map(workerNode => workerNode.usage.waitTime),
        taskStatisticsRequirements.waitTime
      ),
    }),
    ...(taskStatisticsRequirements?.elu.aggregate === true && {
      elu: {
        active: aggregateMeasurement(
          this.workerNodes.map(workerNode => workerNode.usage.elu.active),
          taskStatisticsRequirements.elu
        ),
        idle: aggregateMeasurement(
          this.workerNodes.map(workerNode => workerNode.usage.elu.idle),
          taskStatisticsRequirements.elu
        ),
        utilization: {
          average: round(
            average(
              this.workerNodes.map(
                workerNode => workerNode.usage.elu.utilization ?? 0
              )
            )
          ),
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.elu.utilization ?? 0
              )
            )
          ),
        },
      },
    }),
  }
}
2322
/**
 * Whether the pool is ready or not.
 *
 * A pool that is not started is never ready. Otherwise it is ready once the
 * number of ready, non dynamic worker nodes reaches the minimum number of
 * workers.
 * @returns The pool readiness boolean status.
 */
private get ready (): boolean {
  if (!this.started) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
2341
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Abstract: concrete pool classes must provide the implementation. The value
 * is reported as `type` in the pool information.
 */
protected abstract get type (): PoolType
2348
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the tasks run time and wait time aggregated over all
 * worker nodes, divided by the pool time capacity (time elapsed since the
 * pool start timestamp multiplied by the maximum number of workers, or the
 * minimum number when no maximum is defined).
 * @returns The pool utilization, `0` when the pool has no start timestamp or no time capacity yet.
 */
private get utilization (): number {
  if (this.startTimestamp == null) {
    return 0
  }
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) *
    (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // Without any elapsed capacity the ratio below would be NaN or Infinity.
  if (!(poolTimeCapacity > 0)) {
    return 0
  }
  const totalTasksRunTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.runTime.aggregate ?? 0),
    0
  )
  const totalTasksWaitTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
    0
  )
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
2372
/**
 * The worker type.
 *
 * Abstract: concrete pool classes must provide the implementation. The value
 * is reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType
2377}