fix: fix pool busyness semantic with task queueing enabled
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
60 /** @inheritDoc */
61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
62
63 /** @inheritDoc */
64 public readonly emitter?: PoolEmitter
65
66 /**
67 * The task execution response promise map.
68 *
69 * - `key`: The message id of each submitted task.
70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
71 *
72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
73 */
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
76
77 /**
78 * Worker choice strategy context referencing a worker choice algorithm implementation.
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
81 Worker,
82 Data,
83 Response
84 >
85
86 /**
87 * Whether the pool is starting or not.
88 */
89 private readonly starting: boolean
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
95 /**
96 * Constructs a new poolifier pool.
97 *
98 * @param numberOfWorkers - Number of workers that this pool should manage.
99 * @param filePath - Path to the worker file.
100 * @param opts - Options for the pool.
101 */
102 public constructor (
103 protected readonly numberOfWorkers: number,
104 protected readonly filePath: string,
105 protected readonly opts: PoolOptions<Worker>
106 ) {
107 if (!this.isMain()) {
108 throw new Error('Cannot start a pool from a worker!')
109 }
110 this.checkNumberOfWorkers(this.numberOfWorkers)
111 this.checkFilePath(this.filePath)
112 this.checkPoolOptions(this.opts)
113
114 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
115 this.executeTask = this.executeTask.bind(this)
116 this.enqueueTask = this.enqueueTask.bind(this)
117 this.dequeueTask = this.dequeueTask.bind(this)
118 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
119
120 if (this.opts.enableEvents === true) {
121 this.emitter = new PoolEmitter()
122 }
123 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
124 Worker,
125 Data,
126 Response
127 >(
128 this,
129 this.opts.workerChoiceStrategy,
130 this.opts.workerChoiceStrategyOptions
131 )
132
133 this.setupHook()
134
135 this.starting = true
136 this.startPool()
137 this.starting = false
138
139 this.startTimestamp = performance.now()
140 }
141
142 private checkFilePath (filePath: string): void {
143 if (
144 filePath == null ||
145 typeof filePath !== 'string' ||
146 (typeof filePath === 'string' && filePath.trim().length === 0)
147 ) {
148 throw new Error('Please specify a file with a worker implementation')
149 }
150 if (!existsSync(filePath)) {
151 throw new Error(`Cannot find the worker file '${filePath}'`)
152 }
153 }
154
155 private checkNumberOfWorkers (numberOfWorkers: number): void {
156 if (numberOfWorkers == null) {
157 throw new Error(
158 'Cannot instantiate a pool without specifying the number of workers'
159 )
160 } else if (!Number.isSafeInteger(numberOfWorkers)) {
161 throw new TypeError(
162 'Cannot instantiate a pool with a non safe integer number of workers'
163 )
164 } else if (numberOfWorkers < 0) {
165 throw new RangeError(
166 'Cannot instantiate a pool with a negative number of workers'
167 )
168 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
169 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
170 }
171 }
172
173 protected checkDynamicPoolSize (min: number, max: number): void {
174 if (this.type === PoolTypes.dynamic) {
175 if (max == null) {
176 throw new Error(
177 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
178 )
179 } else if (!Number.isSafeInteger(max)) {
180 throw new TypeError(
181 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 )
183 } else if (min > max) {
184 throw new RangeError(
185 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
186 )
187 } else if (max === 0) {
188 throw new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
190 )
191 } else if (min === max) {
192 throw new RangeError(
193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
194 )
195 }
196 }
197 }
198
199 private checkPoolOptions (opts: PoolOptions<Worker>): void {
200 if (isPlainObject(opts)) {
201 this.opts.workerChoiceStrategy =
202 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
203 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
204 this.opts.workerChoiceStrategyOptions =
205 opts.workerChoiceStrategyOptions ??
206 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
207 this.checkValidWorkerChoiceStrategyOptions(
208 this.opts.workerChoiceStrategyOptions
209 )
210 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
211 this.opts.enableEvents = opts.enableEvents ?? true
212 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
213 if (this.opts.enableTasksQueue) {
214 this.checkValidTasksQueueOptions(
215 opts.tasksQueueOptions as TasksQueueOptions
216 )
217 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
218 opts.tasksQueueOptions as TasksQueueOptions
219 )
220 }
221 } else {
222 throw new TypeError('Invalid pool options: must be a plain object')
223 }
224 }
225
226 private checkValidWorkerChoiceStrategy (
227 workerChoiceStrategy: WorkerChoiceStrategy
228 ): void {
229 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
230 throw new Error(
231 `Invalid worker choice strategy '${workerChoiceStrategy}'`
232 )
233 }
234 }
235
236 private checkValidWorkerChoiceStrategyOptions (
237 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
238 ): void {
239 if (!isPlainObject(workerChoiceStrategyOptions)) {
240 throw new TypeError(
241 'Invalid worker choice strategy options: must be a plain object'
242 )
243 }
244 if (
245 workerChoiceStrategyOptions.weights != null &&
246 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
247 ) {
248 throw new Error(
249 'Invalid worker choice strategy options: must have a weight for each worker node'
250 )
251 }
252 if (
253 workerChoiceStrategyOptions.measurement != null &&
254 !Object.values(Measurements).includes(
255 workerChoiceStrategyOptions.measurement
256 )
257 ) {
258 throw new Error(
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
260 )
261 }
262 }
263
264 private checkValidTasksQueueOptions (
265 tasksQueueOptions: TasksQueueOptions
266 ): void {
267 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
268 throw new TypeError('Invalid tasks queue options: must be a plain object')
269 }
270 if (
271 tasksQueueOptions?.concurrency != null &&
272 !Number.isSafeInteger(tasksQueueOptions.concurrency)
273 ) {
274 throw new TypeError(
275 'Invalid worker tasks concurrency: must be an integer'
276 )
277 }
278 if (
279 tasksQueueOptions?.concurrency != null &&
280 tasksQueueOptions.concurrency <= 0
281 ) {
282 throw new Error(
283 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
284 )
285 }
286 }
287
288 private startPool (): void {
289 while (
290 this.workerNodes.reduce(
291 (accumulator, workerNode) =>
292 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
293 0
294 ) < this.numberOfWorkers
295 ) {
296 this.createAndSetupWorkerNode()
297 }
298 }
299
300 /** @inheritDoc */
301 public get info (): PoolInfo {
302 return {
303 version,
304 type: this.type,
305 worker: this.worker,
306 ready: this.ready,
307 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
308 minSize: this.minSize,
309 maxSize: this.maxSize,
310 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
311 .runTime.aggregate &&
312 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
313 .waitTime.aggregate && { utilization: round(this.utilization) }),
314 workerNodes: this.workerNodes.length,
315 idleWorkerNodes: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
317 workerNode.usage.tasks.executing === 0
318 ? accumulator + 1
319 : accumulator,
320 0
321 ),
322 busyWorkerNodes: this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
325 0
326 ),
327 executedTasks: this.workerNodes.reduce(
328 (accumulator, workerNode) =>
329 accumulator + workerNode.usage.tasks.executed,
330 0
331 ),
332 executingTasks: this.workerNodes.reduce(
333 (accumulator, workerNode) =>
334 accumulator + workerNode.usage.tasks.executing,
335 0
336 ),
337 ...(this.opts.enableTasksQueue === true && {
338 queuedTasks: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 accumulator + workerNode.usage.tasks.queued,
341 0
342 )
343 }),
344 ...(this.opts.enableTasksQueue === true && {
345 maxQueuedTasks: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
347 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
348 0
349 )
350 }),
351 failedTasks: this.workerNodes.reduce(
352 (accumulator, workerNode) =>
353 accumulator + workerNode.usage.tasks.failed,
354 0
355 ),
356 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .runTime.aggregate && {
358 runTime: {
359 minimum: round(
360 Math.min(
361 ...this.workerNodes.map(
362 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
363 )
364 )
365 ),
366 maximum: round(
367 Math.max(
368 ...this.workerNodes.map(
369 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
370 )
371 )
372 ),
373 average: round(
374 this.workerNodes.reduce(
375 (accumulator, workerNode) =>
376 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
377 0
378 ) /
379 this.workerNodes.reduce(
380 (accumulator, workerNode) =>
381 accumulator + (workerNode.usage.tasks?.executed ?? 0),
382 0
383 )
384 ),
385 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
386 .runTime.median && {
387 median: round(
388 median(
389 this.workerNodes.map(
390 workerNode => workerNode.usage.runTime?.median ?? 0
391 )
392 )
393 )
394 })
395 }
396 }),
397 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
398 .waitTime.aggregate && {
399 waitTime: {
400 minimum: round(
401 Math.min(
402 ...this.workerNodes.map(
403 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
404 )
405 )
406 ),
407 maximum: round(
408 Math.max(
409 ...this.workerNodes.map(
410 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
411 )
412 )
413 ),
414 average: round(
415 this.workerNodes.reduce(
416 (accumulator, workerNode) =>
417 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
418 0
419 ) /
420 this.workerNodes.reduce(
421 (accumulator, workerNode) =>
422 accumulator + (workerNode.usage.tasks?.executed ?? 0),
423 0
424 )
425 ),
426 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
427 .waitTime.median && {
428 median: round(
429 median(
430 this.workerNodes.map(
431 workerNode => workerNode.usage.waitTime?.median ?? 0
432 )
433 )
434 )
435 })
436 }
437 })
438 }
439 }
440
441 /**
442 * The pool readiness boolean status.
443 */
444 private get ready (): boolean {
445 return (
446 this.workerNodes.reduce(
447 (accumulator, workerNode) =>
448 !workerNode.info.dynamic && workerNode.info.ready
449 ? accumulator + 1
450 : accumulator,
451 0
452 ) >= this.minSize
453 )
454 }
455
456 /**
457 * The approximate pool utilization.
458 *
459 * @returns The pool utilization.
460 */
461 private get utilization (): number {
462 const poolTimeCapacity =
463 (performance.now() - this.startTimestamp) * this.maxSize
464 const totalTasksRunTime = this.workerNodes.reduce(
465 (accumulator, workerNode) =>
466 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
467 0
468 )
469 const totalTasksWaitTime = this.workerNodes.reduce(
470 (accumulator, workerNode) =>
471 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
472 0
473 )
474 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
475 }
476
477 /**
478 * The pool type.
479 *
480 * If it is `'dynamic'`, it provides the `max` property.
481 */
482 protected abstract get type (): PoolType
483
484 /**
485 * The worker type.
486 */
487 protected abstract get worker (): WorkerType
488
489 /**
490 * The pool minimum size.
491 */
492 protected abstract get minSize (): number
493
494 /**
495 * The pool maximum size.
496 */
497 protected abstract get maxSize (): number
498
499 /**
500 * Checks if the worker id sent in the received message from a worker is valid.
501 *
502 * @param message - The received message.
503 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
504 */
505 private checkMessageWorkerId (message: MessageValue<Response>): void {
506 if (
507 message.workerId != null &&
508 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
509 ) {
510 throw new Error(
511 `Worker message received from unknown worker '${message.workerId}'`
512 )
513 }
514 }
515
516 /**
517 * Gets the given worker its worker node key.
518 *
519 * @param worker - The worker.
520 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
521 */
522 private getWorkerNodeKeyByWorker (worker: Worker): number {
523 return this.workerNodes.findIndex(
524 workerNode => workerNode.worker === worker
525 )
526 }
527
528 /**
529 * Gets the worker node key given its worker id.
530 *
531 * @param workerId - The worker id.
532 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
533 */
534 private getWorkerNodeKeyByWorkerId (workerId: number): number {
535 return this.workerNodes.findIndex(
536 workerNode => workerNode.info.id === workerId
537 )
538 }
539
540 /** @inheritDoc */
541 public setWorkerChoiceStrategy (
542 workerChoiceStrategy: WorkerChoiceStrategy,
543 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
544 ): void {
545 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
546 this.opts.workerChoiceStrategy = workerChoiceStrategy
547 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
548 this.opts.workerChoiceStrategy
549 )
550 if (workerChoiceStrategyOptions != null) {
551 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
552 }
553 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
554 workerNode.resetUsage()
555 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
556 }
557 }
558
559 /** @inheritDoc */
560 public setWorkerChoiceStrategyOptions (
561 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
562 ): void {
563 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
564 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
565 this.workerChoiceStrategyContext.setOptions(
566 this.opts.workerChoiceStrategyOptions
567 )
568 }
569
570 /** @inheritDoc */
571 public enableTasksQueue (
572 enable: boolean,
573 tasksQueueOptions?: TasksQueueOptions
574 ): void {
575 if (this.opts.enableTasksQueue === true && !enable) {
576 this.flushTasksQueues()
577 }
578 this.opts.enableTasksQueue = enable
579 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
580 }
581
582 /** @inheritDoc */
583 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
584 if (this.opts.enableTasksQueue === true) {
585 this.checkValidTasksQueueOptions(tasksQueueOptions)
586 this.opts.tasksQueueOptions =
587 this.buildTasksQueueOptions(tasksQueueOptions)
588 } else if (this.opts.tasksQueueOptions != null) {
589 delete this.opts.tasksQueueOptions
590 }
591 }
592
593 private buildTasksQueueOptions (
594 tasksQueueOptions: TasksQueueOptions
595 ): TasksQueueOptions {
596 return {
597 concurrency: tasksQueueOptions?.concurrency ?? 1
598 }
599 }
600
601 /**
602 * Whether the pool is full or not.
603 *
604 * The pool filling boolean status.
605 */
606 protected get full (): boolean {
607 return this.workerNodes.length >= this.maxSize
608 }
609
610 /**
611 * Whether the pool is busy or not.
612 *
613 * The pool busyness boolean status.
614 */
615 protected abstract get busy (): boolean
616
617 /**
618 * Whether worker nodes are executing concurrently their tasks quota or not.
619 *
620 * @returns Worker nodes busyness boolean status.
621 */
622 protected internalBusy (): boolean {
623 if (this.opts.enableTasksQueue === true) {
624 return (
625 this.workerNodes.findIndex(
626 workerNode =>
627 workerNode.info.ready &&
628 workerNode.usage.tasks.executing <
629 (this.opts.tasksQueueOptions?.concurrency as number)
630 ) === -1
631 )
632 } else {
633 return (
634 this.workerNodes.findIndex(
635 workerNode =>
636 workerNode.info.ready && workerNode.usage.tasks.executing === 0
637 ) === -1
638 )
639 }
640 }
641
642 /** @inheritDoc */
643 public async execute (data?: Data, name?: string): Promise<Response> {
644 return await new Promise<Response>((resolve, reject) => {
645 const timestamp = performance.now()
646 const workerNodeKey = this.chooseWorkerNode()
647 const task: Task<Data> = {
648 name: name ?? DEFAULT_TASK_NAME,
649 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
650 data: data ?? ({} as Data),
651 timestamp,
652 workerId: this.getWorkerInfo(workerNodeKey).id as number,
653 taskId: randomUUID()
654 }
655 this.promiseResponseMap.set(task.taskId as string, {
656 resolve,
657 reject,
658 workerNodeKey
659 })
660 if (
661 this.opts.enableTasksQueue === false ||
662 (this.opts.enableTasksQueue === true &&
663 this.workerNodes[workerNodeKey].usage.tasks.executing <
664 (this.opts.tasksQueueOptions?.concurrency as number))
665 ) {
666 this.executeTask(workerNodeKey, task)
667 } else {
668 this.enqueueTask(workerNodeKey, task)
669 }
670 this.checkAndEmitEvents()
671 })
672 }
673
674 /** @inheritDoc */
675 public async destroy (): Promise<void> {
676 await Promise.all(
677 this.workerNodes.map(async (_, workerNodeKey) => {
678 await this.destroyWorkerNode(workerNodeKey)
679 })
680 )
681 }
682
683 /**
684 * Terminates the worker node given its worker node key.
685 *
686 * @param workerNodeKey - The worker node key.
687 */
688 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
689
690 /**
691 * Setup hook to execute code before worker nodes are created in the abstract constructor.
692 * Can be overridden.
693 *
694 * @virtual
695 */
696 protected setupHook (): void {
697 // Intentionally empty
698 }
699
700 /**
701 * Should return whether the worker is the main worker or not.
702 */
703 protected abstract isMain (): boolean
704
705 /**
706 * Hook executed before the worker task execution.
707 * Can be overridden.
708 *
709 * @param workerNodeKey - The worker node key.
710 * @param task - The task to execute.
711 */
712 protected beforeTaskExecutionHook (
713 workerNodeKey: number,
714 task: Task<Data>
715 ): void {
716 const workerUsage = this.workerNodes[workerNodeKey].usage
717 ++workerUsage.tasks.executing
718 this.updateWaitTimeWorkerUsage(workerUsage, task)
719 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
720 task.name as string
721 ) as WorkerUsage
722 ++taskWorkerUsage.tasks.executing
723 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
724 }
725
726 /**
727 * Hook executed after the worker task execution.
728 * Can be overridden.
729 *
730 * @param workerNodeKey - The worker node key.
731 * @param message - The received message.
732 */
733 protected afterTaskExecutionHook (
734 workerNodeKey: number,
735 message: MessageValue<Response>
736 ): void {
737 const workerUsage = this.workerNodes[workerNodeKey].usage
738 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
739 this.updateRunTimeWorkerUsage(workerUsage, message)
740 this.updateEluWorkerUsage(workerUsage, message)
741 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
742 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
743 ) as WorkerUsage
744 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
745 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
746 this.updateEluWorkerUsage(taskWorkerUsage, message)
747 }
748
749 private updateTaskStatisticsWorkerUsage (
750 workerUsage: WorkerUsage,
751 message: MessageValue<Response>
752 ): void {
753 const workerTaskStatistics = workerUsage.tasks
754 --workerTaskStatistics.executing
755 if (message.taskError == null) {
756 ++workerTaskStatistics.executed
757 } else {
758 ++workerTaskStatistics.failed
759 }
760 }
761
762 private updateRunTimeWorkerUsage (
763 workerUsage: WorkerUsage,
764 message: MessageValue<Response>
765 ): void {
766 updateMeasurementStatistics(
767 workerUsage.runTime,
768 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
769 message.taskPerformance?.runTime ?? 0,
770 workerUsage.tasks.executed
771 )
772 }
773
774 private updateWaitTimeWorkerUsage (
775 workerUsage: WorkerUsage,
776 task: Task<Data>
777 ): void {
778 const timestamp = performance.now()
779 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
780 updateMeasurementStatistics(
781 workerUsage.waitTime,
782 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
783 taskWaitTime,
784 workerUsage.tasks.executed
785 )
786 }
787
788 private updateEluWorkerUsage (
789 workerUsage: WorkerUsage,
790 message: MessageValue<Response>
791 ): void {
792 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
793 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
794 updateMeasurementStatistics(
795 workerUsage.elu.active,
796 eluTaskStatisticsRequirements,
797 message.taskPerformance?.elu?.active ?? 0,
798 workerUsage.tasks.executed
799 )
800 updateMeasurementStatistics(
801 workerUsage.elu.idle,
802 eluTaskStatisticsRequirements,
803 message.taskPerformance?.elu?.idle ?? 0,
804 workerUsage.tasks.executed
805 )
806 if (eluTaskStatisticsRequirements.aggregate) {
807 if (message.taskPerformance?.elu != null) {
808 if (workerUsage.elu.utilization != null) {
809 workerUsage.elu.utilization =
810 (workerUsage.elu.utilization +
811 message.taskPerformance.elu.utilization) /
812 2
813 } else {
814 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
815 }
816 }
817 }
818 }
819
820 /**
821 * Chooses a worker node for the next task.
822 *
823 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
824 *
825 * @returns The chosen worker node key
826 */
827 private chooseWorkerNode (): number {
828 if (this.shallCreateDynamicWorker()) {
829 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
830 if (
831 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
832 ) {
833 return workerNodeKey
834 }
835 }
836 return this.workerChoiceStrategyContext.execute()
837 }
838
839 /**
840 * Conditions for dynamic worker creation.
841 *
842 * @returns Whether to create a dynamic worker or not.
843 */
844 private shallCreateDynamicWorker (): boolean {
845 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
846 }
847
848 /**
849 * Sends a message to worker given its worker node key.
850 *
851 * @param workerNodeKey - The worker node key.
852 * @param message - The message.
853 */
854 protected abstract sendToWorker (
855 workerNodeKey: number,
856 message: MessageValue<Data>
857 ): void
858
859 /**
860 * Creates a new worker.
861 *
862 * @returns Newly created worker.
863 */
864 protected abstract createWorker (): Worker
865
866 /**
867 * Creates a new, completely set up worker node.
868 *
869 * @returns New, completely set up worker node key.
870 */
871 protected createAndSetupWorkerNode (): number {
872 const worker = this.createWorker()
873
874 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
875 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
876 worker.on('error', error => {
877 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
878 const workerInfo = this.getWorkerInfo(workerNodeKey)
879 workerInfo.ready = false
880 this.workerNodes[workerNodeKey].closeChannel()
881 this.emitter?.emit(PoolEvents.error, error)
882 if (this.opts.restartWorkerOnError === true && !this.starting) {
883 if (workerInfo.dynamic) {
884 this.createAndSetupDynamicWorkerNode()
885 } else {
886 this.createAndSetupWorkerNode()
887 }
888 }
889 if (this.opts.enableTasksQueue === true) {
890 this.redistributeQueuedTasks(workerNodeKey)
891 }
892 })
893 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
894 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
895 worker.once('exit', () => {
896 this.removeWorkerNode(worker)
897 })
898
899 const workerNodeKey = this.addWorkerNode(worker)
900
901 this.afterWorkerNodeSetup(workerNodeKey)
902
903 return workerNodeKey
904 }
905
906 /**
907 * Creates a new, completely set up dynamic worker node.
908 *
909 * @returns New, completely set up dynamic worker node key.
910 */
911 protected createAndSetupDynamicWorkerNode (): number {
912 const workerNodeKey = this.createAndSetupWorkerNode()
913 this.registerWorkerMessageListener(workerNodeKey, message => {
914 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
915 message.workerId
916 )
917 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
918 // Kill message received from worker
919 if (
920 isKillBehavior(KillBehaviors.HARD, message.kill) ||
921 (message.kill != null &&
922 ((this.opts.enableTasksQueue === false &&
923 workerUsage.tasks.executing === 0) ||
924 (this.opts.enableTasksQueue === true &&
925 workerUsage.tasks.executing === 0 &&
926 this.tasksQueueSize(localWorkerNodeKey) === 0)))
927 ) {
928 this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
929 }
930 })
931 const workerInfo = this.getWorkerInfo(workerNodeKey)
932 this.sendToWorker(workerNodeKey, {
933 checkActive: true,
934 workerId: workerInfo.id as number
935 })
936 workerInfo.dynamic = true
937 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
938 workerInfo.ready = true
939 }
940 return workerNodeKey
941 }
942
943 /**
944 * Registers a listener callback on the worker given its worker node key.
945 *
946 * @param workerNodeKey - The worker node key.
947 * @param listener - The message listener callback.
948 */
949 protected abstract registerWorkerMessageListener<
950 Message extends Data | Response
951 >(
952 workerNodeKey: number,
953 listener: (message: MessageValue<Message>) => void
954 ): void
955
956 /**
957 * Method hooked up after a worker node has been newly created.
958 * Can be overridden.
959 *
960 * @param workerNodeKey - The newly created worker node key.
961 */
962 protected afterWorkerNodeSetup (workerNodeKey: number): void {
963 // Listen to worker messages.
964 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
965 // Send the startup message to worker.
966 this.sendStartupMessageToWorker(workerNodeKey)
967 // Send the worker statistics message to worker.
968 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
969 }
970
971 /**
972 * Sends the startup message to worker given its worker node key.
973 *
974 * @param workerNodeKey - The worker node key.
975 */
976 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
977
978 /**
979 * Sends the worker statistics message to worker given its worker node key.
980 *
981 * @param workerNodeKey - The worker node key.
982 */
983 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
984 this.sendToWorker(workerNodeKey, {
985 statistics: {
986 runTime:
987 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
988 .runTime.aggregate,
989 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
990 .elu.aggregate
991 },
992 workerId: this.getWorkerInfo(workerNodeKey).id as number
993 })
994 }
995
996 private redistributeQueuedTasks (workerNodeKey: number): void {
997 while (this.tasksQueueSize(workerNodeKey) > 0) {
998 let targetWorkerNodeKey: number = workerNodeKey
999 let minQueuedTasks = Infinity
1000 let executeTask = false
1001 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1002 const workerInfo = this.getWorkerInfo(workerNodeId)
1003 if (
1004 workerNodeId !== workerNodeKey &&
1005 workerInfo.ready &&
1006 workerNode.usage.tasks.queued === 0
1007 ) {
1008 if (
1009 this.workerNodes[workerNodeId].usage.tasks.executing <
1010 (this.opts.tasksQueueOptions?.concurrency as number)
1011 ) {
1012 executeTask = true
1013 }
1014 targetWorkerNodeKey = workerNodeId
1015 break
1016 }
1017 if (
1018 workerNodeId !== workerNodeKey &&
1019 workerInfo.ready &&
1020 workerNode.usage.tasks.queued < minQueuedTasks
1021 ) {
1022 minQueuedTasks = workerNode.usage.tasks.queued
1023 targetWorkerNodeKey = workerNodeId
1024 }
1025 }
1026 if (executeTask) {
1027 this.executeTask(
1028 targetWorkerNodeKey,
1029 this.dequeueTask(workerNodeKey) as Task<Data>
1030 )
1031 } else {
1032 this.enqueueTask(
1033 targetWorkerNodeKey,
1034 this.dequeueTask(workerNodeKey) as Task<Data>
1035 )
1036 }
1037 }
1038 }
1039
1040 /**
1041 * This method is the listener registered for each worker message.
1042 *
1043 * @returns The listener function to execute when a message is received from a worker.
1044 */
1045 protected workerListener (): (message: MessageValue<Response>) => void {
1046 return message => {
1047 this.checkMessageWorkerId(message)
1048 if (message.ready != null) {
1049 // Worker ready response received from worker
1050 this.handleWorkerReadyResponse(message)
1051 } else if (message.taskId != null) {
1052 // Task execution response received from worker
1053 this.handleTaskExecutionResponse(message)
1054 }
1055 }
1056 }
1057
1058 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1059 this.getWorkerInfo(
1060 this.getWorkerNodeKeyByWorkerId(message.workerId)
1061 ).ready = message.ready as boolean
1062 if (this.emitter != null && this.ready) {
1063 this.emitter.emit(PoolEvents.ready, this.info)
1064 }
1065 }
1066
1067 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1068 const promiseResponse = this.promiseResponseMap.get(
1069 message.taskId as string
1070 )
1071 if (promiseResponse != null) {
1072 if (message.taskError != null) {
1073 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1074 promiseResponse.reject(message.taskError.message)
1075 } else {
1076 promiseResponse.resolve(message.data as Response)
1077 }
1078 const workerNodeKey = promiseResponse.workerNodeKey
1079 this.afterTaskExecutionHook(workerNodeKey, message)
1080 this.promiseResponseMap.delete(message.taskId as string)
1081 if (
1082 this.opts.enableTasksQueue === true &&
1083 this.tasksQueueSize(workerNodeKey) > 0 &&
1084 this.workerNodes[workerNodeKey].usage.tasks.executing <
1085 (this.opts.tasksQueueOptions?.concurrency as number)
1086 ) {
1087 this.executeTask(
1088 workerNodeKey,
1089 this.dequeueTask(workerNodeKey) as Task<Data>
1090 )
1091 }
1092 this.workerChoiceStrategyContext.update(workerNodeKey)
1093 }
1094 }
1095
1096 private checkAndEmitEvents (): void {
1097 if (this.emitter != null) {
1098 if (this.busy) {
1099 this.emitter.emit(PoolEvents.busy, this.info)
1100 }
1101 if (this.type === PoolTypes.dynamic && this.full) {
1102 this.emitter.emit(PoolEvents.full, this.info)
1103 }
1104 }
1105 }
1106
1107 /**
1108 * Gets the worker information given its worker node key.
1109 *
1110 * @param workerNodeKey - The worker node key.
1111 * @returns The worker information.
1112 */
1113 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1114 return this.workerNodes[workerNodeKey].info
1115 }
1116
1117 /**
1118 * Adds the given worker in the pool worker nodes.
1119 *
1120 * @param worker - The worker.
1121 * @returns The added worker node key.
1122 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1123 */
1124 private addWorkerNode (worker: Worker): number {
1125 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1126 // Flag the worker node as ready at pool startup.
1127 if (this.starting) {
1128 workerNode.info.ready = true
1129 }
1130 this.workerNodes.push(workerNode)
1131 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1132 if (workerNodeKey === -1) {
1133 throw new Error('Worker node not found')
1134 }
1135 return workerNodeKey
1136 }
1137
1138 /**
1139 * Removes the given worker from the pool worker nodes.
1140 *
1141 * @param worker - The worker.
1142 */
1143 private removeWorkerNode (worker: Worker): void {
1144 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1145 if (workerNodeKey !== -1) {
1146 this.workerNodes.splice(workerNodeKey, 1)
1147 this.workerChoiceStrategyContext.remove(workerNodeKey)
1148 }
1149 }
1150
1151 /**
1152 * Executes the given task on the worker given its worker node key.
1153 *
1154 * @param workerNodeKey - The worker node key.
1155 * @param task - The task to execute.
1156 */
1157 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1158 this.beforeTaskExecutionHook(workerNodeKey, task)
1159 this.sendToWorker(workerNodeKey, task)
1160 }
1161
1162 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1163 return this.workerNodes[workerNodeKey].enqueueTask(task)
1164 }
1165
1166 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1167 return this.workerNodes[workerNodeKey].dequeueTask()
1168 }
1169
1170 private tasksQueueSize (workerNodeKey: number): number {
1171 return this.workerNodes[workerNodeKey].tasksQueueSize()
1172 }
1173
1174 protected flushTasksQueue (workerNodeKey: number): void {
1175 while (this.tasksQueueSize(workerNodeKey) > 0) {
1176 this.executeTask(
1177 workerNodeKey,
1178 this.dequeueTask(workerNodeKey) as Task<Data>
1179 )
1180 }
1181 this.workerNodes[workerNodeKey].clearTasksQueue()
1182 }
1183
1184 private flushTasksQueues (): void {
1185 for (const [workerNodeKey] of this.workerNodes.entries()) {
1186 this.flushTasksQueue(workerNodeKey)
1187 }
1188 }
1189 }