perf: speed up dynamic worker termination
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The task execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
    new Map<string, PromiseResponseWrapper<Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Whether the pool is starting or not.
   */
  private readonly starting: boolean
  /**
   * The start timestamp of the pool, taken from `performance.now()` once the initial worker nodes are created.
   */
  private readonly startTimestamp: number

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   */
  public constructor (
    protected readonly numberOfWorkers: number,
    protected readonly filePath: string,
    protected readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )

    this.setupHook()

    // Worker nodes added while `starting` is true are flagged as ready (see addWorkerNode()).
    this.starting = true
    this.startPool()
    this.starting = false

    this.startTimestamp = performance.now()
  }

  /**
   * Checks that the worker file path is a non blank string pointing to an existing file.
   *
   * @param filePath - Path to the worker file.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, blank or does not exist.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      typeof filePath !== 'string' ||
      filePath.trim().length === 0
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
    if (!existsSync(filePath)) {
      throw new Error(`Cannot find the worker file '${filePath}'`)
    }
  }

  /**
   * Checks that the number of workers is a safe, non negative integer (and non zero for a fixed pool).
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
   * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
      throw new RangeError('Cannot instantiate a fixed pool with zero worker')
    }
  }

  /**
   * Checks the minimum and maximum sizes of a dynamic pool. Does nothing for other pool types.
   *
   * @param min - The minimum pool size.
   * @param max - The maximum pool size.
   * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min > max`, `max === 0` or `min === max`.
   */
  protected checkDynamicPoolSize (min: number, max: number): void {
    if (this.type === PoolTypes.dynamic) {
      if (min > max) {
        throw new RangeError(
          'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
        )
      } else if (max === 0) {
        throw new RangeError(
          'Cannot instantiate a dynamic pool with a pool size equal to zero'
        )
      } else if (min === max) {
        throw new RangeError(
          'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
        )
      }
    }
  }

  /**
   * Validates the pool options and fills in the defaults on `this.opts`.
   *
   * @param opts - Options for the pool.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (isPlainObject(opts)) {
      this.opts.workerChoiceStrategy =
        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
      this.opts.workerChoiceStrategyOptions =
        opts.workerChoiceStrategyOptions ??
        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
      this.checkValidWorkerChoiceStrategyOptions(
        this.opts.workerChoiceStrategyOptions
      )
      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
      this.opts.enableEvents = opts.enableEvents ?? true
      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
      if (this.opts.enableTasksQueue) {
        this.checkValidTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
      }
    } else {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
  }

  /**
   * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
   *
   * @param workerChoiceStrategy - The worker choice strategy.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Checks the worker choice strategy options: plain object, one weight per worker node
   * (when weights are given) and a known measurement (when a measurement is given).
   *
   * @param workerChoiceStrategyOptions - The worker choice strategy options.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights or the measurement are invalid.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
    if (
      workerChoiceStrategyOptions.measurement != null &&
      !Object.values(Measurements).includes(
        workerChoiceStrategyOptions.measurement
      )
    ) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }

  /**
   * Checks the tasks queue options: plain object (when given) and a strictly positive safe integer concurrency (when given).
   *
   * @param tasksQueueOptions - The tasks queue options.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    if (
      tasksQueueOptions?.concurrency != null &&
      !Number.isSafeInteger(tasksQueueOptions.concurrency)
    ) {
      throw new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
    if (
      tasksQueueOptions?.concurrency != null &&
      tasksQueueOptions.concurrency <= 0
    ) {
      throw new Error(
        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
      )
    }
  }

  /**
   * Creates worker nodes until the number of non dynamic worker nodes reaches `numberOfWorkers`.
   */
  private startPool (): void {
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.numberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  }

  /** @inheritDoc */
  public get info (): PoolInfo {
    const taskStatisticsRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // Sums a per worker node numeric value over all the worker nodes.
    const sumOf = (
      selector: (workerNode: IWorkerNode<Worker, Data>) => number
    ): number =>
      this.workerNodes.reduce(
        (accumulator, workerNode) => accumulator + selector(workerNode),
        0
      )
    // Average per executed task; 0 instead of NaN/Infinity while no task has been executed yet.
    const averagePerExecutedTask = (aggregate: number): number => {
      const executedTasks = sumOf(
        workerNode => workerNode.usage.tasks?.executed ?? 0
      )
      return executedTasks > 0 ? aggregate / executedTasks : 0
    }
    return {
      version,
      type: this.type,
      worker: this.worker,
      ready: this.ready,
      strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
      minSize: this.minSize,
      maxSize: this.maxSize,
      ...(taskStatisticsRequirements.runTime.aggregate &&
        taskStatisticsRequirements.waitTime.aggregate && {
        utilization: round(this.utilization)
      }),
      workerNodes: this.workerNodes.length,
      idleWorkerNodes: sumOf(workerNode =>
        workerNode.usage.tasks.executing === 0 ? 1 : 0
      ),
      busyWorkerNodes: sumOf(workerNode =>
        workerNode.usage.tasks.executing > 0 ? 1 : 0
      ),
      executedTasks: sumOf(workerNode => workerNode.usage.tasks.executed),
      executingTasks: sumOf(workerNode => workerNode.usage.tasks.executing),
      queuedTasks: sumOf(workerNode => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOf(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      failedTasks: sumOf(workerNode => workerNode.usage.tasks.failed),
      ...(taskStatisticsRequirements.runTime.aggregate && {
        runTime: {
          minimum: round(
            Math.min(
              ...this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.minimum ?? Infinity
              )
            )
          ),
          maximum: round(
            Math.max(
              ...this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
              )
            )
          ),
          average: round(
            averagePerExecutedTask(
              sumOf(workerNode => workerNode.usage.runTime?.aggregate ?? 0)
            )
          ),
          ...(taskStatisticsRequirements.runTime.median && {
            median: round(
              median(
                this.workerNodes.map(
                  workerNode => workerNode.usage.runTime?.median ?? 0
                )
              )
            )
          })
        }
      }),
      ...(taskStatisticsRequirements.waitTime.aggregate && {
        waitTime: {
          minimum: round(
            Math.min(
              ...this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
              )
            )
          ),
          maximum: round(
            Math.max(
              ...this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
              )
            )
          ),
          average: round(
            averagePerExecutedTask(
              sumOf(workerNode => workerNode.usage.waitTime?.aggregate ?? 0)
            )
          ),
          ...(taskStatisticsRequirements.waitTime.median && {
            median: round(
              median(
                this.workerNodes.map(
                  workerNode => workerNode.usage.waitTime?.median ?? 0
                )
              )
            )
          })
        }
      })
    }
  }

  /**
   * The pool readiness boolean status: `true` when the number of ready, non dynamic
   * worker nodes is at least the pool minimum size.
   */
  private get ready (): boolean {
    return (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic && workerNode.info.ready
            ? accumulator + 1
            : accumulator,
        0
      ) >= this.minSize
    )
  }

  /**
   * The approximate pool utilization: aggregated tasks run time plus wait time divided by
   * the time elapsed since the pool start multiplied by the pool maximum size.
   *
   * @returns The pool utilization.
   */
  private get utilization (): number {
    const poolTimeCapacity =
      (performance.now() - this.startTimestamp) * this.maxSize
    const totalTasksRunTime = this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
      0
    )
    const totalTasksWaitTime = this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
      0
    )
    return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
  }

  /**
   * The pool type.
   *
   * If it is `'dynamic'`, it provides the `max` property.
   */
  protected abstract get type (): PoolType

  /**
   * The worker type.
   */
  protected abstract get worker (): WorkerType

  /**
   * The pool minimum size.
   */
  protected abstract get minSize (): number

  /**
   * The pool maximum size.
   */
  protected abstract get maxSize (): number

  /**
   * Checks if the worker id sent in the received message from a worker is valid.
   *
   * @param message - The received message.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
   */
  private checkMessageWorkerId (message: MessageValue<Response>): void {
    if (
      message.workerId != null &&
      this.getWorkerNodeKeyByWorkerId(message.workerId) == null
    ) {
      throw new Error(
        `Worker message received from unknown worker '${message.workerId}'`
      )
    }
  }

  /**
   * Gets the given worker its worker node key.
   *
   * @param worker - The worker.
   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /**
   * Gets the worker node key given its worker id.
   *
   * @param workerId - The worker id.
   * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
   */
  private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
      if (workerNode.info.id === workerId) {
        return workerNodeKey
      }
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
    // Usage statistics are reset on every worker node and the statistics requirements are sent again.
    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
      workerNode.resetUsage()
      this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /** @inheritDoc */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    // Queued tasks are flushed to the workers before the queue gets disabled.
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /** @inheritDoc */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the tasks queue options, defaulting the concurrency to `1`.
   *
   * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
   * @returns The tasks queue options with defaults applied.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status.
   */
  protected get full (): boolean {
    return this.workerNodes.length >= this.maxSize
  }

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Whether every worker node is executing at least one task.
   *
   * @returns Worker nodes busyness boolean status.
   */
  protected internalBusy (): boolean {
    return !this.workerNodes.some(
      workerNode => workerNode.usage.tasks.executing === 0
    )
  }

  /** @inheritDoc */
  public async execute (data?: Data, name?: string): Promise<Response> {
    return await new Promise<Response>((resolve, reject) => {
      const timestamp = performance.now()
      const workerNodeKey = this.chooseWorkerNode()
      const task: Task<Data> = {
        name: name ?? DEFAULT_TASK_NAME,
        // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
        data: data ?? ({} as Data),
        timestamp,
        workerId: this.getWorkerInfo(workerNodeKey).id as number,
        id: randomUUID()
      }
      this.promiseResponseMap.set(task.id as string, {
        resolve,
        reject,
        workerNodeKey
      })
      // The task is queued when the pool is busy or the chosen worker node reached its concurrency.
      if (
        this.opts.enableTasksQueue === true &&
        (this.busy ||
          this.workerNodes[workerNodeKey].usage.tasks.executing >=
            ((this.opts.tasksQueueOptions as TasksQueueOptions)
              .concurrency as number))
      ) {
        this.enqueueTask(workerNodeKey, task)
      } else {
        this.executeTask(workerNodeKey, task)
      }
      this.checkAndEmitEvents()
    })
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        // FIXME: wait for tasks to be finished
        const workerExitPromise = new Promise<void>(resolve => {
          // The listener only needs to fire once: the promise settles on the first 'exit' event.
          workerNode.worker.once('exit', () => {
            resolve()
          })
        })
        await this.destroyWorkerNode(workerNodeKey)
        await workerExitPromise
      })
    )
  }

  /**
   * Terminates the worker node given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected abstract destroyWorkerNode (
    workerNodeKey: number
  ): void | Promise<void>

  /**
   * Setup hook to execute code before worker nodes are created in the abstract constructor.
   * Can be overridden.
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  protected beforeTaskExecutionHook (
    workerNodeKey: number,
    task: Task<Data>
  ): void {
    // Both the worker node usage and its per task name usage are updated.
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
    ++taskWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * @param workerNodeKey - The worker node key.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    workerNodeKey: number,
    message: MessageValue<Response>
  ): void {
    // Both the worker node usage and its per task name usage are updated.
    const workerUsage = this.workerNodes[workerNodeKey].usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
    this.updateEluWorkerUsage(taskWorkerUsage, message)
  }

  /**
   * Updates the tasks counters of the given usage: one less executing task,
   * one more executed or failed task depending on the presence of a task error.
   *
   * @param workerUsage - The usage to update.
   * @param message - The received message.
   */
  private updateTaskStatisticsWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    const workerTaskStatistics = workerUsage.tasks
    --workerTaskStatistics.executing
    if (message.taskError == null) {
      ++workerTaskStatistics.executed
    } else {
      ++workerTaskStatistics.failed
    }
  }

  /**
   * Updates the run time statistics of the given usage with the task run time carried by the message (`0` if absent).
   *
   * @param workerUsage - The usage to update.
   * @param message - The received message.
   */
  private updateRunTimeWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0,
      workerUsage.tasks.executed
    )
  }

  /**
   * Updates the wait time statistics of the given usage with the time elapsed since the task timestamp.
   *
   * @param workerUsage - The usage to update.
   * @param task - The task about to be executed.
   */
  private updateWaitTimeWorkerUsage (
    workerUsage: WorkerUsage,
    task: Task<Data>
  ): void {
    const timestamp = performance.now()
    // A task without timestamp yields a zero wait time.
    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
    updateMeasurementStatistics(
      workerUsage.waitTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
      taskWaitTime,
      workerUsage.tasks.executed
    )
  }

  /**
   * Updates the event loop utilization (ELU) statistics of the given usage with the ELU carried by the message.
   *
   * @param workerUsage - The usage to update.
   * @param message - The received message.
   */
  private updateEluWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
    updateMeasurementStatistics(
      workerUsage.elu.active,
      eluTaskStatisticsRequirements,
      message.taskPerformance?.elu?.active ?? 0,
      workerUsage.tasks.executed
    )
    updateMeasurementStatistics(
      workerUsage.elu.idle,
      eluTaskStatisticsRequirements,
      message.taskPerformance?.elu?.idle ?? 0,
      workerUsage.tasks.executed
    )
    if (eluTaskStatisticsRequirements.aggregate) {
      if (message.taskPerformance?.elu != null) {
        if (workerUsage.elu.utilization != null) {
          // Mean of the previous utilization and the one just received.
          workerUsage.elu.utilization =
            (workerUsage.elu.utilization +
              message.taskPerformance.elu.utilization) /
            2
        } else {
          workerUsage.elu.utilization = message.taskPerformance.elu.utilization
        }
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * A dynamic worker node is created first when {@link shallCreateDynamicWorker} allows it;
   * it is chosen directly when the strategy policy `useDynamicWorker` is set.
   * Otherwise the choice is delegated to the worker choice strategy context.
   *
   * @returns The chosen worker node key
   */
  private chooseWorkerNode (): number {
    if (this.shallCreateDynamicWorker()) {
      const workerNodeKey = this.createAndSetupDynamicWorkerNode()
      if (
        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
      ) {
        return workerNodeKey
      }
    }
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Conditions for dynamic worker creation: dynamic pool, not full, and every worker node busy.
   *
   * @returns Whether to create a dynamic worker or not.
   */
  private shallCreateDynamicWorker (): boolean {
    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  }

  /**
   * Sends a message to worker given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>
  ): void

  /**
   * Creates a new worker.
   *
   * @returns Newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Creates a new, completely set up worker node.
   *
   * @returns New, completely set up worker node key.
   */
  protected createAndSetupWorkerNode (): number {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      const workerNodeKey = this.getWorkerNodeKey(worker)
      if (workerNodeKey === -1) {
        // The worker node is already removed from the pool: there is no node state
        // to update, only report the error instead of dereferencing a missing node.
        this.emitter?.emit(PoolEvents.error, error)
        return
      }
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      workerInfo.ready = false
      this.workerNodes[workerNodeKey].closeChannel()
      this.emitter?.emit(PoolEvents.error, error)
      if (this.opts.restartWorkerOnError === true && !this.starting) {
        if (workerInfo.dynamic) {
          this.createAndSetupDynamicWorkerNode()
        } else {
          this.createAndSetupWorkerNode()
        }
      }
      if (this.opts.enableTasksQueue === true) {
        this.redistributeQueuedTasks(workerNodeKey)
      }
    })
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    const workerNodeKey = this.addWorkerNode(worker)

    this.afterWorkerNodeSetup(workerNodeKey)

    return workerNodeKey
  }

  /**
   * Creates a new, completely set up dynamic worker node.
   *
   * @returns New, completely set up dynamic worker node key.
   */
  protected createAndSetupDynamicWorkerNode (): number {
    const workerNodeKey = this.createAndSetupWorkerNode()
    this.registerWorkerMessageListener(workerNodeKey, message => {
      // Worker node keys shift when nodes are removed: resolve the current key from the worker id.
      const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      if (localWorkerNodeKey == null) {
        return
      }
      const workerUsage = this.workerNodes[localWorkerNodeKey].usage
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        (message.kill != null &&
          ((this.opts.enableTasksQueue === false &&
            workerUsage.tasks.executing === 0) ||
            (this.opts.enableTasksQueue === true &&
              workerUsage.tasks.executing === 0 &&
              this.tasksQueueSize(localWorkerNodeKey) === 0)))
      ) {
        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
        if (isAsyncFunction(destroyWorkerNodeBounded)) {
          (
            destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
          )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
        } else {
          (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
            localWorkerNodeKey
          )
        }
      }
    })
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.dynamic = true
    if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
      workerInfo.ready = true
    }
    this.sendToWorker(workerNodeKey, {
      checkActive: true,
      workerId: workerInfo.id as number
    })
    return workerNodeKey
  }

  /**
   * Registers a listener callback on the worker given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void

  /**
   * Method hooked up after a worker node has been newly created.
   * Can be overridden.
   *
   * @param workerNodeKey - The newly created worker node key.
   */
  protected afterWorkerNodeSetup (workerNodeKey: number): void {
    // Listen to worker messages.
    this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
    // Send the startup message to worker.
    this.sendStartupMessageToWorker(workerNodeKey)
    // Send the worker statistics message to worker.
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }

  /**
   * Sends the startup message to worker given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected abstract sendStartupMessageToWorker (workerNodeKey: number): void

  /**
   * Sends the worker statistics message to worker given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   */
  private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
    this.sendToWorker(workerNodeKey, {
      statistics: {
        runTime:
          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
            .runTime.aggregate,
        elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .elu.aggregate
      },
      workerId: this.getWorkerInfo(workerNodeKey).id as number
    })
  }

  /**
   * Moves the queued tasks of the given worker node to the other ready worker nodes.
   *
   * Each task goes to the first other ready worker node with an empty queue, or else to
   * the other ready worker node with the fewest queued tasks. If no other ready worker
   * node exists, the remaining tasks are left in place.
   *
   * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
   */
  private redistributeQueuedTasks (workerNodeKey: number): void {
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      let targetWorkerNodeKey: number = workerNodeKey
      let minQueuedTasks = Infinity
      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
        if (
          workerNodeId === workerNodeKey ||
          !this.getWorkerInfo(workerNodeId).ready
        ) {
          continue
        }
        if (workerNode.usage.tasks.queued === 0) {
          targetWorkerNodeKey = workerNodeId
          break
        }
        if (workerNode.usage.tasks.queued < minQueuedTasks) {
          minQueuedTasks = workerNode.usage.tasks.queued
          targetWorkerNodeKey = workerNodeId
        }
      }
      if (targetWorkerNodeKey === workerNodeKey) {
        // No other ready worker node: re-enqueueing on the same node would never
        // shrink its queue and this loop would spin forever.
        break
      }
      this.enqueueTask(
        targetWorkerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }

  /**
   * This method is the listener registered for each worker message.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      this.checkMessageWorkerId(message)
      if (message.ready != null) {
        // Worker ready response received
        this.handleWorkerReadyResponse(message)
      } else if (message.id != null) {
        // Task execution response received
        this.handleTaskExecutionResponse(message)
      }
    }
  }

  /**
   * Stores the readiness sent by a worker and emits the pool `ready` event once the pool is ready.
   *
   * @param message - The received worker ready message.
   */
  private handleWorkerReadyResponse (message: MessageValue<Response>): void {
    this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(message.workerId) as number
    ).ready = message.ready as boolean
    if (this.emitter != null && this.ready) {
      this.emitter.emit(PoolEvents.ready, this.info)
    }
  }

  /**
   * Settles the promise bound to the task id carried by the message, updates the worker node
   * usage, then runs the next queued task of that worker node, if any.
   *
   * @param message - The received task execution response message.
   */
  private handleTaskExecutionResponse (message: MessageValue<Response>): void {
    const promiseResponse = this.promiseResponseMap.get(message.id as string)
    if (promiseResponse != null) {
      if (message.taskError != null) {
        this.emitter?.emit(PoolEvents.taskError, message.taskError)
        promiseResponse.reject(message.taskError.message)
      } else {
        promiseResponse.resolve(message.data as Response)
      }
      // The key stored at submission time may be stale: removing a worker node shifts
      // the keys of the following ones. Prefer the key resolved from the worker id.
      const workerNodeKey =
        (message.workerId != null
          ? this.getWorkerNodeKeyByWorkerId(message.workerId)
          : undefined) ?? promiseResponse.workerNodeKey
      this.afterTaskExecutionHook(workerNodeKey, message)
      this.promiseResponseMap.delete(message.id as string)
      if (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) > 0
      ) {
        this.executeTask(
          workerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
      this.workerChoiceStrategyContext.update(workerNodeKey)
    }
  }

  /**
   * Emits the `busy` event when the pool is busy and the `full` event when a dynamic pool is full.
   * Does nothing when events are disabled.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter != null) {
      if (this.busy) {
        this.emitter.emit(PoolEvents.busy, this.info)
      }
      if (this.type === PoolTypes.dynamic && this.full) {
        this.emitter.emit(PoolEvents.full, this.info)
      }
    }
  }

  /**
   * Gets the worker information given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker information.
   */
  protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
    return this.workerNodes[workerNodeKey].info
  }

  /**
   * Adds the given worker in the pool worker nodes.
   *
   * @param worker - The worker.
   * @returns The added worker node key.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
   */
  private addWorkerNode (worker: Worker): number {
    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
    // Flag the worker node as ready at pool startup.
    if (this.starting) {
      workerNode.info.ready = true
    }
    this.workerNodes.push(workerNode)
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey === -1) {
      throw new Error('Worker node not found')
    }
    return workerNodeKey
  }

  /**
   * Removes the given worker from the pool worker nodes.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Executes the given task on the worker given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey, task)
    this.sendToWorker(workerNodeKey, task)
  }

  /**
   * Enqueues the given task on the worker node given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the worker node `enqueueTask()`.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].enqueueTask(task)
  }

  /**
   * Dequeues a task from the worker node given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, possibly `undefined`.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].dequeueTask()
  }

  /**
   * Gets the tasks queue size of the worker node given its worker node key.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The tasks queue size.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueueSize()
  }

  /**
   * Executes every queued task of the worker node given its worker node key, then clears its queue.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerNodes[workerNodeKey].clearTasksQueue()
  }

  /**
   * Flushes the tasks queue of every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }
}