perf: use worker node key instead of heavy worker reference in hot code path
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of the task execution, plus the key of the worker node the task was assigned to.
 *
 * When a task execution response is received from a worker, its message id is used to look up the entry and settle the promise.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Worker choice strategy context, used to pick the worker node for each task.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * `performance.now()` timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
94
/**
 * Builds a pool: validates the arguments, sets up the optional event emitter
 * and the worker choice strategy context, then creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if called from a context where `isMain()` is `false`, or if one of the argument checks fails.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation (pool options defaults are also filled in here).
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods that are handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The `starting` flag is raised only for the duration of the initial worker creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
140
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is not a non-blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
153
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
171
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min > max`, if `max` is zero, or if `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
189
/**
 * Validates the pool options and writes the defaults of the unset ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
216
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
226
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
254
/**
 * Validates the tasks queue options. Missing options (`null`/`undefined`) are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
278
/**
 * Creates workers until the pool holds `numberOfWorkers` non-dynamic worker nodes.
 */
private startPool (): void {
  const staticWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (staticWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
290
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const nodes = this.workerNodes
  // Sums a numeric value picked from each worker node.
  const sum = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    nodes.reduce((total, workerNode) => total + pick(workerNode), 0)
  // Counts the worker nodes matching a predicate.
  const count = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => nodes.filter(predicate).length
  // Pool-wide statistics of one time measurement: min/max over the worker nodes,
  // average as aggregated time over executed tasks, median only when the strategy requires it.
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  const measurementInfo = (kind: 'runTime' | 'waitTime') => ({
    minimum: round(
      Math.min(
        ...nodes.map(workerNode => workerNode.usage[kind]?.minimum ?? Infinity)
      )
    ),
    maximum: round(
      Math.max(
        ...nodes.map(
          workerNode => workerNode.usage[kind]?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      sum(workerNode => workerNode.usage[kind]?.aggregate ?? 0) /
        sum(workerNode => workerNode.usage.tasks?.executed ?? 0)
    ),
    ...(requirements[kind].median && {
      median: round(
        median(nodes.map(workerNode => workerNode.usage[kind]?.median ?? 0))
      )
    })
  })
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: count(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: count(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sum(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sum(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sum(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sum(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sum(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: measurementInfo('runTime')
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: measurementInfo('waitTime')
    })
  }
}
427
/**
 * Whether at least `minSize` non-dynamic worker nodes are flagged as ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
439
/**
 * Approximate pool utilization: the aggregated tasks run time plus wait time
 * of all the worker nodes, divided by the time elapsed since the pool start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
460
/**
 * The pool type, one of the `PoolTypes` values (the size checks of this class
 * distinguish the fixed and the dynamic ones).
 */
protected abstract get type (): PoolType

/**
 * The type of worker handled by the pool.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum size of the pool.
 */
protected abstract get minSize (): number

/**
 * The maximum size of the pool.
 */
protected abstract get maxSize (): number
482
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info carries that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
493
/**
 * Checks that the worker id carried by a received message, if any, belongs to a worker of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  // Messages without a worker id are not checked.
  if (message.workerId == null) {
    return
  }
  if (this.getWorkerById(message.workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
510
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The index of the worker node holding that worker, `-1` if it is not in the pool worker nodes.
 */
protected getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
522
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and resend the statistics settings to its worker.
  for (const { worker, resetUsage } of this.workerNodes.map(workerNode => ({
    worker: workerNode.worker,
    resetUsage: () => {
      workerNode.resetUsage()
    }
  }))) {
    resetUsage()
    this.setWorkerStatistics(worker)
  }
}
541
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then hand them over to the strategy context.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
552
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: flush the already queued tasks first.
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
564
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
575
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly missing.
 * @returns The tasks queue options, with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
583
/**
 * Whether the pool is full or not.
 *
 * `true` when the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
592
/**
 * Whether the pool is busy or not.
 *
 * The busyness criterion is defined by the concrete pool implementation.
 */
protected abstract get busy (): boolean
599
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing task, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
612
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // Keep the promise callbacks so that the worker response can settle the promise later.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // With the tasks queue enabled, the task is queued when the pool is busy or
    // when the chosen worker node already executes `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
645
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The listener is registered before the termination request so that the 'exit' event cannot be missed.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(terminations)
}
662
/**
 * Terminates the given worker. May complete synchronously or return a promise.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
669
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
679
/**
 * Should return whether the current execution context is the main one or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
684
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and updates the wait time, on both the worker node usage and
 * its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const workerUsage = workerNode.usage
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
  const taskWorkerUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  taskWorkerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
705
/**
 * Hook executed after the worker task execution: updates the tasks counters,
 * run time and ELU statistics, on both the worker node usage and its per task
 * name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const applyMessage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  applyMessage(workerNode.usage)
  // The task name falls back to the default one when the message carries no task performance.
  applyMessage(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
728
/**
 * Updates the tasks counters of a worker usage after a task execution:
 * one less executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; the task is counted as failed when it carries a `taskError`.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
741
/**
 * Updates the run time statistics of a worker usage with the run time reported in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a missing run time is accounted as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
753
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed; without a timestamp its wait time is `0`.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const taskWaitTime = now - submittedAt
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
767
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * The active and idle measurements are always passed to the measurement
 * statistics update. The utilization is only updated when the ELU aggregate
 * is required and the message carries ELU data: it is initialized with the
 * task value, then replaced by the mean of the stored and the task values.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
799
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen
 * directly if the strategy policy uses dynamic workers. In every other case
 * the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
818
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
827
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker that should receive the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
838
/**
 * Creates a new worker. Its registration in the pool worker nodes is done by the caller.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
845
/**
 * Creates a new worker, registers the user and the internal event handlers on
 * it, adds it to the pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // Flag the failed worker as not ready and close its channel.
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failed worker by one of the same kind, except during the pool startup.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else if (shallRestart) {
      this.createAndSetupWorker()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
885
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker on a
 * kill message, the worker node is flagged as dynamic and a `checkActive`
 * message is sent to the worker.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A non hard kill request is only honored when the worker node has nothing left to do.
    const isWorkerNodeIdle = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeIdle())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.dynamic = true
  // A strategy policy using dynamic workers needs the new worker node to be ready right away.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
919
/**
 * Registers a message listener callback on the given worker.
 *
 * @param worker - The worker on which the listener is registered.
 * @param listener - The callback invoked with each message received from the worker.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
929
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Listen to the worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Send the startup message to the worker.
  this.sendStartupMessageToWorker(worker)
  // 3. Tell the worker which task statistics it has to compute.
  this.setWorkerStatistics(worker)
}
944
/**
 * Sends the startup message to the given worker.
 *
 * @param worker - The worker that should receive the startup message.
 */
protected abstract sendStartupMessageToWorker (worker: Worker): void
951
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the target is the first other ready worker node with
 * an empty queue, otherwise the other ready worker node with the fewest
 * queued tasks.
 *
 * If there is no other ready worker node, the redistribution stops and the
 * remaining tasks are left in the source queue. Moving a task back onto the
 * queue it was just taken from would never shrink that queue, so the loop
 * would never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other worker nodes flagged as ready are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target: bail out instead of spinning forever on the source queue.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
981
/**
 * Builds the listener registered for each worker message. The listener checks
 * the message worker id, then dispatches worker ready responses and task
 * execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
999
/**
 * Handles a worker ready response: stores the ready flag in the worker info
 * and emits the pool `ready` event if the pool is ready and has an emitter.
 *
 * @param message - The received message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  this.getWorkerInfoByWorker(worker).ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1008
/**
 * Handles a task execution response: settles the promise of the task, updates
 * the worker node usage, runs the next queued task of that worker node if
 * any, and notifies the worker choice strategy context.
 * Responses without a pending promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const workerNodeKey = promiseResponse.workerNodeKey
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  // The worker node has just freed a slot: feed it with its next queued task.
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1033
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1044
/**
 * Gets the worker information of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1054
/**
 * Gets the worker information of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker information.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
 */
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey !== -1) {
    return this.workerNodes[workerNodeKey].info
  }
  throw new Error('Worker not found')
}
1069
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Worker nodes created during the pool startup are flagged as ready immediately.
  workerNode.info.ready = this.starting ? true : workerNode.info.ready
  return this.workerNodes.push(workerNode)
}
1084
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1097
/**
 * Executes the given task on the worker of the given worker node: runs the
 * before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1108
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1112
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` as returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1116
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1120
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  while (workerNode.tasksQueueSize() > 0) {
    const task = workerNode.dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  workerNode.clearTasksQueue()
}
1130
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1136
/**
 * Sends to the given worker the statistics settings derived from the task
 * statistics requirements of the worker choice strategy: whether the run time
 * and the ELU aggregates are required.
 *
 * @param worker - The worker that should receive the statistics settings.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found in the pool worker nodes.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: requirements.runTime.aggregate,
      elu: requirements.elu.aggregate
    },
    workerId: this.getWorkerInfoByWorker(worker).id as number
  })
}
1149 }