build: switch from prettier to rome as code formatter
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
/**
 * Builds a new poolifier pool: validates the arguments, creates the worker
 * choice strategy context and starts the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool. Missing options are filled in with their defaults on this very object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the arguments is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods so that they keep their `this` when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const strategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)
  this.workerChoiceStrategyContext = strategyContext

  this.setupHook()

  // `starting` is only true while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // Anything that is not a string (`null` and `undefined` included) or that is blank is rejected the same way.
  const isUnspecified =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isUnspecified) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for a fixed pool.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Validates the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, lower than or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
199
/**
 * Validates the pool options and stores the effective values, defaults
 * included, on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only checked and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
226
/**
 * Checks that the given worker choice strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
236
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  // When weights are given, there must be exactly one per possible worker node.
  const weights = workerChoiceStrategyOptions.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  // When a measurement is given, it must be one of the values of `Measurements`.
  const measurement = workerChoiceStrategyOptions.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
264
/**
 * Validates the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to check: the default concurrency is applied elsewhere.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
288
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non-dynamic worker nodes.
 */
private startPool (): void {
  // Recomputed on each iteration since creating a worker node changes `workerNodes`.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
300
/** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a per worker node number, left to right, starting from zero.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of this.workerNodes) {
      total += selector(workerNode)
    }
    return total
  }

  // Counts the worker nodes matching the predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    sumOverWorkerNodes((workerNode) => (predicate(workerNode) ? 1 : 0))

  // Pool wide statistics of one measurement, built from the worker nodes usage.
  // The average is the aggregated measurement divided by the executed tasks count.
  const buildMeasurementInfo = (measurement: 'runTime' | 'waitTime') => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          (workerNode) => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          (workerNode) => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      sumOverWorkerNodes(
        (workerNode) => workerNode.usage[measurement]?.aggregate ?? 0
      ) /
        sumOverWorkerNodes(
          (workerNode) => workerNode.usage.tasks?.executed ?? 0
        )
    ),
    // The median is only reported when the strategy requires it.
    ...(taskStatisticsRequirements[measurement].median && {
      median: round(
        median(
          this.workerNodes.map(
            (workerNode) => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })

  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
441
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non-dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
456
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait
 * time of all the worker nodes, divided by the time elapsed since the pool
 * start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
477
478 /**
479 * The pool type.
480 *
481 * If it is `'dynamic'`, it provides the `max` property.
482 */
483 protected abstract get type (): PoolType
484
485 /**
486 * The worker type.
487 */
488 protected abstract get worker (): WorkerType
489
490 /**
491 * The pool minimum size.
492 */
493 protected abstract get minSize (): number
494
495 /**
496 * The pool maximum size.
497 */
498 protected abstract get maxSize (): number
499
/**
 * Checks that the worker id carried by a message received from a worker,
 * if any, belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // A message without a worker id is not checked.
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
516
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
528
/**
 * Gets the worker node key of the worker node whose info holds the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
540
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node restarts from a clean usage and is sent the statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
559
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored nor forwarded.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const storedOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
570
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the queues when the tasks queue goes from enabled to disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
582
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
593
/**
 * Builds the effective tasks queue options, with a concurrency of `1` when none is given.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly `null` or `undefined`.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
601
/**
 * Whether the pool is full or not.
 *
 * The pool is full when the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
610
611 /**
612 * Whether the pool is busy or not.
613 *
614 * The pool busyness boolean status.
615 */
616 protected abstract get busy (): boolean
617
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available when it
 * executes fewer tasks than the tasks queue concurrency. Without it, a ready
 * worker node is available when it executes no task.
 *
 * @returns `true` if no ready worker node is available, `false` otherwise.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasAvailableWorkerNode = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasAvailableWorkerNode
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    (workerNode) =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
642
/**
 * @inheritDoc
 *
 * The returned promise is rejected with a `TypeError`, and no task is
 * submitted, when `name` is not a string or `transferList` is not an array.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // `reject()` does not stop the executor: return explicitly so that an
    // invalid call never reaches a worker nor the promise response map.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Submission timestamp, later used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Keep the settle callbacks so that the worker response can be bound to this promise.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already executes its concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
685
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the termination of every worker node, then wait for all of them.
  const terminations = this.workerNodes.map(async (_, workerNodeKey) => {
    await this.destroyWorkerNode(workerNodeKey)
  })
  await Promise.all(terminations)
}
694
695 /**
696 * Terminates the worker node given its worker node key.
697 *
698 * @param workerNodeKey - The worker node key.
699 */
700 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
701
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
711
712 /**
713 * Should return whether the worker is the main worker or not.
714 */
715 protected abstract isMain (): boolean
716
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, on both
 * the worker node usage and its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  const workerUsage = workerNode.usage
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
  // Usage restricted to the task name.
  const taskWorkerUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  taskWorkerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
737
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, on both the worker node
 * usage and its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  updateUsage(workerNode.usage)
  // Usage restricted to the task name, the default one if the message carries none.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  updateUsage(workerNode.getTaskWorkerUsage(taskName) as WorkerUsage)
}
760
/**
 * Updates the tasks counters of a worker usage once a task has completed:
 * one less executing task, one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
773
/**
 * Updates the run time statistics of a worker usage with the task run time
 * carried by the message, `0` if the message carries none.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
785
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, `0` if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
799
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Active then idle statistics, `0` standing for a missing measurement.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // The utilization is only tracked when the ELU aggregate is required and
  // the message carries an ELU.
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The first value is stored as is, each later one is averaged with the stored value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
831
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when `shallCreateDynamicWorker()`
 * allows it. It is chosen if the strategy policy uses dynamic workers.
 * Otherwise the choice is left to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
850
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
859
860 /**
861 * Sends a message to worker given its worker node key.
862 *
863 * @param workerNodeKey - The worker node key.
864 * @param message - The message.
865 * @param transferList - The optional array of transferable objects.
866 */
867 protected abstract sendToWorker (
868 workerNodeKey: number,
869 message: MessageValue<Data>,
870 transferList?: TransferListItem[]
871 ): void
872
873 /**
874 * Creates a new worker.
875 *
876 * @returns Newly created worker.
877 */
878 protected abstract createWorker (): Worker
879
/**
 * Creates a new worker, registers its event handlers and adds it to the pool
 * worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User supplied handlers default to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    // The errored worker node is flagged as not ready and its channel is closed.
    workerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement is created while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && workerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
919
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, the worker node listens to kill messages,
 * is sent a `checkActive` message and is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The key is looked up again on each message from the worker id.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A hard kill always destroys the worker node.
    const isHardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    // Any other kill only does so once the worker node executes no task and,
    // with the tasks queue enabled, has no queued task left.
    const isKillOfIdleWorkerNode = (): boolean => {
      if (message.kill == null || workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    if (isHardKill || isKillOfIdleWorkerNode()) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Flagged as ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
956
957 /**
958 * Registers a listener callback on the worker given its worker node key.
959 *
960 * @param workerNodeKey - The worker node key.
961 * @param listener - The message listener callback.
962 */
963 protected abstract registerWorkerMessageListener<
964 Message extends Data | Response
965 >(
966 workerNodeKey: number,
967 listener: (message: MessageValue<Message>) => void
968 ): void
969
/**
 * Method hooked up after a worker node has been newly created: registers the
 * pool message listener, then sends the startup and statistics messages.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // Send the startup message, then the worker statistics message, to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
984
985 /**
986 * Sends the startup message to worker given its worker node key.
987 *
988 * @param workerNodeKey - The worker node key.
989 */
990 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
991
/**
 * Sends the worker statistics message to worker given its worker node key.
 *
 * The message carries the run time and ELU `aggregate` requirements of the
 * current worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, { statistics, workerId })
}
1009
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty
 * queue is targeted: the task is executed right away if that node executes
 * fewer tasks than the tasks queue concurrency, enqueued otherwise. Failing
 * that, the task is enqueued on the last scanned ready worker node whose
 * queue is shorter than every one before it.
 *
 * The redistribution stops when no other ready worker node exists.
 *
 * @param workerNodeKey - The key of the worker node whose queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      const workerInfo = this.getWorkerInfo(workerNodeId)
      // Only the other, ready worker nodes are candidates.
      if (workerNodeId === workerNodeKey || !workerInfo.ready) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        // Empty queue: best candidate, stop scanning.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No other ready worker node: moving the task back onto the same
      // queue would keep its size unchanged and loop forever.
      // NOTE(review): the tasks left in the queue are not redistributed;
      // confirm whether they should be rejected instead.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1053
/**
 * This method is the listener registered for each worker message.
 *
 * The listener checks the message worker id, then handles the message as a
 * worker ready response if it has a `ready` property, as a task execution
 * response if it has a `taskId` property.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  const listener = (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
  return listener
}
1071
/**
 * Handles a worker ready response: stores the ready flag carried by the message in the
 * information of the worker that sent it, then emits the pool ready event when an emitter
 * is set and the pool is ready.
 *
 * @param message - The message received from the worker.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1080
/**
 * Handles a task execution response: settles the promise registered for the task,
 * runs the after task execution hook, starts the next queued task on the same worker node
 * when tasks queuing is enabled and the concurrency allows it, and updates the worker
 * choice strategy context.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  // A queued task may start only if queuing is enabled, the queue is not empty
  // and the worker node executes fewer tasks than the configured concurrency.
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1109
/**
 * Emits the pool events matching the current pool state, if an emitter is set:
 * the busy event when the pool is busy, and the full event when the pool is dynamic
 * and full.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1120
/**
 * Looks up the worker node stored at the given key and returns its worker information.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1130
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created while the pool is starting are flagged as ready right away.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
1151
/**
 * Removes the worker node holding the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1164
/**
 * Runs the before task execution hook for the given task, then sends the task and its
 * transfer list to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const transferList = task.transferList
  this.sendToWorker(workerNodeKey, task, transferList)
}
1175
/**
 * Enqueues the given task on the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the resulting tasks queue size - confirm against the worker node implementation).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1179
/**
 * Dequeues a task from the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` (presumably when the tasks queue is empty - confirm against the worker node implementation).
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1183
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1187
/**
 * Executes every task queued on the worker node identified by the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  const hasQueuedTasks = (): boolean => this.tasksQueueSize(workerNodeKey) > 0
  while (hasQueuedTasks()) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1197
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1203 }