7ed6ab0351f3ad1cf44c6961b1273a2d78fc593f
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // Refuse to build a pool when not running in the main worker.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate the arguments; each check throws on invalid input.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods below to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is true only while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point used by the utilization computation.
  this.startTimestamp = performance.now()
}
144
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `typeof` already rejects null and undefined.
  const isMissing =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
157
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that the pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
175
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
201
/**
 * Validates the pool options and fills `this.opts` with defaults for the unset ones.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  // Worker choice strategy, round robin by default.
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  // Worker choice strategy options, layered over the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
229
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
239
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  // Only read the fields once the options are known to be a plain object.
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node, up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
283
/**
 * Checks the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    // RangeError (an Error subclass) matches the other out-of-range checks
    // in this class, e.g. the choice retries and pool size validations.
    throw new RangeError(
      `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
    )
  }
}
307
/**
 * Creates worker nodes until the count of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recomputed on every iteration since each creation changes the worker nodes.
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
319
320 /** @inheritDoc */
public get info (): PoolInfo {
  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + select(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(matches).length

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
460
461 /**
462 * The pool readiness boolean status.
463 */
private get ready (): boolean {
  // Only non dynamic worker nodes flagged as ready are counted.
  const readyWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyWorkerNodes.length >= this.minSize
}
475
476 /**
477 * The approximate pool utilization.
478 *
479 * @returns The pool utilization.
480 */
private get utilization (): number {
  // Time elapsed since the pool start, multiplied by the pool maximum size.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
496
497 /**
498 * The pool type.
499 *
500 * If it is `'dynamic'`, it provides the `max` property.
501 */
502 protected abstract get type (): PoolType
503
504 /**
505 * The worker type.
506 */
507 protected abstract get worker (): WorkerType
508
509 /**
510 * The pool minimum size.
511 */
512 protected abstract get minSize (): number
513
514 /**
515 * The pool maximum size.
516 */
517 protected abstract get maxSize (): number
518
519 /**
520 * Checks if the worker id sent in the received message from a worker is valid.
521 *
522 * @param message - The received message.
523 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
524 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // The id must belong to one of the pool worker nodes.
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
537
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // Workers are compared by reference.
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
549
550 /**
551 * Gets the worker node key given its worker id.
552 *
553 * @param workerId - The worker id.
554 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
555 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  // The id is matched against the `info.id` of each worker node.
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
561
562 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage, then resend it the statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
580
581 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
594
595 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the queues when the tasks queue goes from enabled to disabled.
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
606
607 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue not enabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
617
/**
 * Builds the tasks queue options, defaulting the concurrency to 1.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
625
626 /**
627 * Whether the pool is full or not.
628 *
629 * The pool filling boolean status.
630 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
634
635 /**
636 * Whether the pool is busy or not.
637 *
638 * The pool busyness boolean status.
639 */
640 protected abstract get busy (): boolean
641
642 /**
643 * Whether worker nodes are executing concurrently their tasks quota or not.
644 *
645 * @returns Worker nodes busyness boolean status.
646 */
protected internalBusy (): boolean {
  // With the tasks queue enabled, a node has room while it executes fewer
  // tasks than the configured concurrency; otherwise only while it executes none.
  const hasRoom =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.usage.tasks.executing === 0
  // Busy when no ready worker node has room left.
  return !this.workerNodes.some(
    (workerNode) => workerNode.info.ready && hasRoom(workerNode)
  )
}
666
667 /** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
679
680 /** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Each validation failure rejects AND returns: calling reject() does not
    // stop the executor, so without the return an invalid call would still
    // choose a worker node, register a response entry and submit the task.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Task submission timestamp, used later to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the response can be matched by task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already runs its configured concurrency.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
739
740 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy every worker node concurrently, then signal the pool destruction.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy)
}
749
/**
 * Sends a kill message to the worker and waits for its kill status answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill status and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
765
766 /**
767 * Terminates the worker node given its worker node key.
768 *
769 * @param workerNodeKey - The worker node key.
770 */
771 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
772
773 /**
774 * Setup hook to execute code before worker nodes are created in the abstract constructor.
775 * Can be overridden.
776 *
777 * @virtual
778 */
protected setupHook (): void {
  // No default behavior: subclasses may override this hook.
}
782
783 /**
784 * Should return whether the worker is the main worker or not.
785 */
786 protected abstract isMain (): boolean
787
788 /**
789 * Hook executed before the worker task execution.
790 * Can be overridden.
791 *
792 * @param workerNodeKey - The worker node key.
793 * @param task - The task to execute.
794 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  const workerNodeUsage = workerNode.usage
  ++workerNodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(workerNodeUsage, task)
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Usage tracked separately for the task function named by the task.
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionUsage, task)
}
810
811 /**
812 * Hook executed after the worker task execution.
813 * Can be overridden.
814 *
815 * @param workerNodeKey - The worker node key.
816 * @param message - The received message.
817 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the tasks, run time and ELU updates to one usage object.
  const recordTaskExecution = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  recordTaskExecution(workerNode.usage)
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Falls back to the default task name when the message carries none.
    const taskFunctionName =
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    recordTaskExecution(
      workerNode.getTaskFunctionWorkerUsage(taskFunctionName) as WorkerUsage
    )
  }
}
837
838 /**
839 * Whether the worker node shall update its task function worker usage or not.
840 *
841 * @param workerNodeKey - The worker node key.
842 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
843 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  // NOTE(review): the threshold of 2 presumably skips workers exposing only
  // the default entry plus a single task function - confirm against the worker side.
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
851
/**
 * Updates the tasks counters of a worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null) {
    if (tasks.executing > 0) {
      --tasks.executing
    } else if (tasks.executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
876
/**
 * Updates the run time statistics of a worker usage from the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // Defaults to zero when the message has no run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
888
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a zero wait time.
  const submittedAt = task.timestamp ?? now
  const taskWaitTime = now - submittedAt
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
902
/**
 * Updates the ELU statistics of a worker usage from the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const executedTasks = (): number => workerUsage.tasks.executed
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    executedTasks()
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    executedTasks()
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // Averaged with the previous utilization when there is one.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
934
935 /**
936 * Chooses a worker node for the next task.
937 *
938 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
939 *
940 * @returns The chosen worker node key
941 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  // The new dynamic worker node is only picked when the strategy policy allows it.
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
953
954 /**
955 * Conditions for dynamic worker creation.
956 *
957 * @returns Whether to create a dynamic worker or not.
958 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
962
963 /**
964 * Sends a message to worker given its worker node key.
965 *
966 * @param workerNodeKey - The worker node key.
967 * @param message - The message.
968 * @param transferList - The optional array of transferable objects.
969 */
970 protected abstract sendToWorker (
971 workerNodeKey: number,
972 message: MessageValue<Data>,
973 transferList?: TransferListItem[]
974 ): void
975
976 /**
977 * Creates a new worker.
978 *
979 * @returns Newly created worker.
980 */
981 protected abstract createWorker (): Worker
982
983 /**
984 * Creates a new, completely set up worker node.
985 *
986 * @returns New, completely set up worker node key.
987 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Pool level handling of a worker error.
  const handleWorkerError = (error: Error): void => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker node is created while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  }

  // User supplied handlers are registered before the pool level ones.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1022
1023 /**
1024 * Creates a new, completely set up dynamic worker node.
1025 *
1026 * @returns New, completely set up dynamic worker node key.
1027 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The key is looked up again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: a hard kill always destroys the
    // worker node, a soft kill only when it has nothing executing (nor
    // queued when the tasks queue is enabled).
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (
      !shallDestroy &&
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      workerUsage.tasks.executing === 0
    ) {
      shallDestroy =
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0)
    }
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Flagged ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
1061
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: the concrete pool implementation decides how the listener is
 * attached to the underlying worker.
 *
 * @typeParam Message - Type of the message payload, constrained to the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback, invoked with the received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1074
/**
 * Hook invoked once a worker node has been newly created.
 * Subclasses can override it.
 *
 * The default implementation registers the pool message listener on the
 * worker, then sends it the startup message followed by the statistics
 * message.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // The message listener is attached before anything is sent to the worker.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Startup message first, statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1089
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: the content of the startup message and the way it is sent are
 * defined by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1096
/**
 * Sends the statistics message to the worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const aggregateRunTime =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const aggregateElu =
    strategyContext.getTaskStatisticsRequirements().elu.aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    },
    workerId
  })
}
1114
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the target is the first other ready worker node with
 * an empty tasks queue; the task is executed on it right away when it runs
 * fewer tasks than the tasks queue concurrency, and enqueued on it otherwise.
 * When every other ready worker node already has queued tasks, the task is
 * enqueued on the one with the fewest queued tasks.
 *
 * When no other ready worker node exists, redistribution stops and the
 * remaining tasks stay queued on the given worker node: moving a task back to
 * its own queue would never empty it and the loop would never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other worker nodes that are ready are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: best candidate, run the task immediately if concurrency allows it.
        executeTask = workerNode.usage.tasks.executing < concurrency
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target: leave the remaining tasks where they are instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1158
/**
 * Builds the listener registered for each worker message.
 *
 * The returned listener first checks the message worker id, then dispatches
 * the message: worker ready response, task execution response or task
 * functions update. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctions = taskFunctions
    }
  }
}
1181
/**
 * Handles the ready response sent by a worker.
 *
 * Stores the ready flag and the task functions from the message on the worker
 * information, then emits the pool `ready` event when an emitter exists and
 * the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const info = this.getWorkerInfo(workerNodeKey)
  info.ready = ready as boolean
  info.taskFunctions = taskFunctions
  const emitter = this.emitter
  if (emitter == null || !this.ready) {
    return
  }
  emitter.emit(PoolEvents.ready, this.info)
}
1195
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task id (rejecting with the task
 * error message after emitting the `taskError` event, or resolving with the
 * response data), runs the after task execution hook, forgets the promise,
 * starts the next queued task on the same worker node when the tasks queue is
 * enabled and its concurrency allows it, and finally updates the worker
 * choice strategy context. A response without a registered promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  // Feed the worker node with its next queued task, if any and if concurrency allows it.
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1223
/**
 * Emits the pool state events matching the current pool state.
 *
 * Does nothing when the pool has no emitter. Otherwise emits, in this order
 * and each with the pool information: `busy` when the pool is busy, `full`
 * when the pool is dynamic and full, `backPressure` when the pool has back
 * pressure.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
  if (this.hasBackPressure()) {
    emitter.emit(PoolEvents.backPressure, this.info)
  }
}
1237
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1247
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.maxSize
  )
  if (this.starting) {
    // While the pool is starting, the worker node is flagged as ready upfront.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1272
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing when the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1285
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is nothing that can be back pressured.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1293
/**
 * Whether the pool as a whole is back pressured.
 *
 * The pool is back pressured when the tasks queue is enabled and every worker
 * node reports back pressure. A single worker node that is not back pressured
 * is enough for the pool not to be.
 * Note: with no worker node at all, `every()` is vacuously true.
 *
 * @returns `true` if the tasks queue is enabled and all worker nodes have back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  return (
    this.opts.enableTasksQueue === true &&
    this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
  )
}
1302
/**
 * Runs the before task execution hook, then sends the given task to the worker
 * given its worker node key, along with the task transfer list.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook runs first; the task is handed over to the worker afterwards.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1313
/**
 * Enqueues the given task on the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1317
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1321
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1325
/**
 * Flushes the tasks queue of the worker node given its worker node key:
 * every queued task is dequeued and executed on that same worker node, then
 * the tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  while (workerNode.tasksQueueSize() > 0) {
    const queuedTask = workerNode.dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  workerNode.clearTasksQueue()
}
1335
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1341 }