Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95 /**
96 * The task function names.
97 */
98 private taskFunctions!: string[]
99
100 /**
101 * Constructs a new poolifier pool.
102 *
103 * @param numberOfWorkers - Number of workers that this pool should manage.
104 * @param filePath - Path to the worker file.
105 * @param opts - Options for the pool.
106 */
107 public constructor (
108 protected readonly numberOfWorkers: number,
109 protected readonly filePath: string,
110 protected readonly opts: PoolOptions<Worker>
111 ) {
112 if (!this.isMain()) {
113 throw new Error(
114 'Cannot start a pool from a worker with the same type as the pool'
115 )
116 }
117 this.checkNumberOfWorkers(this.numberOfWorkers)
118 this.checkFilePath(this.filePath)
119 this.checkPoolOptions(this.opts)
120
121 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
122 this.executeTask = this.executeTask.bind(this)
123 this.enqueueTask = this.enqueueTask.bind(this)
124 this.dequeueTask = this.dequeueTask.bind(this)
125 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
126
127 if (this.opts.enableEvents === true) {
128 this.emitter = new PoolEmitter()
129 }
130 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
131 Worker,
132 Data,
133 Response
134 >(
135 this,
136 this.opts.workerChoiceStrategy,
137 this.opts.workerChoiceStrategyOptions
138 )
139
140 this.setupHook()
141
142 this.starting = true
143 this.startPool()
144 this.starting = false
145
146 this.startTimestamp = performance.now()
147 }
148
149 private checkFilePath (filePath: string): void {
150 if (
151 filePath == null ||
152 typeof filePath !== 'string' ||
153 (typeof filePath === 'string' && filePath.trim().length === 0)
154 ) {
155 throw new Error('Please specify a file with a worker implementation')
156 }
157 if (!existsSync(filePath)) {
158 throw new Error(`Cannot find the worker file '${filePath}'`)
159 }
160 }
161
162 private checkNumberOfWorkers (numberOfWorkers: number): void {
163 if (numberOfWorkers == null) {
164 throw new Error(
165 'Cannot instantiate a pool without specifying the number of workers'
166 )
167 } else if (!Number.isSafeInteger(numberOfWorkers)) {
168 throw new TypeError(
169 'Cannot instantiate a pool with a non safe integer number of workers'
170 )
171 } else if (numberOfWorkers < 0) {
172 throw new RangeError(
173 'Cannot instantiate a pool with a negative number of workers'
174 )
175 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
176 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
177 }
178 }
179
180 protected checkDynamicPoolSize (min: number, max: number): void {
181 if (this.type === PoolTypes.dynamic) {
182 if (max == null) {
183 throw new Error(
184 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
185 )
186 } else if (!Number.isSafeInteger(max)) {
187 throw new TypeError(
188 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
189 )
190 } else if (min > max) {
191 throw new RangeError(
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
193 )
194 } else if (max === 0) {
195 throw new RangeError(
196 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
197 )
198 } else if (min === max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
201 )
202 }
203 }
204 }
205
206 private checkPoolOptions (opts: PoolOptions<Worker>): void {
207 if (isPlainObject(opts)) {
208 this.opts.workerChoiceStrategy =
209 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
211 this.opts.workerChoiceStrategyOptions =
212 opts.workerChoiceStrategyOptions ??
213 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
230 }
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
237 throw new Error(
238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
239 )
240 }
241 }
242
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions.weights != null &&
253 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
254 ) {
255 throw new Error(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.measurement != null &&
261 !Object.values(Measurements).includes(
262 workerChoiceStrategyOptions.measurement
263 )
264 ) {
265 throw new Error(
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
267 )
268 }
269 }
270
271 private checkValidTasksQueueOptions (
272 tasksQueueOptions: TasksQueueOptions
273 ): void {
274 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
275 throw new TypeError('Invalid tasks queue options: must be a plain object')
276 }
277 if (
278 tasksQueueOptions?.concurrency != null &&
279 !Number.isSafeInteger(tasksQueueOptions.concurrency)
280 ) {
281 throw new TypeError(
282 'Invalid worker tasks concurrency: must be an integer'
283 )
284 }
285 if (
286 tasksQueueOptions?.concurrency != null &&
287 tasksQueueOptions.concurrency <= 0
288 ) {
289 throw new Error(
290 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
291 )
292 }
293 }
294
295 private startPool (): void {
296 while (
297 this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
300 0
301 ) < this.numberOfWorkers
302 ) {
303 this.createAndSetupWorkerNode()
304 }
305 }
306
307 /** @inheritDoc */
308 public get info (): PoolInfo {
309 return {
310 version,
311 type: this.type,
312 worker: this.worker,
313 ready: this.ready,
314 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
315 minSize: this.minSize,
316 maxSize: this.maxSize,
317 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
318 .runTime.aggregate &&
319 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 .waitTime.aggregate && { utilization: round(this.utilization) }),
321 workerNodes: this.workerNodes.length,
322 idleWorkerNodes: this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 workerNode.usage.tasks.executing === 0
325 ? accumulator + 1
326 : accumulator,
327 0
328 ),
329 busyWorkerNodes: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
331 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
332 0
333 ),
334 executedTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 accumulator + workerNode.usage.tasks.executed,
337 0
338 ),
339 executingTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 accumulator + workerNode.usage.tasks.executing,
342 0
343 ),
344 ...(this.opts.enableTasksQueue === true && {
345 queuedTasks: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
347 accumulator + workerNode.usage.tasks.queued,
348 0
349 )
350 }),
351 ...(this.opts.enableTasksQueue === true && {
352 maxQueuedTasks: this.workerNodes.reduce(
353 (accumulator, workerNode) =>
354 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
355 0
356 )
357 }),
358 failedTasks: this.workerNodes.reduce(
359 (accumulator, workerNode) =>
360 accumulator + workerNode.usage.tasks.failed,
361 0
362 ),
363 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
364 .runTime.aggregate && {
365 runTime: {
366 minimum: round(
367 Math.min(
368 ...this.workerNodes.map(
369 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
370 )
371 )
372 ),
373 maximum: round(
374 Math.max(
375 ...this.workerNodes.map(
376 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
377 )
378 )
379 ),
380 average: round(
381 this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
384 0
385 ) /
386 this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + (workerNode.usage.tasks?.executed ?? 0),
389 0
390 )
391 ),
392 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
393 .runTime.median && {
394 median: round(
395 median(
396 this.workerNodes.map(
397 (workerNode) => workerNode.usage.runTime?.median ?? 0
398 )
399 )
400 )
401 })
402 }
403 }),
404 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
405 .waitTime.aggregate && {
406 waitTime: {
407 minimum: round(
408 Math.min(
409 ...this.workerNodes.map(
410 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
411 )
412 )
413 ),
414 maximum: round(
415 Math.max(
416 ...this.workerNodes.map(
417 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
418 )
419 )
420 ),
421 average: round(
422 this.workerNodes.reduce(
423 (accumulator, workerNode) =>
424 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
425 0
426 ) /
427 this.workerNodes.reduce(
428 (accumulator, workerNode) =>
429 accumulator + (workerNode.usage.tasks?.executed ?? 0),
430 0
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .waitTime.median && {
435 median: round(
436 median(
437 this.workerNodes.map(
438 (workerNode) => workerNode.usage.waitTime?.median ?? 0
439 )
440 )
441 )
442 })
443 }
444 })
445 }
446 }
447
448 /**
449 * The pool readiness boolean status.
450 */
451 private get ready (): boolean {
452 return (
453 this.workerNodes.reduce(
454 (accumulator, workerNode) =>
455 !workerNode.info.dynamic && workerNode.info.ready
456 ? accumulator + 1
457 : accumulator,
458 0
459 ) >= this.minSize
460 )
461 }
462
463 /**
464 * The approximate pool utilization.
465 *
466 * @returns The pool utilization.
467 */
468 private get utilization (): number {
469 const poolTimeCapacity =
470 (performance.now() - this.startTimestamp) * this.maxSize
471 const totalTasksRunTime = this.workerNodes.reduce(
472 (accumulator, workerNode) =>
473 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
474 0
475 )
476 const totalTasksWaitTime = this.workerNodes.reduce(
477 (accumulator, workerNode) =>
478 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
479 0
480 )
481 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
482 }
483
484 /**
485 * The pool type.
486 *
487 * If it is `'dynamic'`, it provides the `max` property.
488 */
489 protected abstract get type (): PoolType
490
491 /**
492 * The worker type.
493 */
494 protected abstract get worker (): WorkerType
495
496 /**
497 * The pool minimum size.
498 */
499 protected abstract get minSize (): number
500
501 /**
502 * The pool maximum size.
503 */
504 protected abstract get maxSize (): number
505
506 /**
507 * Checks if the worker id sent in the received message from a worker is valid.
508 *
509 * @param message - The received message.
510 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
511 */
512 private checkMessageWorkerId (message: MessageValue<Response>): void {
513 if (message.workerId == null) {
514 throw new Error('Worker message received without worker id')
515 } else if (
516 message.workerId != null &&
517 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
518 ) {
519 throw new Error(
520 `Worker message received from unknown worker '${message.workerId}'`
521 )
522 }
523 }
524
525 /**
526 * Gets the given worker its worker node key.
527 *
528 * @param worker - The worker.
529 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
530 */
531 private getWorkerNodeKeyByWorker (worker: Worker): number {
532 return this.workerNodes.findIndex(
533 (workerNode) => workerNode.worker === worker
534 )
535 }
536
537 /**
538 * Gets the worker node key given its worker id.
539 *
540 * @param workerId - The worker id.
541 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
542 */
543 private getWorkerNodeKeyByWorkerId (workerId: number): number {
544 return this.workerNodes.findIndex(
545 (workerNode) => workerNode.info.id === workerId
546 )
547 }
548
549 /** @inheritDoc */
550 public setWorkerChoiceStrategy (
551 workerChoiceStrategy: WorkerChoiceStrategy,
552 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
553 ): void {
554 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
555 this.opts.workerChoiceStrategy = workerChoiceStrategy
556 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
557 this.opts.workerChoiceStrategy
558 )
559 if (workerChoiceStrategyOptions != null) {
560 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
561 }
562 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
563 workerNode.resetUsage()
564 this.sendStatisticsMessageToWorker(workerNodeKey)
565 }
566 }
567
568 /** @inheritDoc */
569 public setWorkerChoiceStrategyOptions (
570 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
571 ): void {
572 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
573 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
574 this.workerChoiceStrategyContext.setOptions(
575 this.opts.workerChoiceStrategyOptions
576 )
577 }
578
579 /** @inheritDoc */
580 public enableTasksQueue (
581 enable: boolean,
582 tasksQueueOptions?: TasksQueueOptions
583 ): void {
584 if (this.opts.enableTasksQueue === true && !enable) {
585 this.flushTasksQueues()
586 }
587 this.opts.enableTasksQueue = enable
588 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
589 }
590
591 /** @inheritDoc */
592 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
593 if (this.opts.enableTasksQueue === true) {
594 this.checkValidTasksQueueOptions(tasksQueueOptions)
595 this.opts.tasksQueueOptions =
596 this.buildTasksQueueOptions(tasksQueueOptions)
597 } else if (this.opts.tasksQueueOptions != null) {
598 delete this.opts.tasksQueueOptions
599 }
600 }
601
602 private buildTasksQueueOptions (
603 tasksQueueOptions: TasksQueueOptions
604 ): TasksQueueOptions {
605 return {
606 concurrency: tasksQueueOptions?.concurrency ?? 1
607 }
608 }
609
610 /**
611 * Whether the pool is full or not.
612 *
613 * The pool filling boolean status.
614 */
615 protected get full (): boolean {
616 return this.workerNodes.length >= this.maxSize
617 }
618
619 /**
620 * Whether the pool is busy or not.
621 *
622 * The pool busyness boolean status.
623 */
624 protected abstract get busy (): boolean
625
626 /**
627 * Whether worker nodes are executing concurrently their tasks quota or not.
628 *
629 * @returns Worker nodes busyness boolean status.
630 */
631 protected internalBusy (): boolean {
632 if (this.opts.enableTasksQueue === true) {
633 return (
634 this.workerNodes.findIndex(
635 (workerNode) =>
636 workerNode.info.ready &&
637 workerNode.usage.tasks.executing <
638 (this.opts.tasksQueueOptions?.concurrency as number)
639 ) === -1
640 )
641 } else {
642 return (
643 this.workerNodes.findIndex(
644 (workerNode) =>
645 workerNode.info.ready && workerNode.usage.tasks.executing === 0
646 ) === -1
647 )
648 }
649 }
650
651 /** @inheritDoc */
652 public listTaskFunctions (): string[] {
653 if (this.taskFunctions != null) {
654 return this.taskFunctions
655 } else {
656 return []
657 }
658 }
659
660 /** @inheritDoc */
661 public async execute (
662 data?: Data,
663 name?: string,
664 transferList?: TransferListItem[]
665 ): Promise<Response> {
666 return await new Promise<Response>((resolve, reject) => {
667 if (name != null && typeof name !== 'string') {
668 reject(new TypeError('name argument must be a string'))
669 }
670 if (
671 name != null &&
672 typeof name === 'string' &&
673 name.trim().length === 0
674 ) {
675 reject(new TypeError('name argument must not be an empty string'))
676 }
677 if (
678 name != null &&
679 this.taskFunctions != null &&
680 !this.taskFunctions.includes(name)
681 ) {
682 reject(
683 new Error(`Task function '${name}' is not registered in the pool`)
684 )
685 }
686 if (transferList != null && !Array.isArray(transferList)) {
687 reject(new TypeError('transferList argument must be an array'))
688 }
689 const timestamp = performance.now()
690 const workerNodeKey = this.chooseWorkerNode()
691 const task: Task<Data> = {
692 name: name ?? DEFAULT_TASK_NAME,
693 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
694 data: data ?? ({} as Data),
695 transferList,
696 timestamp,
697 workerId: this.getWorkerInfo(workerNodeKey).id as number,
698 taskId: randomUUID()
699 }
700 this.promiseResponseMap.set(task.taskId as string, {
701 resolve,
702 reject,
703 workerNodeKey
704 })
705 if (
706 this.opts.enableTasksQueue === false ||
707 (this.opts.enableTasksQueue === true &&
708 this.workerNodes[workerNodeKey].usage.tasks.executing <
709 (this.opts.tasksQueueOptions?.concurrency as number))
710 ) {
711 this.executeTask(workerNodeKey, task)
712 } else {
713 this.enqueueTask(workerNodeKey, task)
714 }
715 this.checkAndEmitEvents()
716 })
717 }
718
719 /** @inheritDoc */
720 public async destroy (): Promise<void> {
721 await Promise.all(
722 this.workerNodes.map(async (_, workerNodeKey) => {
723 await this.destroyWorkerNode(workerNodeKey)
724 })
725 )
726 this.emitter?.emit(PoolEvents.destroy)
727 }
728
729 protected async sendKillMessageToWorker (
730 workerNodeKey: number,
731 workerId: number
732 ): Promise<void> {
733 await new Promise<void>((resolve, reject) => {
734 this.registerWorkerMessageListener(workerNodeKey, (message) => {
735 if (message.kill === 'success') {
736 resolve()
737 } else if (message.kill === 'failure') {
738 reject(new Error(`Worker ${workerId} kill message handling failed`))
739 }
740 })
741 this.sendToWorker(workerNodeKey, { kill: true, workerId })
742 })
743 }
744
745 /**
746 * Terminates the worker node given its worker node key.
747 *
748 * @param workerNodeKey - The worker node key.
749 */
750 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
751
752 /**
753 * Setup hook to execute code before worker nodes are created in the abstract constructor.
754 * Can be overridden.
755 *
756 * @virtual
757 */
758 protected setupHook (): void {
759 // Intentionally empty
760 }
761
762 /**
763 * Should return whether the worker is the main worker or not.
764 */
765 protected abstract isMain (): boolean
766
767 /**
768 * Hook executed before the worker task execution.
769 * Can be overridden.
770 *
771 * @param workerNodeKey - The worker node key.
772 * @param task - The task to execute.
773 */
774 protected beforeTaskExecutionHook (
775 workerNodeKey: number,
776 task: Task<Data>
777 ): void {
778 const workerUsage = this.workerNodes[workerNodeKey].usage
779 ++workerUsage.tasks.executing
780 this.updateWaitTimeWorkerUsage(workerUsage, task)
781 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
782 task.name as string
783 ) as WorkerUsage
784 ++taskWorkerUsage.tasks.executing
785 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
786 }
787
788 /**
789 * Hook executed after the worker task execution.
790 * Can be overridden.
791 *
792 * @param workerNodeKey - The worker node key.
793 * @param message - The received message.
794 */
795 protected afterTaskExecutionHook (
796 workerNodeKey: number,
797 message: MessageValue<Response>
798 ): void {
799 const workerUsage = this.workerNodes[workerNodeKey].usage
800 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
801 this.updateRunTimeWorkerUsage(workerUsage, message)
802 this.updateEluWorkerUsage(workerUsage, message)
803 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
804 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
805 ) as WorkerUsage
806 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
807 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
808 this.updateEluWorkerUsage(taskWorkerUsage, message)
809 }
810
811 private updateTaskStatisticsWorkerUsage (
812 workerUsage: WorkerUsage,
813 message: MessageValue<Response>
814 ): void {
815 const workerTaskStatistics = workerUsage.tasks
816 --workerTaskStatistics.executing
817 if (message.taskError == null) {
818 ++workerTaskStatistics.executed
819 } else {
820 ++workerTaskStatistics.failed
821 }
822 }
823
824 private updateRunTimeWorkerUsage (
825 workerUsage: WorkerUsage,
826 message: MessageValue<Response>
827 ): void {
828 updateMeasurementStatistics(
829 workerUsage.runTime,
830 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
831 message.taskPerformance?.runTime ?? 0,
832 workerUsage.tasks.executed
833 )
834 }
835
836 private updateWaitTimeWorkerUsage (
837 workerUsage: WorkerUsage,
838 task: Task<Data>
839 ): void {
840 const timestamp = performance.now()
841 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
842 updateMeasurementStatistics(
843 workerUsage.waitTime,
844 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
845 taskWaitTime,
846 workerUsage.tasks.executed
847 )
848 }
849
850 private updateEluWorkerUsage (
851 workerUsage: WorkerUsage,
852 message: MessageValue<Response>
853 ): void {
854 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
855 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
856 updateMeasurementStatistics(
857 workerUsage.elu.active,
858 eluTaskStatisticsRequirements,
859 message.taskPerformance?.elu?.active ?? 0,
860 workerUsage.tasks.executed
861 )
862 updateMeasurementStatistics(
863 workerUsage.elu.idle,
864 eluTaskStatisticsRequirements,
865 message.taskPerformance?.elu?.idle ?? 0,
866 workerUsage.tasks.executed
867 )
868 if (eluTaskStatisticsRequirements.aggregate) {
869 if (message.taskPerformance?.elu != null) {
870 if (workerUsage.elu.utilization != null) {
871 workerUsage.elu.utilization =
872 (workerUsage.elu.utilization +
873 message.taskPerformance.elu.utilization) /
874 2
875 } else {
876 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
877 }
878 }
879 }
880 }
881
882 /**
883 * Chooses a worker node for the next task.
884 *
885 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
886 *
887 * @returns The chosen worker node key
888 */
889 private chooseWorkerNode (): number {
890 if (this.shallCreateDynamicWorker()) {
891 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
892 if (
893 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
894 ) {
895 return workerNodeKey
896 }
897 }
898 return this.workerChoiceStrategyContext.execute()
899 }
900
901 /**
902 * Conditions for dynamic worker creation.
903 *
904 * @returns Whether to create a dynamic worker or not.
905 */
906 private shallCreateDynamicWorker (): boolean {
907 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
908 }
909
910 /**
911 * Sends a message to worker given its worker node key.
912 *
913 * @param workerNodeKey - The worker node key.
914 * @param message - The message.
915 * @param transferList - The optional array of transferable objects.
916 */
917 protected abstract sendToWorker (
918 workerNodeKey: number,
919 message: MessageValue<Data>,
920 transferList?: TransferListItem[]
921 ): void
922
923 /**
924 * Creates a new worker.
925 *
926 * @returns Newly created worker.
927 */
928 protected abstract createWorker (): Worker
929
930 /**
931 * Creates a new, completely set up worker node.
932 *
933 * @returns New, completely set up worker node key.
934 */
935 protected createAndSetupWorkerNode (): number {
936 const worker = this.createWorker()
937
938 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
939 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
940 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
941 worker.on('error', (error) => {
942 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
943 const workerInfo = this.getWorkerInfo(workerNodeKey)
944 workerInfo.ready = false
945 this.workerNodes[workerNodeKey].closeChannel()
946 this.emitter?.emit(PoolEvents.error, error)
947 if (this.opts.restartWorkerOnError === true && !this.starting) {
948 if (workerInfo.dynamic) {
949 this.createAndSetupDynamicWorkerNode()
950 } else {
951 this.createAndSetupWorkerNode()
952 }
953 }
954 if (this.opts.enableTasksQueue === true) {
955 this.redistributeQueuedTasks(workerNodeKey)
956 }
957 })
958 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
959 worker.once('exit', () => {
960 this.removeWorkerNode(worker)
961 })
962
963 const workerNodeKey = this.addWorkerNode(worker)
964
965 this.afterWorkerNodeSetup(workerNodeKey)
966
967 return workerNodeKey
968 }
969
970 /**
971 * Creates a new, completely set up dynamic worker node.
972 *
973 * @returns New, completely set up dynamic worker node key.
974 */
975 protected createAndSetupDynamicWorkerNode (): number {
976 const workerNodeKey = this.createAndSetupWorkerNode()
977 this.registerWorkerMessageListener(workerNodeKey, (message) => {
978 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
979 message.workerId
980 )
981 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
982 // Kill message received from worker
983 if (
984 isKillBehavior(KillBehaviors.HARD, message.kill) ||
985 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
986 ((this.opts.enableTasksQueue === false &&
987 workerUsage.tasks.executing === 0) ||
988 (this.opts.enableTasksQueue === true &&
989 workerUsage.tasks.executing === 0 &&
990 this.tasksQueueSize(localWorkerNodeKey) === 0)))
991 ) {
992 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
993 this.emitter?.emit(PoolEvents.error, error)
994 })
995 }
996 })
997 const workerInfo = this.getWorkerInfo(workerNodeKey)
998 this.sendToWorker(workerNodeKey, {
999 checkActive: true,
1000 workerId: workerInfo.id as number
1001 })
1002 workerInfo.dynamic = true
1003 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1004 workerInfo.ready = true
1005 }
1006 return workerNodeKey
1007 }
1008
1009 /**
1010 * Registers a listener callback on the worker given its worker node key.
1011 *
1012 * @param workerNodeKey - The worker node key.
1013 * @param listener - The message listener callback.
1014 */
1015 protected abstract registerWorkerMessageListener<
1016 Message extends Data | Response
1017 >(
1018 workerNodeKey: number,
1019 listener: (message: MessageValue<Message>) => void
1020 ): void
1021
1022 /**
1023 * Method hooked up after a worker node has been newly created.
1024 * Can be overridden.
1025 *
1026 * @param workerNodeKey - The newly created worker node key.
1027 */
1028 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1029 // Listen to worker messages.
1030 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1031 // Send the startup message to worker.
1032 this.sendStartupMessageToWorker(workerNodeKey)
1033 // Send the statistics message to worker.
1034 this.sendStatisticsMessageToWorker(workerNodeKey)
1035 }
1036
1037 /**
1038 * Sends the startup message to worker given its worker node key.
1039 *
1040 * @param workerNodeKey - The worker node key.
1041 */
1042 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1043
1044 /**
1045 * Sends the statistics message to worker given its worker node key.
1046 *
1047 * @param workerNodeKey - The worker node key.
1048 */
1049 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1050 this.sendToWorker(workerNodeKey, {
1051 statistics: {
1052 runTime:
1053 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1054 .runTime.aggregate,
1055 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1056 .elu.aggregate
1057 },
1058 workerId: this.getWorkerInfo(workerNodeKey).id as number
1059 })
1060 }
1061
1062 private redistributeQueuedTasks (workerNodeKey: number): void {
1063 while (this.tasksQueueSize(workerNodeKey) > 0) {
1064 let targetWorkerNodeKey: number = workerNodeKey
1065 let minQueuedTasks = Infinity
1066 let executeTask = false
1067 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1068 const workerInfo = this.getWorkerInfo(workerNodeId)
1069 if (
1070 workerNodeId !== workerNodeKey &&
1071 workerInfo.ready &&
1072 workerNode.usage.tasks.queued === 0
1073 ) {
1074 if (
1075 this.workerNodes[workerNodeId].usage.tasks.executing <
1076 (this.opts.tasksQueueOptions?.concurrency as number)
1077 ) {
1078 executeTask = true
1079 }
1080 targetWorkerNodeKey = workerNodeId
1081 break
1082 }
1083 if (
1084 workerNodeId !== workerNodeKey &&
1085 workerInfo.ready &&
1086 workerNode.usage.tasks.queued < minQueuedTasks
1087 ) {
1088 minQueuedTasks = workerNode.usage.tasks.queued
1089 targetWorkerNodeKey = workerNodeId
1090 }
1091 }
1092 if (executeTask) {
1093 this.executeTask(
1094 targetWorkerNodeKey,
1095 this.dequeueTask(workerNodeKey) as Task<Data>
1096 )
1097 } else {
1098 this.enqueueTask(
1099 targetWorkerNodeKey,
1100 this.dequeueTask(workerNodeKey) as Task<Data>
1101 )
1102 }
1103 }
1104 }
1105
1106 /**
1107 * This method is the listener registered for each worker message.
1108 *
1109 * @returns The listener function to execute when a message is received from a worker.
1110 */
1111 protected workerListener (): (message: MessageValue<Response>) => void {
1112 return (message) => {
1113 this.checkMessageWorkerId(message)
1114 if (message.ready != null) {
1115 // Worker ready response received from worker
1116 this.handleWorkerReadyResponse(message)
1117 } else if (message.taskId != null) {
1118 // Task execution response received from worker
1119 this.handleTaskExecutionResponse(message)
1120 } else if (message.taskFunctions != null) {
1121 // Task functions message received from worker
1122 this.taskFunctions = message.taskFunctions
1123 }
1124 }
1125 }
1126
1127 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1128 this.getWorkerInfo(
1129 this.getWorkerNodeKeyByWorkerId(message.workerId)
1130 ).ready = message.ready as boolean
1131 if (this.emitter != null && this.ready) {
1132 this.emitter.emit(PoolEvents.ready, this.info)
1133 }
1134 }
1135
1136 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1137 const promiseResponse = this.promiseResponseMap.get(
1138 message.taskId as string
1139 )
1140 if (promiseResponse != null) {
1141 if (message.taskError != null) {
1142 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1143 promiseResponse.reject(message.taskError.message)
1144 } else {
1145 promiseResponse.resolve(message.data as Response)
1146 }
1147 const workerNodeKey = promiseResponse.workerNodeKey
1148 this.afterTaskExecutionHook(workerNodeKey, message)
1149 this.promiseResponseMap.delete(message.taskId as string)
1150 if (
1151 this.opts.enableTasksQueue === true &&
1152 this.tasksQueueSize(workerNodeKey) > 0 &&
1153 this.workerNodes[workerNodeKey].usage.tasks.executing <
1154 (this.opts.tasksQueueOptions?.concurrency as number)
1155 ) {
1156 this.executeTask(
1157 workerNodeKey,
1158 this.dequeueTask(workerNodeKey) as Task<Data>
1159 )
1160 }
1161 this.workerChoiceStrategyContext.update(workerNodeKey)
1162 }
1163 }
1164
1165 private checkAndEmitEvents (): void {
1166 if (this.emitter != null) {
1167 if (this.busy) {
1168 this.emitter.emit(PoolEvents.busy, this.info)
1169 }
1170 if (this.type === PoolTypes.dynamic && this.full) {
1171 this.emitter.emit(PoolEvents.full, this.info)
1172 }
1173 }
1174 }
1175
1176 /**
1177 * Gets the worker information given its worker node key.
1178 *
1179 * @param workerNodeKey - The worker node key.
1180 * @returns The worker information.
1181 */
1182 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1183 return this.workerNodes[workerNodeKey].info
1184 }
1185
1186 /**
1187 * Adds the given worker in the pool worker nodes.
1188 *
1189 * @param worker - The worker.
1190 * @returns The added worker node key.
1191 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1192 */
1193 private addWorkerNode (worker: Worker): number {
1194 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1195 // Flag the worker node as ready at pool startup.
1196 if (this.starting) {
1197 workerNode.info.ready = true
1198 }
1199 this.workerNodes.push(workerNode)
1200 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1201 if (workerNodeKey === -1) {
1202 throw new Error('Worker node not found')
1203 }
1204 return workerNodeKey
1205 }
1206
1207 /**
1208 * Removes the given worker from the pool worker nodes.
1209 *
1210 * @param worker - The worker.
1211 */
1212 private removeWorkerNode (worker: Worker): void {
1213 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1214 if (workerNodeKey !== -1) {
1215 this.workerNodes.splice(workerNodeKey, 1)
1216 this.workerChoiceStrategyContext.remove(workerNodeKey)
1217 }
1218 }
1219
1220 /**
1221 * Executes the given task on the worker given its worker node key.
1222 *
1223 * @param workerNodeKey - The worker node key.
1224 * @param task - The task to execute.
1225 */
1226 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1227 this.beforeTaskExecutionHook(workerNodeKey, task)
1228 this.sendToWorker(workerNodeKey, task, task.transferList)
1229 }
1230
1231 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1232 return this.workerNodes[workerNodeKey].enqueueTask(task)
1233 }
1234
1235 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1236 return this.workerNodes[workerNodeKey].dequeueTask()
1237 }
1238
1239 private tasksQueueSize (workerNodeKey: number): number {
1240 return this.workerNodes[workerNodeKey].tasksQueueSize()
1241 }
1242
1243 protected flushTasksQueue (workerNodeKey: number): void {
1244 while (this.tasksQueueSize(workerNodeKey) > 0) {
1245 this.executeTask(
1246 workerNodeKey,
1247 this.dequeueTask(workerNodeKey) as Task<Data>
1248 )
1249 }
1250 this.workerNodes[workerNodeKey].clearTasksQueue()
1251 }
1252
1253 private flushTasksQueues (): void {
1254 for (const [workerNodeKey] of this.workerNodes.entries()) {
1255 this.flushTasksQueue(workerNodeKey)
1256 }
1257 }
1258 }