fix: make pool busyness check for worker readiness
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
60 /** @inheritDoc */
61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
62
63 /** @inheritDoc */
64 public readonly emitter?: PoolEmitter
65
66 /**
67 * The task execution response promise map.
68 *
69 * - `key`: The message id of each submitted task.
70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
71 *
72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
73 */
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
76
77 /**
78 * Worker choice strategy context referencing a worker choice algorithm implementation.
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
81 Worker,
82 Data,
83 Response
84 >
85
86 /**
87 * Whether the pool is starting or not.
88 */
89 private readonly starting: boolean
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
/**
 * Builds a poolifier pool: validates the arguments, sets up the worker choice
 * strategy context and creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation; the pool options check also fills in the defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
141
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length !== 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
/**
 * Checks that the number of workers is a non negative safe integer, and non zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero in a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum is below the minimum, zero, or equal to the minimum.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Checks the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
225
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, one weight per worker node is expected.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/**
 * Checks the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
299
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Adds up one numeric figure over all the worker nodes.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching the given predicate.
  const countWhere = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOver(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOver(workerNode => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOver(workerNode => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
436
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
451
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
472
473 /**
474 * The pool type.
475 *
476 * If it is `'dynamic'`, it provides the `max` property.
477 */
478 protected abstract get type (): PoolType
479
480 /**
481 * The worker type.
482 */
483 protected abstract get worker (): WorkerType
484
485 /**
486 * The pool minimum size.
487 */
488 protected abstract get minSize (): number
489
490 /**
491 * The pool maximum size.
492 */
493 protected abstract get maxSize (): number
494
/**
 * Checks that the worker id carried by a received message, when present,
 * belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    // No worker id in the message: nothing to check.
    return
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
511
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
523
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
535
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and resend it the statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }
}
554
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first, then store the options and hand them to the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
565
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before the queueing is switched off.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
577
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = builtOptions
    return
  }
  // Tasks queue not enabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
588
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly missing.
 * @returns The tasks queue options with a defined concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
596
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
605
606 /**
607 * Whether the pool is busy or not.
608 *
609 * The pool busyness boolean status.
610 */
611 protected abstract get busy (): boolean
612
/**
 * Whether no ready worker node is free of executing tasks.
 *
 * @returns `true` if there is no worker node that is both ready and executing zero task, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasReadyIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasReadyIdleWorkerNode
}
626
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the promise.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const tasksQueueEnabled = this.opts.enableTasksQueue
    const isBelowConcurrency = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    // Execute right away without tasks queue, or while the worker node is below its concurrency.
    if (
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && isBelowConcurrency())
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
658
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the destruction of every worker node, then wait for all of them.
  const destructions = this.workerNodes.map(async (_, workerNodeKey) => {
    await this.destroyWorkerNode(workerNodeKey)
  })
  await Promise.all(destructions)
}
667
668 /**
669 * Terminates the worker node given its worker node key.
670 *
671 * @param workerNodeKey - The worker node key.
672 */
673 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
674
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * The default implementation does nothing; it can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup
}
684
685 /**
686 * Should return whether the worker is the main worker or not.
687 */
688 protected abstract isMain (): boolean
689
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and updates the wait time, on the worker usage then on the task usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const accountTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  // Worker node usage first, then the usage of the named task.
  accountTaskStart(this.workerNodes[workerNodeKey].usage)
  accountTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
710
/**
 * Hook executed after the worker task execution: updates the tasks counters,
 * the run time and the ELU, on the worker usage then on the task usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const accountTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  // Worker node usage first, then the usage of the named task.
  accountTaskEnd(this.workerNodes[workerNodeKey].usage)
  accountTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
733
/**
 * Updates the tasks counters of a usage once a task response is received:
 * one task less executing, and one more executed or failed.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  const taskFailed = message.taskError != null
  if (taskFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
746
/**
 * Updates the run time statistics of a usage with the task run time carried
 * by the message, `0` when the message has none.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
758
/**
 * Updates the wait time statistics of a usage with the time elapsed since the
 * task timestamp, `0` when the task has no timestamp.
 *
 * @param workerUsage - The usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitedTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitedTime,
    workerUsage.tasks.executed
  )
}
772
/**
 * Updates the event loop utilization (ELU) statistics of a usage from the
 * task performance carried by the message.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // Halve the sum of the stored and the new utilization, or store the first value as is.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
804
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met, and it is
 * returned if the strategy policy uses dynamic workers; otherwise the worker
 * choice strategy picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
823
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
832
833 /**
834 * Sends a message to worker given its worker node key.
835 *
836 * @param workerNodeKey - The worker node key.
837 * @param message - The message.
838 */
839 protected abstract sendToWorker (
840 workerNodeKey: number,
841 message: MessageValue<Data>
842 ): void
843
844 /**
845 * Creates a new worker.
846 *
847 * @returns Newly created worker.
848 */
849 protected abstract createWorker (): Worker
850
/**
 * Creates a worker, registers the pool and user event handlers on it and adds
 * it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the errored worker as not ready and close its channel.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the errored worker with one of the same kind, except at pool startup.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Move the tasks still queued on the errored worker node elsewhere.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
890
/**
 * Creates a worker node, registers the kill message listener on it, sends it
 * the check active message and flags it as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A worker node has no pending work when it executes nothing and, with tasks queueing, has nothing queued.
    const hasNoPendingWork = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        senderUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        senderUsage.tasks.executing === 0 &&
        this.tasksQueueSize(senderWorkerNodeKey) === 0)
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingWork())
    if (shallDestroy) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Flag the worker as ready right away when the strategy policy uses dynamic workers.
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
927
928 /**
929 * Registers a listener callback on the worker given its worker node key.
930 *
931 * @param workerNodeKey - The worker node key.
932 * @param listener - The message listener callback.
933 */
934 protected abstract registerWorkerMessageListener<
935 Message extends Data | Response
936 >(
937 workerNodeKey: number,
938 listener: (message: MessageValue<Message>) => void
939 ): void
940
/**
 * Method hooked up after a worker node has been newly created: registers the
 * pool message listener, then sends the startup and statistics messages.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Listen to the messages coming from the worker.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // 2. Send the startup message.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Send the worker statistics message.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
955
956 /**
957 * Sends the startup message to worker given its worker node key.
958 *
959 * @param workerNodeKey - The worker node key.
960 */
961 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
962
/**
 * Sends the worker statistics message to worker given its worker node key.
 * The message carries the run time and ELU `aggregate` requirements of the
 * current worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(workerNodeKey, {
    statistics,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
980
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task the target is the first other ready worker node with an
 * empty queue, or else the other ready worker node with the fewest queued tasks.
 * The task is executed right away when the target has an empty queue and is
 * below the tasks concurrency, and is enqueued on the target otherwise.
 *
 * If no other ready worker node exists the redistribution stops and the
 * remaining tasks stay queued on the given worker node. Without that stop each
 * task would be dequeued and enqueued again on the same worker node, so the
 * queue would never shrink and the loop would never end.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means no eligible target worker node has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Never target the source worker node nor a worker that is not ready.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        // Empty queue: best candidate, run the task at once if below concurrency.
        executeTask = workerNode.usage.tasks.executing < concurrency
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node: stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1024
/**
 * Builds the listener registered for each worker message. The listener checks
 * the message worker id, then handles either a worker ready response or a task
 * execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1042
/**
 * Stores the readiness carried by the message on the sending worker info and
 * emits the pool `ready` event when the pool is ready.
 *
 * @param message - The received worker ready response.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1051
/**
 * Settles the promise bound to the message id, updates the usage statistics
 * and executes the next queued task of the worker node if there is one and the
 * worker node is below the tasks concurrency.
 * Messages whose id has no registered promise are ignored.
 *
 * @param message - The received task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1078
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1089
/**
 * Returns the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1099
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Worker nodes created while the pool is starting are flagged as ready at once.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
1120
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1133
/**
 * Runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook runs before the task is sent to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1144
/**
 * Enqueues the task on the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1148
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1152
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1156
/**
 * Executes every task queued on the worker node given its worker node key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1166
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1172 }