perf: drastically reduce worker nodes array lookups
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
60 /** @inheritDoc */
61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
62
63 /** @inheritDoc */
64 public readonly emitter?: PoolEmitter
65
66 /**
67 * The task execution response promise map.
68 *
69 * - `key`: The message id of each submitted task.
70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
71 *
72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
73 */
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
76
77 /**
78 * Worker choice strategy context referencing a worker choice algorithm implementation.
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
81 Worker,
82 Data,
83 Response
84 >
85
86 /**
87 * Whether the pool is starting or not.
88 */
89 private readonly starting: boolean
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
95 /**
96 * Constructs a new poolifier pool.
97 *
98 * @param numberOfWorkers - Number of workers that this pool should manage.
99 * @param filePath - Path to the worker file.
100 * @param opts - Options for the pool.
101 */
102 public constructor (
103 protected readonly numberOfWorkers: number,
104 protected readonly filePath: string,
105 protected readonly opts: PoolOptions<Worker>
106 ) {
107 if (!this.isMain()) {
108 throw new Error('Cannot start a pool from a worker!')
109 }
110 this.checkNumberOfWorkers(this.numberOfWorkers)
111 this.checkFilePath(this.filePath)
112 this.checkPoolOptions(this.opts)
113
114 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
115 this.executeTask = this.executeTask.bind(this)
116 this.enqueueTask = this.enqueueTask.bind(this)
117 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
118
119 if (this.opts.enableEvents === true) {
120 this.emitter = new PoolEmitter()
121 }
122 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
123 Worker,
124 Data,
125 Response
126 >(
127 this,
128 this.opts.workerChoiceStrategy,
129 this.opts.workerChoiceStrategyOptions
130 )
131
132 this.setupHook()
133
134 this.starting = true
135 this.startPool()
136 this.starting = false
137
138 this.startTimestamp = performance.now()
139 }
140
141 private checkFilePath (filePath: string): void {
142 if (
143 filePath == null ||
144 typeof filePath !== 'string' ||
145 (typeof filePath === 'string' && filePath.trim().length === 0)
146 ) {
147 throw new Error('Please specify a file with a worker implementation')
148 }
149 if (!existsSync(filePath)) {
150 throw new Error(`Cannot find the worker file '${filePath}'`)
151 }
152 }
153
154 private checkNumberOfWorkers (numberOfWorkers: number): void {
155 if (numberOfWorkers == null) {
156 throw new Error(
157 'Cannot instantiate a pool without specifying the number of workers'
158 )
159 } else if (!Number.isSafeInteger(numberOfWorkers)) {
160 throw new TypeError(
161 'Cannot instantiate a pool with a non safe integer number of workers'
162 )
163 } else if (numberOfWorkers < 0) {
164 throw new RangeError(
165 'Cannot instantiate a pool with a negative number of workers'
166 )
167 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
168 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
169 }
170 }
171
172 protected checkDynamicPoolSize (min: number, max: number): void {
173 if (this.type === PoolTypes.dynamic) {
174 if (min > max) {
175 throw new RangeError(
176 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
177 )
178 } else if (max === 0) {
179 throw new RangeError(
180 'Cannot instantiate a dynamic pool with a pool size equal to zero'
181 )
182 } else if (min === max) {
183 throw new RangeError(
184 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
185 )
186 }
187 }
188 }
189
190 private checkPoolOptions (opts: PoolOptions<Worker>): void {
191 if (isPlainObject(opts)) {
192 this.opts.workerChoiceStrategy =
193 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
194 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
195 this.opts.workerChoiceStrategyOptions =
196 opts.workerChoiceStrategyOptions ??
197 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
198 this.checkValidWorkerChoiceStrategyOptions(
199 this.opts.workerChoiceStrategyOptions
200 )
201 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
202 this.opts.enableEvents = opts.enableEvents ?? true
203 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
204 if (this.opts.enableTasksQueue) {
205 this.checkValidTasksQueueOptions(
206 opts.tasksQueueOptions as TasksQueueOptions
207 )
208 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 }
212 } else {
213 throw new TypeError('Invalid pool options: must be a plain object')
214 }
215 }
216
217 private checkValidWorkerChoiceStrategy (
218 workerChoiceStrategy: WorkerChoiceStrategy
219 ): void {
220 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
221 throw new Error(
222 `Invalid worker choice strategy '${workerChoiceStrategy}'`
223 )
224 }
225 }
226
227 private checkValidWorkerChoiceStrategyOptions (
228 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
229 ): void {
230 if (!isPlainObject(workerChoiceStrategyOptions)) {
231 throw new TypeError(
232 'Invalid worker choice strategy options: must be a plain object'
233 )
234 }
235 if (
236 workerChoiceStrategyOptions.weights != null &&
237 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
238 ) {
239 throw new Error(
240 'Invalid worker choice strategy options: must have a weight for each worker node'
241 )
242 }
243 if (
244 workerChoiceStrategyOptions.measurement != null &&
245 !Object.values(Measurements).includes(
246 workerChoiceStrategyOptions.measurement
247 )
248 ) {
249 throw new Error(
250 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
251 )
252 }
253 }
254
255 private checkValidTasksQueueOptions (
256 tasksQueueOptions: TasksQueueOptions
257 ): void {
258 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
259 throw new TypeError('Invalid tasks queue options: must be a plain object')
260 }
261 if (
262 tasksQueueOptions?.concurrency != null &&
263 !Number.isSafeInteger(tasksQueueOptions.concurrency)
264 ) {
265 throw new TypeError(
266 'Invalid worker tasks concurrency: must be an integer'
267 )
268 }
269 if (
270 tasksQueueOptions?.concurrency != null &&
271 tasksQueueOptions.concurrency <= 0
272 ) {
273 throw new Error(
274 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
275 )
276 }
277 }
278
279 private startPool (): void {
280 while (
281 this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
284 0
285 ) < this.numberOfWorkers
286 ) {
287 this.createAndSetupWorkerNode()
288 }
289 }
290
291 /** @inheritDoc */
292 public get info (): PoolInfo {
293 return {
294 version,
295 type: this.type,
296 worker: this.worker,
297 ready: this.ready,
298 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
299 minSize: this.minSize,
300 maxSize: this.maxSize,
301 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
302 .runTime.aggregate &&
303 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 .waitTime.aggregate && { utilization: round(this.utilization) }),
305 workerNodes: this.workerNodes.length,
306 idleWorkerNodes: this.workerNodes.reduce(
307 (accumulator, workerNode) =>
308 workerNode.usage.tasks.executing === 0
309 ? accumulator + 1
310 : accumulator,
311 0
312 ),
313 busyWorkerNodes: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
316 0
317 ),
318 executedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + workerNode.usage.tasks.executed,
321 0
322 ),
323 executingTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 accumulator + workerNode.usage.tasks.executing,
326 0
327 ),
328 queuedTasks: this.workerNodes.reduce(
329 (accumulator, workerNode) =>
330 accumulator + workerNode.usage.tasks.queued,
331 0
332 ),
333 maxQueuedTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
336 0
337 ),
338 failedTasks: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 accumulator + workerNode.usage.tasks.failed,
341 0
342 ),
343 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
344 .runTime.aggregate && {
345 runTime: {
346 minimum: round(
347 Math.min(
348 ...this.workerNodes.map(
349 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
350 )
351 )
352 ),
353 maximum: round(
354 Math.max(
355 ...this.workerNodes.map(
356 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
357 )
358 )
359 ),
360 average: round(
361 this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
364 0
365 ) /
366 this.workerNodes.reduce(
367 (accumulator, workerNode) =>
368 accumulator + (workerNode.usage.tasks?.executed ?? 0),
369 0
370 )
371 ),
372 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
373 .runTime.median && {
374 median: round(
375 median(
376 this.workerNodes.map(
377 workerNode => workerNode.usage.runTime?.median ?? 0
378 )
379 )
380 )
381 })
382 }
383 }),
384 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
385 .waitTime.aggregate && {
386 waitTime: {
387 minimum: round(
388 Math.min(
389 ...this.workerNodes.map(
390 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
391 )
392 )
393 ),
394 maximum: round(
395 Math.max(
396 ...this.workerNodes.map(
397 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
398 )
399 )
400 ),
401 average: round(
402 this.workerNodes.reduce(
403 (accumulator, workerNode) =>
404 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
405 0
406 ) /
407 this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + (workerNode.usage.tasks?.executed ?? 0),
410 0
411 )
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .waitTime.median && {
415 median: round(
416 median(
417 this.workerNodes.map(
418 workerNode => workerNode.usage.waitTime?.median ?? 0
419 )
420 )
421 )
422 })
423 }
424 })
425 }
426 }
427
428 /**
429 * The pool readiness boolean status.
430 */
431 private get ready (): boolean {
432 return (
433 this.workerNodes.reduce(
434 (accumulator, workerNode) =>
435 !workerNode.info.dynamic && workerNode.info.ready
436 ? accumulator + 1
437 : accumulator,
438 0
439 ) >= this.minSize
440 )
441 }
442
443 /**
444 * The approximate pool utilization.
445 *
446 * @returns The pool utilization.
447 */
448 private get utilization (): number {
449 const poolTimeCapacity =
450 (performance.now() - this.startTimestamp) * this.maxSize
451 const totalTasksRunTime = this.workerNodes.reduce(
452 (accumulator, workerNode) =>
453 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
454 0
455 )
456 const totalTasksWaitTime = this.workerNodes.reduce(
457 (accumulator, workerNode) =>
458 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
459 0
460 )
461 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
462 }
463
464 /**
465 * The pool type.
466 *
467 * If it is `'dynamic'`, it provides the `max` property.
468 */
469 protected abstract get type (): PoolType
470
471 /**
472 * The worker type.
473 */
474 protected abstract get worker (): WorkerType
475
476 /**
477 * The pool minimum size.
478 */
479 protected abstract get minSize (): number
480
481 /**
482 * The pool maximum size.
483 */
484 protected abstract get maxSize (): number
485
486 /**
487 * Checks if the worker id sent in the received message from a worker is valid.
488 *
489 * @param message - The received message.
490 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
491 */
492 private checkMessageWorkerId (message: MessageValue<Response>): void {
493 if (
494 message.workerId != null &&
495 this.getWorkerNodeKeyByWorkerId(message.workerId) == null
496 ) {
497 throw new Error(
498 `Worker message received from unknown worker '${message.workerId}'`
499 )
500 }
501 }
502
503 /**
504 * Gets the given worker its worker node key.
505 *
506 * @param worker - The worker.
507 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
508 */
509 private getWorkerNodeKey (worker: Worker): number {
510 return this.workerNodes.findIndex(
511 workerNode => workerNode.worker === worker
512 )
513 }
514
515 /**
516 * Gets the worker node key given its worker id.
517 *
518 * @param workerId - The worker id.
519 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
520 */
521 private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
522 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
523 if (workerNode.info.id === workerId) {
524 return workerNodeKey
525 }
526 }
527 }
528
529 /** @inheritDoc */
530 public setWorkerChoiceStrategy (
531 workerChoiceStrategy: WorkerChoiceStrategy,
532 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
533 ): void {
534 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
535 this.opts.workerChoiceStrategy = workerChoiceStrategy
536 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
537 this.opts.workerChoiceStrategy
538 )
539 if (workerChoiceStrategyOptions != null) {
540 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
541 }
542 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
543 workerNode.resetUsage()
544 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
545 }
546 }
547
548 /** @inheritDoc */
549 public setWorkerChoiceStrategyOptions (
550 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
551 ): void {
552 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
553 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
554 this.workerChoiceStrategyContext.setOptions(
555 this.opts.workerChoiceStrategyOptions
556 )
557 }
558
559 /** @inheritDoc */
560 public enableTasksQueue (
561 enable: boolean,
562 tasksQueueOptions?: TasksQueueOptions
563 ): void {
564 if (this.opts.enableTasksQueue === true && !enable) {
565 this.flushTasksQueues()
566 }
567 this.opts.enableTasksQueue = enable
568 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
569 }
570
571 /** @inheritDoc */
572 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
573 if (this.opts.enableTasksQueue === true) {
574 this.checkValidTasksQueueOptions(tasksQueueOptions)
575 this.opts.tasksQueueOptions =
576 this.buildTasksQueueOptions(tasksQueueOptions)
577 } else if (this.opts.tasksQueueOptions != null) {
578 delete this.opts.tasksQueueOptions
579 }
580 }
581
582 private buildTasksQueueOptions (
583 tasksQueueOptions: TasksQueueOptions
584 ): TasksQueueOptions {
585 return {
586 concurrency: tasksQueueOptions?.concurrency ?? 1
587 }
588 }
589
590 /**
591 * Whether the pool is full or not.
592 *
593 * The pool filling boolean status.
594 */
595 protected get full (): boolean {
596 return this.workerNodes.length >= this.maxSize
597 }
598
599 /**
600 * Whether the pool is busy or not.
601 *
602 * The pool busyness boolean status.
603 */
604 protected abstract get busy (): boolean
605
606 /**
607 * Whether worker nodes are executing at least one task.
608 *
609 * @returns Worker nodes busyness boolean status.
610 */
611 protected internalBusy (): boolean {
612 return (
613 this.workerNodes.findIndex(workerNode => {
614 return workerNode.usage.tasks.executing === 0
615 }) === -1
616 )
617 }
618
619 /** @inheritDoc */
620 public async execute (data?: Data, name?: string): Promise<Response> {
621 return await new Promise<Response>((resolve, reject) => {
622 const timestamp = performance.now()
623 const workerNodeKey = this.chooseWorkerNode()
624 const task: Task<Data> = {
625 name: name ?? DEFAULT_TASK_NAME,
626 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
627 data: data ?? ({} as Data),
628 timestamp,
629 workerId: this.getWorkerInfo(workerNodeKey).id as number,
630 id: randomUUID()
631 }
632 this.promiseResponseMap.set(task.id as string, {
633 resolve,
634 reject,
635 workerNodeKey
636 })
637 if (
638 this.opts.enableTasksQueue === true &&
639 (this.busy ||
640 this.workerNodes[workerNodeKey].usage.tasks.executing >=
641 ((this.opts.tasksQueueOptions as TasksQueueOptions)
642 .concurrency as number))
643 ) {
644 this.enqueueTask(workerNodeKey, task)
645 } else {
646 this.executeTask(workerNodeKey, task)
647 }
648 this.checkAndEmitEvents()
649 })
650 }
651
652 /** @inheritDoc */
653 public async destroy (): Promise<void> {
654 await Promise.all(
655 this.workerNodes.map(async (workerNode, workerNodeKey) => {
656 this.flushTasksQueue(workerNodeKey)
657 // FIXME: wait for tasks to be finished
658 const workerExitPromise = new Promise<void>(resolve => {
659 workerNode.worker.on('exit', () => {
660 resolve()
661 })
662 })
663 await this.destroyWorkerNode(workerNodeKey)
664 await workerExitPromise
665 })
666 )
667 }
668
669 /**
670 * Terminates the worker node given its worker node key.
671 *
672 * @param workerNodeKey - The worker node key.
673 */
674 protected abstract destroyWorkerNode (
675 workerNodeKey: number
676 ): void | Promise<void>
677
678 /**
679 * Setup hook to execute code before worker nodes are created in the abstract constructor.
680 * Can be overridden.
681 *
682 * @virtual
683 */
684 protected setupHook (): void {
685 // Intentionally empty
686 }
687
688 /**
689 * Should return whether the worker is the main worker or not.
690 */
691 protected abstract isMain (): boolean
692
693 /**
694 * Hook executed before the worker task execution.
695 * Can be overridden.
696 *
697 * @param workerNodeKey - The worker node key.
698 * @param task - The task to execute.
699 */
700 protected beforeTaskExecutionHook (
701 workerNodeKey: number,
702 task: Task<Data>
703 ): void {
704 const workerUsage = this.workerNodes[workerNodeKey].usage
705 ++workerUsage.tasks.executing
706 this.updateWaitTimeWorkerUsage(workerUsage, task)
707 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
708 task.name as string
709 ) as WorkerUsage
710 ++taskWorkerUsage.tasks.executing
711 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
712 }
713
714 /**
715 * Hook executed after the worker task execution.
716 * Can be overridden.
717 *
718 * @param workerNodeKey - The worker node key.
719 * @param message - The received message.
720 */
721 protected afterTaskExecutionHook (
722 workerNodeKey: number,
723 message: MessageValue<Response>
724 ): void {
725 const workerUsage = this.workerNodes[workerNodeKey].usage
726 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
727 this.updateRunTimeWorkerUsage(workerUsage, message)
728 this.updateEluWorkerUsage(workerUsage, message)
729 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
730 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
731 ) as WorkerUsage
732 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
733 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
734 this.updateEluWorkerUsage(taskWorkerUsage, message)
735 }
736
737 private updateTaskStatisticsWorkerUsage (
738 workerUsage: WorkerUsage,
739 message: MessageValue<Response>
740 ): void {
741 const workerTaskStatistics = workerUsage.tasks
742 --workerTaskStatistics.executing
743 if (message.taskError == null) {
744 ++workerTaskStatistics.executed
745 } else {
746 ++workerTaskStatistics.failed
747 }
748 }
749
750 private updateRunTimeWorkerUsage (
751 workerUsage: WorkerUsage,
752 message: MessageValue<Response>
753 ): void {
754 updateMeasurementStatistics(
755 workerUsage.runTime,
756 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
757 message.taskPerformance?.runTime ?? 0,
758 workerUsage.tasks.executed
759 )
760 }
761
762 private updateWaitTimeWorkerUsage (
763 workerUsage: WorkerUsage,
764 task: Task<Data>
765 ): void {
766 const timestamp = performance.now()
767 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
768 updateMeasurementStatistics(
769 workerUsage.waitTime,
770 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
771 taskWaitTime,
772 workerUsage.tasks.executed
773 )
774 }
775
776 private updateEluWorkerUsage (
777 workerUsage: WorkerUsage,
778 message: MessageValue<Response>
779 ): void {
780 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
781 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
782 updateMeasurementStatistics(
783 workerUsage.elu.active,
784 eluTaskStatisticsRequirements,
785 message.taskPerformance?.elu?.active ?? 0,
786 workerUsage.tasks.executed
787 )
788 updateMeasurementStatistics(
789 workerUsage.elu.idle,
790 eluTaskStatisticsRequirements,
791 message.taskPerformance?.elu?.idle ?? 0,
792 workerUsage.tasks.executed
793 )
794 if (eluTaskStatisticsRequirements.aggregate) {
795 if (message.taskPerformance?.elu != null) {
796 if (workerUsage.elu.utilization != null) {
797 workerUsage.elu.utilization =
798 (workerUsage.elu.utilization +
799 message.taskPerformance.elu.utilization) /
800 2
801 } else {
802 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
803 }
804 }
805 }
806 }
807
808 /**
809 * Chooses a worker node for the next task.
810 *
811 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
812 *
813 * @returns The chosen worker node key
814 */
815 private chooseWorkerNode (): number {
816 if (this.shallCreateDynamicWorker()) {
817 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
818 if (
819 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
820 ) {
821 return workerNodeKey
822 }
823 }
824 return this.workerChoiceStrategyContext.execute()
825 }
826
827 /**
828 * Conditions for dynamic worker creation.
829 *
830 * @returns Whether to create a dynamic worker or not.
831 */
832 private shallCreateDynamicWorker (): boolean {
833 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
834 }
835
836 /**
837 * Sends a message to worker given its worker node key.
838 *
839 * @param workerNodeKey - The worker node key.
840 * @param message - The message.
841 */
842 protected abstract sendToWorker (
843 workerNodeKey: number,
844 message: MessageValue<Data>
845 ): void
846
847 /**
848 * Creates a new worker.
849 *
850 * @returns Newly created worker.
851 */
852 protected abstract createWorker (): Worker
853
854 /**
855 * Creates a new, completely set up worker node.
856 *
857 * @returns New, completely set up worker node key.
858 */
859 protected createAndSetupWorkerNode (): number {
860 const worker = this.createWorker()
861
862 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
863 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
864 worker.on('error', error => {
865 const workerNodeKey = this.getWorkerNodeKey(worker)
866 const workerInfo = this.getWorkerInfo(workerNodeKey)
867 workerInfo.ready = false
868 this.workerNodes[workerNodeKey].closeChannel()
869 this.emitter?.emit(PoolEvents.error, error)
870 if (this.opts.restartWorkerOnError === true && !this.starting) {
871 if (workerInfo.dynamic) {
872 this.createAndSetupDynamicWorkerNode()
873 } else {
874 this.createAndSetupWorkerNode()
875 }
876 }
877 if (this.opts.enableTasksQueue === true) {
878 this.redistributeQueuedTasks(workerNodeKey)
879 }
880 })
881 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
882 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
883 worker.once('exit', () => {
884 this.removeWorkerNode(worker)
885 })
886
887 const workerNodeKey = this.addWorkerNode(worker)
888
889 this.afterWorkerNodeSetup(workerNodeKey)
890
891 return workerNodeKey
892 }
893
894 /**
895 * Creates a new, completely set up dynamic worker node.
896 *
897 * @returns New, completely set up dynamic worker node key.
898 */
899 protected createAndSetupDynamicWorkerNode (): number {
900 const workerNodeKey = this.createAndSetupWorkerNode()
901 this.registerWorkerMessageListener(workerNodeKey, message => {
902 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
903 message.workerId
904 ) as number
905 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
906 if (
907 isKillBehavior(KillBehaviors.HARD, message.kill) ||
908 (message.kill != null &&
909 ((this.opts.enableTasksQueue === false &&
910 workerUsage.tasks.executing === 0) ||
911 (this.opts.enableTasksQueue === true &&
912 workerUsage.tasks.executing === 0 &&
913 this.tasksQueueSize(localWorkerNodeKey) === 0)))
914 ) {
915 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
916 void (this.destroyWorkerNode(localWorkerNodeKey) as Promise<void>)
917 }
918 })
919 const workerInfo = this.getWorkerInfo(workerNodeKey)
920 workerInfo.dynamic = true
921 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
922 workerInfo.ready = true
923 }
924 this.sendToWorker(workerNodeKey, {
925 checkActive: true,
926 workerId: workerInfo.id as number
927 })
928 return workerNodeKey
929 }
930
931 /**
932 * Registers a listener callback on the worker given its worker node key.
933 *
934 * @param workerNodeKey - The worker node key.
935 * @param listener - The message listener callback.
936 */
937 protected abstract registerWorkerMessageListener<
938 Message extends Data | Response
939 >(
940 workerNodeKey: number,
941 listener: (message: MessageValue<Message>) => void
942 ): void
943
944 /**
945 * Method hooked up after a worker node has been newly created.
946 * Can be overridden.
947 *
948 * @param workerNodeKey - The newly created worker node key.
949 */
950 protected afterWorkerNodeSetup (workerNodeKey: number): void {
951 // Listen to worker messages.
952 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
953 // Send the startup message to worker.
954 this.sendStartupMessageToWorker(workerNodeKey)
955 // Send the worker statistics message to worker.
956 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
957 }
958
959 /**
960 * Sends the startup message to worker given its worker node key.
961 *
962 * @param workerNodeKey - The worker node key.
963 */
964 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
965
966 /**
967 * Sends the worker statistics message to worker given its worker node key.
968 *
969 * @param workerNodeKey - The worker node key.
970 */
971 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
972 this.sendToWorker(workerNodeKey, {
973 statistics: {
974 runTime:
975 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
976 .runTime.aggregate,
977 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
978 .elu.aggregate
979 },
980 workerId: this.getWorkerInfo(workerNodeKey).id as number
981 })
982 }
983
984 private redistributeQueuedTasks (workerNodeKey: number): void {
985 while (this.tasksQueueSize(workerNodeKey) > 0) {
986 let targetWorkerNodeKey: number = workerNodeKey
987 let minQueuedTasks = Infinity
988 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
989 const workerInfo = this.getWorkerInfo(workerNodeId)
990 if (
991 workerNodeId !== workerNodeKey &&
992 workerInfo.ready &&
993 workerNode.usage.tasks.queued === 0
994 ) {
995 targetWorkerNodeKey = workerNodeId
996 break
997 }
998 if (
999 workerNodeId !== workerNodeKey &&
1000 workerInfo.ready &&
1001 workerNode.usage.tasks.queued < minQueuedTasks
1002 ) {
1003 minQueuedTasks = workerNode.usage.tasks.queued
1004 targetWorkerNodeKey = workerNodeId
1005 }
1006 }
1007 this.enqueueTask(
1008 targetWorkerNodeKey,
1009 this.dequeueTask(workerNodeKey) as Task<Data>
1010 )
1011 }
1012 }
1013
1014 /**
1015 * This method is the listener registered for each worker message.
1016 *
1017 * @returns The listener function to execute when a message is received from a worker.
1018 */
1019 protected workerListener (): (message: MessageValue<Response>) => void {
1020 return message => {
1021 this.checkMessageWorkerId(message)
1022 if (message.ready != null) {
1023 // Worker ready response received
1024 this.handleWorkerReadyResponse(message)
1025 } else if (message.id != null) {
1026 // Task execution response received
1027 this.handleTaskExecutionResponse(message)
1028 }
1029 }
1030 }
1031
1032 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1033 this.getWorkerInfo(
1034 this.getWorkerNodeKeyByWorkerId(message.workerId) as number
1035 ).ready = message.ready as boolean
1036 if (this.emitter != null && this.ready) {
1037 this.emitter.emit(PoolEvents.ready, this.info)
1038 }
1039 }
1040
1041 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1042 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1043 if (promiseResponse != null) {
1044 if (message.taskError != null) {
1045 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1046 promiseResponse.reject(message.taskError.message)
1047 } else {
1048 promiseResponse.resolve(message.data as Response)
1049 }
1050 const workerNodeKey = promiseResponse.workerNodeKey
1051 this.afterTaskExecutionHook(workerNodeKey, message)
1052 this.promiseResponseMap.delete(message.id as string)
1053 if (
1054 this.opts.enableTasksQueue === true &&
1055 this.tasksQueueSize(workerNodeKey) > 0
1056 ) {
1057 this.executeTask(
1058 workerNodeKey,
1059 this.dequeueTask(workerNodeKey) as Task<Data>
1060 )
1061 }
1062 this.workerChoiceStrategyContext.update(workerNodeKey)
1063 }
1064 }
1065
1066 private checkAndEmitEvents (): void {
1067 if (this.emitter != null) {
1068 if (this.busy) {
1069 this.emitter.emit(PoolEvents.busy, this.info)
1070 }
1071 if (this.type === PoolTypes.dynamic && this.full) {
1072 this.emitter.emit(PoolEvents.full, this.info)
1073 }
1074 }
1075 }
1076
1077 /**
1078 * Gets the worker information given its worker node key.
1079 *
1080 * @param workerNodeKey - The worker node key.
1081 * @returns The worker information.
1082 */
1083 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1084 return this.workerNodes[workerNodeKey].info
1085 }
1086
1087 /**
1088 * Adds the given worker in the pool worker nodes.
1089 *
1090 * @param worker - The worker.
1091 * @returns The added worker node key.
1092 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1093 */
1094 private addWorkerNode (worker: Worker): number {
1095 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1096 // Flag the worker node as ready at pool startup.
1097 if (this.starting) {
1098 workerNode.info.ready = true
1099 }
1100 this.workerNodes.push(workerNode)
1101 const workerNodeKey = this.getWorkerNodeKey(worker)
1102 if (workerNodeKey === -1) {
1103 throw new Error('Worker node not found')
1104 }
1105 return workerNodeKey
1106 }
1107
1108 /**
1109 * Removes the given worker from the pool worker nodes.
1110 *
1111 * @param worker - The worker.
1112 */
1113 private removeWorkerNode (worker: Worker): void {
1114 const workerNodeKey = this.getWorkerNodeKey(worker)
1115 if (workerNodeKey !== -1) {
1116 this.workerNodes.splice(workerNodeKey, 1)
1117 this.workerChoiceStrategyContext.remove(workerNodeKey)
1118 }
1119 }
1120
1121 /**
1122 * Executes the given task on the worker given its worker node key.
1123 *
1124 * @param workerNodeKey - The worker node key.
1125 * @param task - The task to execute.
1126 */
1127 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1128 this.beforeTaskExecutionHook(workerNodeKey, task)
1129 this.sendToWorker(workerNodeKey, task)
1130 }
1131
1132 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1133 return this.workerNodes[workerNodeKey].enqueueTask(task)
1134 }
1135
1136 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1137 return this.workerNodes[workerNodeKey].dequeueTask()
1138 }
1139
1140 private tasksQueueSize (workerNodeKey: number): number {
1141 return this.workerNodes[workerNodeKey].tasksQueueSize()
1142 }
1143
1144 private flushTasksQueue (workerNodeKey: number): void {
1145 while (this.tasksQueueSize(workerNodeKey) > 0) {
1146 this.executeTask(
1147 workerNodeKey,
1148 this.dequeueTask(workerNodeKey) as Task<Data>
1149 )
1150 }
1151 this.workerNodes[workerNodeKey].clearTasksQueue()
1152 }
1153
1154 private flushTasksQueues (): void {
1155 for (const [workerNodeKey] of this.workerNodes.entries()) {
1156 this.flushTasksQueue(workerNodeKey)
1157 }
1158 }
1159 }