fix: ensure pool event full is emitted only once
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
103 public constructor (
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
107 ) {
108 if (!this.isMain()) {
109 throw new Error(
110 'Cannot start a pool from a worker with the same type as the pool'
111 )
112 }
113 this.checkNumberOfWorkers(this.numberOfWorkers)
114 this.checkFilePath(this.filePath)
115 this.checkPoolOptions(this.opts)
116
117 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
118 this.executeTask = this.executeTask.bind(this)
119 this.enqueueTask = this.enqueueTask.bind(this)
120 this.dequeueTask = this.dequeueTask.bind(this)
121 this.checkAndEmitTaskExecutionEvents =
122 this.checkAndEmitTaskExecutionEvents.bind(this)
123 this.checkAndEmitTaskQueuingEvents =
124 this.checkAndEmitTaskQueuingEvents.bind(this)
125 this.checkAndEmitDynamicWorkerCreationEvents =
126 this.checkAndEmitDynamicWorkerCreationEvents.bind(this)
127
128 if (this.opts.enableEvents === true) {
129 this.emitter = new PoolEmitter()
130 }
131 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
132 Worker,
133 Data,
134 Response
135 >(
136 this,
137 this.opts.workerChoiceStrategy,
138 this.opts.workerChoiceStrategyOptions
139 )
140
141 this.setupHook()
142
143 this.starting = true
144 this.startPool()
145 this.starting = false
146
147 this.startTimestamp = performance.now()
148 }
149
150 private checkFilePath (filePath: string): void {
151 if (
152 filePath == null ||
153 typeof filePath !== 'string' ||
154 (typeof filePath === 'string' && filePath.trim().length === 0)
155 ) {
156 throw new Error('Please specify a file with a worker implementation')
157 }
158 if (!existsSync(filePath)) {
159 throw new Error(`Cannot find the worker file '${filePath}'`)
160 }
161 }
162
163 private checkNumberOfWorkers (numberOfWorkers: number): void {
164 if (numberOfWorkers == null) {
165 throw new Error(
166 'Cannot instantiate a pool without specifying the number of workers'
167 )
168 } else if (!Number.isSafeInteger(numberOfWorkers)) {
169 throw new TypeError(
170 'Cannot instantiate a pool with a non safe integer number of workers'
171 )
172 } else if (numberOfWorkers < 0) {
173 throw new RangeError(
174 'Cannot instantiate a pool with a negative number of workers'
175 )
176 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
177 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
178 }
179 }
180
181 protected checkDynamicPoolSize (min: number, max: number): void {
182 if (this.type === PoolTypes.dynamic) {
183 if (max == null) {
184 throw new Error(
185 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
186 )
187 } else if (!Number.isSafeInteger(max)) {
188 throw new TypeError(
189 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
190 )
191 } else if (min > max) {
192 throw new RangeError(
193 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 )
195 } else if (max === 0) {
196 throw new RangeError(
197 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
198 )
199 } else if (min === max) {
200 throw new RangeError(
201 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
202 )
203 }
204 }
205 }
206
207 private checkPoolOptions (opts: PoolOptions<Worker>): void {
208 if (isPlainObject(opts)) {
209 this.opts.workerChoiceStrategy =
210 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
211 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
212 this.opts.workerChoiceStrategyOptions = {
213 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
214 ...opts.workerChoiceStrategyOptions
215 }
216 this.checkValidWorkerChoiceStrategyOptions(
217 this.opts.workerChoiceStrategyOptions
218 )
219 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
220 this.opts.enableEvents = opts.enableEvents ?? true
221 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
222 if (this.opts.enableTasksQueue) {
223 this.checkValidTasksQueueOptions(
224 opts.tasksQueueOptions as TasksQueueOptions
225 )
226 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
227 opts.tasksQueueOptions as TasksQueueOptions
228 )
229 }
230 } else {
231 throw new TypeError('Invalid pool options: must be a plain object')
232 }
233 }
234
235 private checkValidWorkerChoiceStrategy (
236 workerChoiceStrategy: WorkerChoiceStrategy
237 ): void {
238 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
239 throw new Error(
240 `Invalid worker choice strategy '${workerChoiceStrategy}'`
241 )
242 }
243 }
244
245 private checkValidWorkerChoiceStrategyOptions (
246 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
247 ): void {
248 if (!isPlainObject(workerChoiceStrategyOptions)) {
249 throw new TypeError(
250 'Invalid worker choice strategy options: must be a plain object'
251 )
252 }
253 if (
254 workerChoiceStrategyOptions.choiceRetries != null &&
255 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
256 ) {
257 throw new TypeError(
258 'Invalid worker choice strategy options: choice retries must be an integer'
259 )
260 }
261 if (
262 workerChoiceStrategyOptions.choiceRetries != null &&
263 workerChoiceStrategyOptions.choiceRetries <= 0
264 ) {
265 throw new RangeError(
266 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
267 )
268 }
269 if (
270 workerChoiceStrategyOptions.weights != null &&
271 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
272 ) {
273 throw new Error(
274 'Invalid worker choice strategy options: must have a weight for each worker node'
275 )
276 }
277 if (
278 workerChoiceStrategyOptions.measurement != null &&
279 !Object.values(Measurements).includes(
280 workerChoiceStrategyOptions.measurement
281 )
282 ) {
283 throw new Error(
284 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
285 )
286 }
287 }
288
289 private checkValidTasksQueueOptions (
290 tasksQueueOptions: TasksQueueOptions
291 ): void {
292 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
293 throw new TypeError('Invalid tasks queue options: must be a plain object')
294 }
295 if (
296 tasksQueueOptions?.concurrency != null &&
297 !Number.isSafeInteger(tasksQueueOptions.concurrency)
298 ) {
299 throw new TypeError(
300 'Invalid worker tasks concurrency: must be an integer'
301 )
302 }
303 if (
304 tasksQueueOptions?.concurrency != null &&
305 tasksQueueOptions.concurrency <= 0
306 ) {
307 throw new Error(
308 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
309 )
310 }
311 }
312
313 private startPool (): void {
314 while (
315 this.workerNodes.reduce(
316 (accumulator, workerNode) =>
317 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
318 0
319 ) < this.numberOfWorkers
320 ) {
321 this.createAndSetupWorkerNode()
322 }
323 }
324
325 /** @inheritDoc */
326 public get info (): PoolInfo {
327 return {
328 version,
329 type: this.type,
330 worker: this.worker,
331 ready: this.ready,
332 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
333 minSize: this.minSize,
334 maxSize: this.maxSize,
335 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
336 .runTime.aggregate &&
337 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
338 .waitTime.aggregate && { utilization: round(this.utilization) }),
339 workerNodes: this.workerNodes.length,
340 idleWorkerNodes: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 workerNode.usage.tasks.executing === 0
343 ? accumulator + 1
344 : accumulator,
345 0
346 ),
347 busyWorkerNodes: this.workerNodes.reduce(
348 (accumulator, workerNode) =>
349 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
350 0
351 ),
352 executedTasks: this.workerNodes.reduce(
353 (accumulator, workerNode) =>
354 accumulator + workerNode.usage.tasks.executed,
355 0
356 ),
357 executingTasks: this.workerNodes.reduce(
358 (accumulator, workerNode) =>
359 accumulator + workerNode.usage.tasks.executing,
360 0
361 ),
362 ...(this.opts.enableTasksQueue === true && {
363 queuedTasks: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + workerNode.usage.tasks.queued,
366 0
367 )
368 }),
369 ...(this.opts.enableTasksQueue === true && {
370 maxQueuedTasks: this.workerNodes.reduce(
371 (accumulator, workerNode) =>
372 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
373 0
374 )
375 }),
376 ...(this.opts.enableTasksQueue === true && {
377 backPressure: this.hasBackPressure()
378 }),
379 failedTasks: this.workerNodes.reduce(
380 (accumulator, workerNode) =>
381 accumulator + workerNode.usage.tasks.failed,
382 0
383 ),
384 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
385 .runTime.aggregate && {
386 runTime: {
387 minimum: round(
388 Math.min(
389 ...this.workerNodes.map(
390 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
391 )
392 )
393 ),
394 maximum: round(
395 Math.max(
396 ...this.workerNodes.map(
397 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
398 )
399 )
400 ),
401 average: round(
402 this.workerNodes.reduce(
403 (accumulator, workerNode) =>
404 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
405 0
406 ) /
407 this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + (workerNode.usage.tasks?.executed ?? 0),
410 0
411 )
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .runTime.median && {
415 median: round(
416 median(
417 this.workerNodes.map(
418 (workerNode) => workerNode.usage.runTime?.median ?? 0
419 )
420 )
421 )
422 })
423 }
424 }),
425 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 .waitTime.aggregate && {
427 waitTime: {
428 minimum: round(
429 Math.min(
430 ...this.workerNodes.map(
431 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
432 )
433 )
434 ),
435 maximum: round(
436 Math.max(
437 ...this.workerNodes.map(
438 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
439 )
440 )
441 ),
442 average: round(
443 this.workerNodes.reduce(
444 (accumulator, workerNode) =>
445 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
446 0
447 ) /
448 this.workerNodes.reduce(
449 (accumulator, workerNode) =>
450 accumulator + (workerNode.usage.tasks?.executed ?? 0),
451 0
452 )
453 ),
454 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
455 .waitTime.median && {
456 median: round(
457 median(
458 this.workerNodes.map(
459 (workerNode) => workerNode.usage.waitTime?.median ?? 0
460 )
461 )
462 )
463 })
464 }
465 })
466 }
467 }
468
469 /**
470 * The pool readiness boolean status.
471 */
472 private get ready (): boolean {
473 return (
474 this.workerNodes.reduce(
475 (accumulator, workerNode) =>
476 !workerNode.info.dynamic && workerNode.info.ready
477 ? accumulator + 1
478 : accumulator,
479 0
480 ) >= this.minSize
481 )
482 }
483
484 /**
485 * The approximate pool utilization.
486 *
487 * @returns The pool utilization.
488 */
489 private get utilization (): number {
490 const poolTimeCapacity =
491 (performance.now() - this.startTimestamp) * this.maxSize
492 const totalTasksRunTime = this.workerNodes.reduce(
493 (accumulator, workerNode) =>
494 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
495 0
496 )
497 const totalTasksWaitTime = this.workerNodes.reduce(
498 (accumulator, workerNode) =>
499 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
500 0
501 )
502 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
503 }
504
505 /**
506 * The pool type.
507 *
508 * If it is `'dynamic'`, it provides the `max` property.
509 */
510 protected abstract get type (): PoolType
511
512 /**
513 * The worker type.
514 */
515 protected abstract get worker (): WorkerType
516
517 /**
518 * The pool minimum size.
519 */
520 protected abstract get minSize (): number
521
522 /**
523 * The pool maximum size.
524 */
525 protected abstract get maxSize (): number
526
527 /**
528 * Checks if the worker id sent in the received message from a worker is valid.
529 *
530 * @param message - The received message.
531 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
532 */
533 private checkMessageWorkerId (message: MessageValue<Response>): void {
534 if (message.workerId == null) {
535 throw new Error('Worker message received without worker id')
536 } else if (
537 message.workerId != null &&
538 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
539 ) {
540 throw new Error(
541 `Worker message received from unknown worker '${message.workerId}'`
542 )
543 }
544 }
545
546 /**
547 * Gets the given worker its worker node key.
548 *
549 * @param worker - The worker.
550 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
551 */
552 private getWorkerNodeKeyByWorker (worker: Worker): number {
553 return this.workerNodes.findIndex(
554 (workerNode) => workerNode.worker === worker
555 )
556 }
557
558 /**
559 * Gets the worker node key given its worker id.
560 *
561 * @param workerId - The worker id.
562 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
563 */
564 private getWorkerNodeKeyByWorkerId (workerId: number): number {
565 return this.workerNodes.findIndex(
566 (workerNode) => workerNode.info.id === workerId
567 )
568 }
569
570 /** @inheritDoc */
571 public setWorkerChoiceStrategy (
572 workerChoiceStrategy: WorkerChoiceStrategy,
573 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
574 ): void {
575 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
576 this.opts.workerChoiceStrategy = workerChoiceStrategy
577 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
578 this.opts.workerChoiceStrategy
579 )
580 if (workerChoiceStrategyOptions != null) {
581 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
582 }
583 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
584 workerNode.resetUsage()
585 this.sendStatisticsMessageToWorker(workerNodeKey)
586 }
587 }
588
589 /** @inheritDoc */
590 public setWorkerChoiceStrategyOptions (
591 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
592 ): void {
593 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
594 this.opts.workerChoiceStrategyOptions = {
595 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
596 ...workerChoiceStrategyOptions
597 }
598 this.workerChoiceStrategyContext.setOptions(
599 this.opts.workerChoiceStrategyOptions
600 )
601 }
602
603 /** @inheritDoc */
604 public enableTasksQueue (
605 enable: boolean,
606 tasksQueueOptions?: TasksQueueOptions
607 ): void {
608 if (this.opts.enableTasksQueue === true && !enable) {
609 this.flushTasksQueues()
610 }
611 this.opts.enableTasksQueue = enable
612 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
613 }
614
615 /** @inheritDoc */
616 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
617 if (this.opts.enableTasksQueue === true) {
618 this.checkValidTasksQueueOptions(tasksQueueOptions)
619 this.opts.tasksQueueOptions =
620 this.buildTasksQueueOptions(tasksQueueOptions)
621 } else if (this.opts.tasksQueueOptions != null) {
622 delete this.opts.tasksQueueOptions
623 }
624 }
625
626 private buildTasksQueueOptions (
627 tasksQueueOptions: TasksQueueOptions
628 ): TasksQueueOptions {
629 return {
630 concurrency: tasksQueueOptions?.concurrency ?? 1
631 }
632 }
633
634 /**
635 * Whether the pool is full or not.
636 *
637 * The pool filling boolean status.
638 */
639 protected get full (): boolean {
640 return this.workerNodes.length >= this.maxSize
641 }
642
643 /**
644 * Whether the pool is busy or not.
645 *
646 * The pool busyness boolean status.
647 */
648 protected abstract get busy (): boolean
649
650 /**
651 * Whether worker nodes are executing concurrently their tasks quota or not.
652 *
653 * @returns Worker nodes busyness boolean status.
654 */
655 protected internalBusy (): boolean {
656 if (this.opts.enableTasksQueue === true) {
657 return (
658 this.workerNodes.findIndex(
659 (workerNode) =>
660 workerNode.info.ready &&
661 workerNode.usage.tasks.executing <
662 (this.opts.tasksQueueOptions?.concurrency as number)
663 ) === -1
664 )
665 } else {
666 return (
667 this.workerNodes.findIndex(
668 (workerNode) =>
669 workerNode.info.ready && workerNode.usage.tasks.executing === 0
670 ) === -1
671 )
672 }
673 }
674
675 /** @inheritDoc */
676 public listTaskFunctions (): string[] {
677 for (const workerNode of this.workerNodes) {
678 if (
679 Array.isArray(workerNode.info.taskFunctions) &&
680 workerNode.info.taskFunctions.length > 0
681 ) {
682 return workerNode.info.taskFunctions
683 }
684 }
685 return []
686 }
687
688 /** @inheritDoc */
689 public async execute (
690 data?: Data,
691 name?: string,
692 transferList?: TransferListItem[]
693 ): Promise<Response> {
694 return await new Promise<Response>((resolve, reject) => {
695 if (name != null && typeof name !== 'string') {
696 reject(new TypeError('name argument must be a string'))
697 }
698 if (
699 name != null &&
700 typeof name === 'string' &&
701 name.trim().length === 0
702 ) {
703 reject(new TypeError('name argument must not be an empty string'))
704 }
705 if (transferList != null && !Array.isArray(transferList)) {
706 reject(new TypeError('transferList argument must be an array'))
707 }
708 const timestamp = performance.now()
709 const workerNodeKey = this.chooseWorkerNode()
710 const workerInfo = this.getWorkerInfo(workerNodeKey)
711 if (
712 name != null &&
713 Array.isArray(workerInfo.taskFunctions) &&
714 !workerInfo.taskFunctions.includes(name)
715 ) {
716 reject(
717 new Error(`Task function '${name}' is not registered in the pool`)
718 )
719 }
720 const task: Task<Data> = {
721 name: name ?? DEFAULT_TASK_NAME,
722 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
723 data: data ?? ({} as Data),
724 transferList,
725 timestamp,
726 workerId: workerInfo.id as number,
727 taskId: randomUUID()
728 }
729 this.promiseResponseMap.set(task.taskId as string, {
730 resolve,
731 reject,
732 workerNodeKey
733 })
734 if (
735 this.opts.enableTasksQueue === false ||
736 (this.opts.enableTasksQueue === true &&
737 this.workerNodes[workerNodeKey].usage.tasks.executing <
738 (this.opts.tasksQueueOptions?.concurrency as number))
739 ) {
740 this.executeTask(workerNodeKey, task)
741 } else {
742 this.enqueueTask(workerNodeKey, task)
743 }
744 })
745 }
746
747 /** @inheritDoc */
748 public async destroy (): Promise<void> {
749 await Promise.all(
750 this.workerNodes.map(async (_, workerNodeKey) => {
751 await this.destroyWorkerNode(workerNodeKey)
752 })
753 )
754 this.emitter?.emit(PoolEvents.destroy, this.info)
755 }
756
757 protected async sendKillMessageToWorker (
758 workerNodeKey: number,
759 workerId: number
760 ): Promise<void> {
761 await new Promise<void>((resolve, reject) => {
762 this.registerWorkerMessageListener(workerNodeKey, (message) => {
763 if (message.kill === 'success') {
764 resolve()
765 } else if (message.kill === 'failure') {
766 reject(new Error(`Worker ${workerId} kill message handling failed`))
767 }
768 })
769 this.sendToWorker(workerNodeKey, { kill: true, workerId })
770 })
771 }
772
773 /**
774 * Terminates the worker node given its worker node key.
775 *
776 * @param workerNodeKey - The worker node key.
777 */
778 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
779
780 /**
781 * Setup hook to execute code before worker nodes are created in the abstract constructor.
782 * Can be overridden.
783 *
784 * @virtual
785 */
786 protected setupHook (): void {
787 // Intentionally empty
788 }
789
790 /**
791 * Should return whether the worker is the main worker or not.
792 */
793 protected abstract isMain (): boolean
794
795 /**
796 * Hook executed before the worker task execution.
797 * Can be overridden.
798 *
799 * @param workerNodeKey - The worker node key.
800 * @param task - The task to execute.
801 */
802 protected beforeTaskExecutionHook (
803 workerNodeKey: number,
804 task: Task<Data>
805 ): void {
806 const workerUsage = this.workerNodes[workerNodeKey].usage
807 ++workerUsage.tasks.executing
808 this.updateWaitTimeWorkerUsage(workerUsage, task)
809 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
810 const taskFunctionWorkerUsage = this.workerNodes[
811 workerNodeKey
812 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
813 ++taskFunctionWorkerUsage.tasks.executing
814 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
815 }
816 }
817
818 /**
819 * Hook executed after the worker task execution.
820 * Can be overridden.
821 *
822 * @param workerNodeKey - The worker node key.
823 * @param message - The received message.
824 */
825 protected afterTaskExecutionHook (
826 workerNodeKey: number,
827 message: MessageValue<Response>
828 ): void {
829 const workerUsage = this.workerNodes[workerNodeKey].usage
830 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
831 this.updateRunTimeWorkerUsage(workerUsage, message)
832 this.updateEluWorkerUsage(workerUsage, message)
833 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
834 const taskFunctionWorkerUsage = this.workerNodes[
835 workerNodeKey
836 ].getTaskFunctionWorkerUsage(
837 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
838 ) as WorkerUsage
839 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
840 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
841 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
842 }
843 }
844
845 /**
846 * Whether the worker node shall update its task function worker usage or not.
847 *
848 * @param workerNodeKey - The worker node key.
849 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
850 */
851 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
852 const workerInfo = this.getWorkerInfo(workerNodeKey)
853 return (
854 Array.isArray(workerInfo.taskFunctions) &&
855 workerInfo.taskFunctions.length > 2
856 )
857 }
858
859 private updateTaskStatisticsWorkerUsage (
860 workerUsage: WorkerUsage,
861 message: MessageValue<Response>
862 ): void {
863 const workerTaskStatistics = workerUsage.tasks
864 if (
865 workerTaskStatistics.executing != null &&
866 workerTaskStatistics.executing > 0
867 ) {
868 --workerTaskStatistics.executing
869 } else if (
870 workerTaskStatistics.executing != null &&
871 workerTaskStatistics.executing < 0
872 ) {
873 throw new Error(
874 'Worker usage statistic for tasks executing cannot be negative'
875 )
876 }
877 if (message.taskError == null) {
878 ++workerTaskStatistics.executed
879 } else {
880 ++workerTaskStatistics.failed
881 }
882 }
883
884 private updateRunTimeWorkerUsage (
885 workerUsage: WorkerUsage,
886 message: MessageValue<Response>
887 ): void {
888 updateMeasurementStatistics(
889 workerUsage.runTime,
890 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
891 message.taskPerformance?.runTime ?? 0,
892 workerUsage.tasks.executed
893 )
894 }
895
896 private updateWaitTimeWorkerUsage (
897 workerUsage: WorkerUsage,
898 task: Task<Data>
899 ): void {
900 const timestamp = performance.now()
901 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
902 updateMeasurementStatistics(
903 workerUsage.waitTime,
904 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
905 taskWaitTime,
906 workerUsage.tasks.executed
907 )
908 }
909
910 private updateEluWorkerUsage (
911 workerUsage: WorkerUsage,
912 message: MessageValue<Response>
913 ): void {
914 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
915 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
916 updateMeasurementStatistics(
917 workerUsage.elu.active,
918 eluTaskStatisticsRequirements,
919 message.taskPerformance?.elu?.active ?? 0,
920 workerUsage.tasks.executed
921 )
922 updateMeasurementStatistics(
923 workerUsage.elu.idle,
924 eluTaskStatisticsRequirements,
925 message.taskPerformance?.elu?.idle ?? 0,
926 workerUsage.tasks.executed
927 )
928 if (eluTaskStatisticsRequirements.aggregate) {
929 if (message.taskPerformance?.elu != null) {
930 if (workerUsage.elu.utilization != null) {
931 workerUsage.elu.utilization =
932 (workerUsage.elu.utilization +
933 message.taskPerformance.elu.utilization) /
934 2
935 } else {
936 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
937 }
938 }
939 }
940 }
941
942 /**
943 * Chooses a worker node for the next task.
944 *
945 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
946 *
947 * @returns The chosen worker node key
948 */
949 private chooseWorkerNode (): number {
950 if (this.shallCreateDynamicWorker()) {
951 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
952 if (
953 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
954 ) {
955 return workerNodeKey
956 }
957 }
958 return this.workerChoiceStrategyContext.execute()
959 }
960
961 /**
962 * Conditions for dynamic worker creation.
963 *
964 * @returns Whether to create a dynamic worker or not.
965 */
966 private shallCreateDynamicWorker (): boolean {
967 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
968 }
969
970 /**
971 * Sends a message to worker given its worker node key.
972 *
973 * @param workerNodeKey - The worker node key.
974 * @param message - The message.
975 * @param transferList - The optional array of transferable objects.
976 */
977 protected abstract sendToWorker (
978 workerNodeKey: number,
979 message: MessageValue<Data>,
980 transferList?: TransferListItem[]
981 ): void
982
983 /**
984 * Creates a new worker.
985 *
986 * @returns Newly created worker.
987 */
988 protected abstract createWorker (): Worker
989
990 /**
991 * Creates a new, completely set up worker node.
992 *
993 * @returns New, completely set up worker node key.
994 */
995 protected createAndSetupWorkerNode (): number {
996 const worker = this.createWorker()
997
998 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
999 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1000 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1001 worker.on('error', (error) => {
1002 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1003 const workerInfo = this.getWorkerInfo(workerNodeKey)
1004 workerInfo.ready = false
1005 this.workerNodes[workerNodeKey].closeChannel()
1006 this.emitter?.emit(PoolEvents.error, error)
1007 if (this.opts.restartWorkerOnError === true && !this.starting) {
1008 if (workerInfo.dynamic) {
1009 this.createAndSetupDynamicWorkerNode()
1010 } else {
1011 this.createAndSetupWorkerNode()
1012 }
1013 }
1014 if (this.opts.enableTasksQueue === true) {
1015 this.redistributeQueuedTasks(workerNodeKey)
1016 }
1017 })
1018 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1019 worker.once('exit', () => {
1020 this.removeWorkerNode(worker)
1021 })
1022
1023 const workerNodeKey = this.addWorkerNode(worker)
1024
1025 this.afterWorkerNodeSetup(workerNodeKey)
1026
1027 return workerNodeKey
1028 }
1029
1030 /**
1031 * Creates a new, completely set up dynamic worker node.
1032 *
1033 * @returns New, completely set up dynamic worker node key.
1034 */
1035 protected createAndSetupDynamicWorkerNode (): number {
1036 const workerNodeKey = this.createAndSetupWorkerNode()
1037 this.registerWorkerMessageListener(workerNodeKey, (message) => {
1038 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1039 message.workerId
1040 )
1041 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1042 // Kill message received from worker
1043 if (
1044 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1045 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1046 ((this.opts.enableTasksQueue === false &&
1047 workerUsage.tasks.executing === 0) ||
1048 (this.opts.enableTasksQueue === true &&
1049 workerUsage.tasks.executing === 0 &&
1050 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1051 ) {
1052 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1053 this.emitter?.emit(PoolEvents.error, error)
1054 })
1055 }
1056 })
1057 const workerInfo = this.getWorkerInfo(workerNodeKey)
1058 this.sendToWorker(workerNodeKey, {
1059 checkActive: true,
1060 workerId: workerInfo.id as number
1061 })
1062 workerInfo.dynamic = true
1063 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1064 workerInfo.ready = true
1065 }
1066 this.checkAndEmitDynamicWorkerCreationEvents()
1067 return workerNodeKey
1068 }
1069
1070 /**
1071 * Registers a listener callback on the worker given its worker node key.
1072 *
1073 * @param workerNodeKey - The worker node key.
1074 * @param listener - The message listener callback.
1075 */
1076 protected abstract registerWorkerMessageListener<
1077 Message extends Data | Response
1078 >(
1079 workerNodeKey: number,
1080 listener: (message: MessageValue<Message>) => void
1081 ): void
1082
1083 /**
1084 * Method hooked up after a worker node has been newly created.
1085 * Can be overridden.
1086 *
1087 * @param workerNodeKey - The newly created worker node key.
1088 */
1089 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1090 // Listen to worker messages.
1091 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1092 // Send the startup message to worker.
1093 this.sendStartupMessageToWorker(workerNodeKey)
1094 // Send the statistics message to worker.
1095 this.sendStatisticsMessageToWorker(workerNodeKey)
1096 }
1097
1098 /**
1099 * Sends the startup message to worker given its worker node key.
1100 *
1101 * @param workerNodeKey - The worker node key.
1102 */
1103 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1104
1105 /**
1106 * Sends the statistics message to worker given its worker node key.
1107 *
1108 * @param workerNodeKey - The worker node key.
1109 */
1110 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1111 this.sendToWorker(workerNodeKey, {
1112 statistics: {
1113 runTime:
1114 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1115 .runTime.aggregate,
1116 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1117 .elu.aggregate
1118 },
1119 workerId: this.getWorkerInfo(workerNodeKey).id as number
1120 })
1121 }
1122
1123 private redistributeQueuedTasks (workerNodeKey: number): void {
1124 while (this.tasksQueueSize(workerNodeKey) > 0) {
1125 let targetWorkerNodeKey: number = workerNodeKey
1126 let minQueuedTasks = Infinity
1127 let executeTask = false
1128 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1129 const workerInfo = this.getWorkerInfo(workerNodeId)
1130 if (
1131 workerNodeId !== workerNodeKey &&
1132 workerInfo.ready &&
1133 workerNode.usage.tasks.queued === 0
1134 ) {
1135 if (
1136 this.workerNodes[workerNodeId].usage.tasks.executing <
1137 (this.opts.tasksQueueOptions?.concurrency as number)
1138 ) {
1139 executeTask = true
1140 }
1141 targetWorkerNodeKey = workerNodeId
1142 break
1143 }
1144 if (
1145 workerNodeId !== workerNodeKey &&
1146 workerInfo.ready &&
1147 workerNode.usage.tasks.queued < minQueuedTasks
1148 ) {
1149 minQueuedTasks = workerNode.usage.tasks.queued
1150 targetWorkerNodeKey = workerNodeId
1151 }
1152 }
1153 if (executeTask) {
1154 this.executeTask(
1155 targetWorkerNodeKey,
1156 this.dequeueTask(workerNodeKey) as Task<Data>
1157 )
1158 } else {
1159 this.enqueueTask(
1160 targetWorkerNodeKey,
1161 this.dequeueTask(workerNodeKey) as Task<Data>
1162 )
1163 }
1164 }
1165 }
1166
1167 /**
1168 * This method is the listener registered for each worker message.
1169 *
1170 * @returns The listener function to execute when a message is received from a worker.
1171 */
1172 protected workerListener (): (message: MessageValue<Response>) => void {
1173 return (message) => {
1174 this.checkMessageWorkerId(message)
1175 if (message.ready != null && message.taskFunctions != null) {
1176 // Worker ready response received from worker
1177 this.handleWorkerReadyResponse(message)
1178 } else if (message.taskId != null) {
1179 // Task execution response received from worker
1180 this.handleTaskExecutionResponse(message)
1181 } else if (message.taskFunctions != null) {
1182 // Task functions message received from worker
1183 this.getWorkerInfo(
1184 this.getWorkerNodeKeyByWorkerId(message.workerId)
1185 ).taskFunctions = message.taskFunctions
1186 }
1187 }
1188 }
1189
1190 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1191 if (message.ready === false) {
1192 throw new Error(`Worker ${message.workerId} failed to initialize`)
1193 }
1194 const workerInfo = this.getWorkerInfo(
1195 this.getWorkerNodeKeyByWorkerId(message.workerId)
1196 )
1197 workerInfo.ready = message.ready as boolean
1198 workerInfo.taskFunctions = message.taskFunctions
1199 if (this.emitter != null && this.ready) {
1200 this.emitter.emit(PoolEvents.ready, this.info)
1201 }
1202 }
1203
1204 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1205 const { taskId, taskError, data } = message
1206 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1207 if (promiseResponse != null) {
1208 if (taskError != null) {
1209 this.emitter?.emit(PoolEvents.taskError, taskError)
1210 promiseResponse.reject(taskError.message)
1211 } else {
1212 promiseResponse.resolve(data as Response)
1213 }
1214 const workerNodeKey = promiseResponse.workerNodeKey
1215 this.afterTaskExecutionHook(workerNodeKey, message)
1216 this.promiseResponseMap.delete(taskId as string)
1217 if (
1218 this.opts.enableTasksQueue === true &&
1219 this.tasksQueueSize(workerNodeKey) > 0 &&
1220 this.workerNodes[workerNodeKey].usage.tasks.executing <
1221 (this.opts.tasksQueueOptions?.concurrency as number)
1222 ) {
1223 this.executeTask(
1224 workerNodeKey,
1225 this.dequeueTask(workerNodeKey) as Task<Data>
1226 )
1227 }
1228 this.workerChoiceStrategyContext.update(workerNodeKey)
1229 }
1230 }
1231
1232 private checkAndEmitTaskExecutionEvents (): void {
1233 if (this.busy) {
1234 this.emitter?.emit(PoolEvents.busy, this.info)
1235 }
1236 }
1237
1238 private checkAndEmitTaskQueuingEvents (): void {
1239 if (this.hasBackPressure()) {
1240 this.emitter?.emit(PoolEvents.backPressure, this.info)
1241 }
1242 }
1243
1244 private checkAndEmitDynamicWorkerCreationEvents (): void {
1245 if (this.type === PoolTypes.dynamic) {
1246 if (this.full) {
1247 this.emitter?.emit(PoolEvents.full, this.info)
1248 }
1249 }
1250 }
1251
1252 /**
1253 * Gets the worker information given its worker node key.
1254 *
1255 * @param workerNodeKey - The worker node key.
1256 * @returns The worker information.
1257 */
1258 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1259 return this.workerNodes[workerNodeKey].info
1260 }
1261
1262 /**
1263 * Adds the given worker in the pool worker nodes.
1264 *
1265 * @param worker - The worker.
1266 * @returns The added worker node key.
1267 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1268 */
1269 private addWorkerNode (worker: Worker): number {
1270 const workerNode = new WorkerNode<Worker, Data>(
1271 worker,
1272 this.worker,
1273 this.maxSize
1274 )
1275 // Flag the worker node as ready at pool startup.
1276 if (this.starting) {
1277 workerNode.info.ready = true
1278 }
1279 this.workerNodes.push(workerNode)
1280 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1281 if (workerNodeKey === -1) {
1282 throw new Error('Worker node added not found')
1283 }
1284 return workerNodeKey
1285 }
1286
1287 /**
1288 * Removes the given worker from the pool worker nodes.
1289 *
1290 * @param worker - The worker.
1291 */
1292 private removeWorkerNode (worker: Worker): void {
1293 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1294 if (workerNodeKey !== -1) {
1295 this.workerNodes.splice(workerNodeKey, 1)
1296 this.workerChoiceStrategyContext.remove(workerNodeKey)
1297 }
1298 }
1299
1300 /** @inheritDoc */
1301 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1302 return (
1303 this.opts.enableTasksQueue === true &&
1304 this.workerNodes[workerNodeKey].hasBackPressure()
1305 )
1306 }
1307
1308 private hasBackPressure (): boolean {
1309 return (
1310 this.opts.enableTasksQueue === true &&
1311 this.workerNodes.findIndex(
1312 (workerNode) => !workerNode.hasBackPressure()
1313 ) === -1
1314 )
1315 }
1316
1317 /**
1318 * Executes the given task on the worker given its worker node key.
1319 *
1320 * @param workerNodeKey - The worker node key.
1321 * @param task - The task to execute.
1322 */
1323 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1324 this.beforeTaskExecutionHook(workerNodeKey, task)
1325 this.sendToWorker(workerNodeKey, task, task.transferList)
1326 this.checkAndEmitTaskExecutionEvents()
1327 }
1328
1329 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1330 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1331 this.checkAndEmitTaskQueuingEvents()
1332 return tasksQueueSize
1333 }
1334
1335 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1336 return this.workerNodes[workerNodeKey].dequeueTask()
1337 }
1338
1339 private tasksQueueSize (workerNodeKey: number): number {
1340 return this.workerNodes[workerNodeKey].tasksQueueSize()
1341 }
1342
1343 protected flushTasksQueue (workerNodeKey: number): void {
1344 while (this.tasksQueueSize(workerNodeKey) > 0) {
1345 this.executeTask(
1346 workerNodeKey,
1347 this.dequeueTask(workerNodeKey) as Task<Data>
1348 )
1349 }
1350 this.workerNodes[workerNodeKey].clearTasksQueue()
1351 }
1352
1353 private flushTasksQueues (): void {
1354 for (const [workerNodeKey] of this.workerNodes.entries()) {
1355 this.flushTasksQueue(workerNodeKey)
1356 }
1357 }
1358 }