docs: refine docs/api.md
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import { randomUUID } from 'node:crypto'
2import { performance } from 'node:perf_hooks'
3import { existsSync } from 'node:fs'
4import { type TransferListItem } from 'node:worker_threads'
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
10import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19} from '../utils'
20import { KillBehaviors } from '../worker/worker-options'
21import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30} from './pool'
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37} from './worker'
38import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46import { version } from './version'
47import { WorkerNode } from './worker-node'
48
49/**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60> implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95 /**
96 * The task function names.
97 */
98 private taskFunctions!: string[]
99
100 /**
101 * Constructs a new poolifier pool.
102 *
103 * @param numberOfWorkers - Number of workers that this pool should manage.
104 * @param filePath - Path to the worker file.
105 * @param opts - Options for the pool.
106 */
107 public constructor (
108 protected readonly numberOfWorkers: number,
109 protected readonly filePath: string,
110 protected readonly opts: PoolOptions<Worker>
111 ) {
112 if (!this.isMain()) {
113 throw new Error(
114 'Cannot start a pool from a worker with the same type as the pool'
115 )
116 }
117 this.checkNumberOfWorkers(this.numberOfWorkers)
118 this.checkFilePath(this.filePath)
119 this.checkPoolOptions(this.opts)
120
121 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
122 this.executeTask = this.executeTask.bind(this)
123 this.enqueueTask = this.enqueueTask.bind(this)
124 this.dequeueTask = this.dequeueTask.bind(this)
125 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
126
127 if (this.opts.enableEvents === true) {
128 this.emitter = new PoolEmitter()
129 }
130 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
131 Worker,
132 Data,
133 Response
134 >(
135 this,
136 this.opts.workerChoiceStrategy,
137 this.opts.workerChoiceStrategyOptions
138 )
139
140 this.setupHook()
141
142 this.starting = true
143 this.startPool()
144 this.starting = false
145
146 this.startTimestamp = performance.now()
147 }
148
149 private checkFilePath (filePath: string): void {
150 if (
151 filePath == null ||
152 typeof filePath !== 'string' ||
153 (typeof filePath === 'string' && filePath.trim().length === 0)
154 ) {
155 throw new Error('Please specify a file with a worker implementation')
156 }
157 if (!existsSync(filePath)) {
158 throw new Error(`Cannot find the worker file '${filePath}'`)
159 }
160 }
161
162 private checkNumberOfWorkers (numberOfWorkers: number): void {
163 if (numberOfWorkers == null) {
164 throw new Error(
165 'Cannot instantiate a pool without specifying the number of workers'
166 )
167 } else if (!Number.isSafeInteger(numberOfWorkers)) {
168 throw new TypeError(
169 'Cannot instantiate a pool with a non safe integer number of workers'
170 )
171 } else if (numberOfWorkers < 0) {
172 throw new RangeError(
173 'Cannot instantiate a pool with a negative number of workers'
174 )
175 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
176 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
177 }
178 }
179
180 protected checkDynamicPoolSize (min: number, max: number): void {
181 if (this.type === PoolTypes.dynamic) {
182 if (max == null) {
183 throw new Error(
184 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
185 )
186 } else if (!Number.isSafeInteger(max)) {
187 throw new TypeError(
188 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
189 )
190 } else if (min > max) {
191 throw new RangeError(
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
193 )
194 } else if (max === 0) {
195 throw new RangeError(
196 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
197 )
198 } else if (min === max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
201 )
202 }
203 }
204 }
205
206 private checkPoolOptions (opts: PoolOptions<Worker>): void {
207 if (isPlainObject(opts)) {
208 this.opts.workerChoiceStrategy =
209 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
211 this.opts.workerChoiceStrategyOptions =
212 opts.workerChoiceStrategyOptions ??
213 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
230 }
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
237 throw new Error(
238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
239 )
240 }
241 }
242
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions.weights != null &&
253 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
254 ) {
255 throw new Error(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.measurement != null &&
261 !Object.values(Measurements).includes(
262 workerChoiceStrategyOptions.measurement
263 )
264 ) {
265 throw new Error(
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
267 )
268 }
269 }
270
271 private checkValidTasksQueueOptions (
272 tasksQueueOptions: TasksQueueOptions
273 ): void {
274 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
275 throw new TypeError('Invalid tasks queue options: must be a plain object')
276 }
277 if (
278 tasksQueueOptions?.concurrency != null &&
279 !Number.isSafeInteger(tasksQueueOptions.concurrency)
280 ) {
281 throw new TypeError(
282 'Invalid worker tasks concurrency: must be an integer'
283 )
284 }
285 if (
286 tasksQueueOptions?.concurrency != null &&
287 tasksQueueOptions.concurrency <= 0
288 ) {
289 throw new Error(
290 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
291 )
292 }
293 }
294
295 private startPool (): void {
296 while (
297 this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
300 0
301 ) < this.numberOfWorkers
302 ) {
303 this.createAndSetupWorkerNode()
304 }
305 }
306
307 /** @inheritDoc */
308 public get info (): PoolInfo {
309 return {
310 version,
311 type: this.type,
312 worker: this.worker,
313 ready: this.ready,
314 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
315 minSize: this.minSize,
316 maxSize: this.maxSize,
317 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
318 .runTime.aggregate &&
319 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 .waitTime.aggregate && { utilization: round(this.utilization) }),
321 workerNodes: this.workerNodes.length,
322 idleWorkerNodes: this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 workerNode.usage.tasks.executing === 0
325 ? accumulator + 1
326 : accumulator,
327 0
328 ),
329 busyWorkerNodes: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
331 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
332 0
333 ),
334 executedTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 accumulator + workerNode.usage.tasks.executed,
337 0
338 ),
339 executingTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 accumulator + workerNode.usage.tasks.executing,
342 0
343 ),
344 ...(this.opts.enableTasksQueue === true && {
345 queuedTasks: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
347 accumulator + workerNode.usage.tasks.queued,
348 0
349 )
350 }),
351 ...(this.opts.enableTasksQueue === true && {
352 maxQueuedTasks: this.workerNodes.reduce(
353 (accumulator, workerNode) =>
354 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
355 0
356 )
357 }),
358 failedTasks: this.workerNodes.reduce(
359 (accumulator, workerNode) =>
360 accumulator + workerNode.usage.tasks.failed,
361 0
362 ),
363 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
364 .runTime.aggregate && {
365 runTime: {
366 minimum: round(
367 Math.min(
368 ...this.workerNodes.map(
369 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
370 )
371 )
372 ),
373 maximum: round(
374 Math.max(
375 ...this.workerNodes.map(
376 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
377 )
378 )
379 ),
380 average: round(
381 this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
384 0
385 ) /
386 this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + (workerNode.usage.tasks?.executed ?? 0),
389 0
390 )
391 ),
392 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
393 .runTime.median && {
394 median: round(
395 median(
396 this.workerNodes.map(
397 (workerNode) => workerNode.usage.runTime?.median ?? 0
398 )
399 )
400 )
401 })
402 }
403 }),
404 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
405 .waitTime.aggregate && {
406 waitTime: {
407 minimum: round(
408 Math.min(
409 ...this.workerNodes.map(
410 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
411 )
412 )
413 ),
414 maximum: round(
415 Math.max(
416 ...this.workerNodes.map(
417 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
418 )
419 )
420 ),
421 average: round(
422 this.workerNodes.reduce(
423 (accumulator, workerNode) =>
424 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
425 0
426 ) /
427 this.workerNodes.reduce(
428 (accumulator, workerNode) =>
429 accumulator + (workerNode.usage.tasks?.executed ?? 0),
430 0
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .waitTime.median && {
435 median: round(
436 median(
437 this.workerNodes.map(
438 (workerNode) => workerNode.usage.waitTime?.median ?? 0
439 )
440 )
441 )
442 })
443 }
444 })
445 }
446 }
447
448 /**
449 * The pool readiness boolean status.
450 */
451 private get ready (): boolean {
452 return (
453 this.workerNodes.reduce(
454 (accumulator, workerNode) =>
455 !workerNode.info.dynamic && workerNode.info.ready
456 ? accumulator + 1
457 : accumulator,
458 0
459 ) >= this.minSize
460 )
461 }
462
463 /**
464 * The approximate pool utilization.
465 *
466 * @returns The pool utilization.
467 */
468 private get utilization (): number {
469 const poolTimeCapacity =
470 (performance.now() - this.startTimestamp) * this.maxSize
471 const totalTasksRunTime = this.workerNodes.reduce(
472 (accumulator, workerNode) =>
473 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
474 0
475 )
476 const totalTasksWaitTime = this.workerNodes.reduce(
477 (accumulator, workerNode) =>
478 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
479 0
480 )
481 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
482 }
483
484 /**
485 * The pool type.
486 *
487 * If it is `'dynamic'`, it provides the `max` property.
488 */
489 protected abstract get type (): PoolType
490
491 /**
492 * The worker type.
493 */
494 protected abstract get worker (): WorkerType
495
496 /**
497 * The pool minimum size.
498 */
499 protected abstract get minSize (): number
500
501 /**
502 * The pool maximum size.
503 */
504 protected abstract get maxSize (): number
505
506 /**
507 * Checks if the worker id sent in the received message from a worker is valid.
508 *
509 * @param message - The received message.
510 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
511 */
512 private checkMessageWorkerId (message: MessageValue<Response>): void {
513 if (
514 message.workerId != null &&
515 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
516 ) {
517 throw new Error(
518 `Worker message received from unknown worker '${message.workerId}'`
519 )
520 }
521 }
522
523 /**
524 * Gets the given worker its worker node key.
525 *
526 * @param worker - The worker.
527 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
528 */
529 private getWorkerNodeKeyByWorker (worker: Worker): number {
530 return this.workerNodes.findIndex(
531 (workerNode) => workerNode.worker === worker
532 )
533 }
534
535 /**
536 * Gets the worker node key given its worker id.
537 *
538 * @param workerId - The worker id.
539 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
540 */
541 private getWorkerNodeKeyByWorkerId (workerId: number): number {
542 return this.workerNodes.findIndex(
543 (workerNode) => workerNode.info.id === workerId
544 )
545 }
546
547 /** @inheritDoc */
548 public setWorkerChoiceStrategy (
549 workerChoiceStrategy: WorkerChoiceStrategy,
550 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
551 ): void {
552 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
553 this.opts.workerChoiceStrategy = workerChoiceStrategy
554 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
555 this.opts.workerChoiceStrategy
556 )
557 if (workerChoiceStrategyOptions != null) {
558 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
559 }
560 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
561 workerNode.resetUsage()
562 this.sendStatisticsMessageToWorker(workerNodeKey)
563 }
564 }
565
566 /** @inheritDoc */
567 public setWorkerChoiceStrategyOptions (
568 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
569 ): void {
570 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
571 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
572 this.workerChoiceStrategyContext.setOptions(
573 this.opts.workerChoiceStrategyOptions
574 )
575 }
576
577 /** @inheritDoc */
578 public enableTasksQueue (
579 enable: boolean,
580 tasksQueueOptions?: TasksQueueOptions
581 ): void {
582 if (this.opts.enableTasksQueue === true && !enable) {
583 this.flushTasksQueues()
584 }
585 this.opts.enableTasksQueue = enable
586 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
587 }
588
589 /** @inheritDoc */
590 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
591 if (this.opts.enableTasksQueue === true) {
592 this.checkValidTasksQueueOptions(tasksQueueOptions)
593 this.opts.tasksQueueOptions =
594 this.buildTasksQueueOptions(tasksQueueOptions)
595 } else if (this.opts.tasksQueueOptions != null) {
596 delete this.opts.tasksQueueOptions
597 }
598 }
599
600 private buildTasksQueueOptions (
601 tasksQueueOptions: TasksQueueOptions
602 ): TasksQueueOptions {
603 return {
604 concurrency: tasksQueueOptions?.concurrency ?? 1
605 }
606 }
607
608 /**
609 * Whether the pool is full or not.
610 *
611 * The pool filling boolean status.
612 */
613 protected get full (): boolean {
614 return this.workerNodes.length >= this.maxSize
615 }
616
617 /**
618 * Whether the pool is busy or not.
619 *
620 * The pool busyness boolean status.
621 */
622 protected abstract get busy (): boolean
623
624 /**
625 * Whether worker nodes are executing concurrently their tasks quota or not.
626 *
627 * @returns Worker nodes busyness boolean status.
628 */
629 protected internalBusy (): boolean {
630 if (this.opts.enableTasksQueue === true) {
631 return (
632 this.workerNodes.findIndex(
633 (workerNode) =>
634 workerNode.info.ready &&
635 workerNode.usage.tasks.executing <
636 (this.opts.tasksQueueOptions?.concurrency as number)
637 ) === -1
638 )
639 } else {
640 return (
641 this.workerNodes.findIndex(
642 (workerNode) =>
643 workerNode.info.ready && workerNode.usage.tasks.executing === 0
644 ) === -1
645 )
646 }
647 }
648
649 /** @inheritDoc */
650 public listTaskFunctions (): string[] {
651 if (this.taskFunctions != null) {
652 return this.taskFunctions
653 } else {
654 return []
655 }
656 }
657
658 /** @inheritDoc */
659 public async execute (
660 data?: Data,
661 name?: string,
662 transferList?: TransferListItem[]
663 ): Promise<Response> {
664 return await new Promise<Response>((resolve, reject) => {
665 if (name != null && typeof name !== 'string') {
666 reject(new TypeError('name argument must be a string'))
667 }
668 if (
669 name != null &&
670 typeof name === 'string' &&
671 name.trim().length === 0
672 ) {
673 reject(new TypeError('name argument must not be an empty string'))
674 }
675 if (
676 name != null &&
677 this.taskFunctions != null &&
678 !this.taskFunctions.includes(name)
679 ) {
680 reject(
681 new Error(`Task function '${name}' is not registered in the pool`)
682 )
683 }
684 if (transferList != null && !Array.isArray(transferList)) {
685 reject(new TypeError('transferList argument must be an array'))
686 }
687 const timestamp = performance.now()
688 const workerNodeKey = this.chooseWorkerNode()
689 const task: Task<Data> = {
690 name: name ?? DEFAULT_TASK_NAME,
691 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
692 data: data ?? ({} as Data),
693 transferList,
694 timestamp,
695 workerId: this.getWorkerInfo(workerNodeKey).id as number,
696 taskId: randomUUID()
697 }
698 this.promiseResponseMap.set(task.taskId as string, {
699 resolve,
700 reject,
701 workerNodeKey
702 })
703 if (
704 this.opts.enableTasksQueue === false ||
705 (this.opts.enableTasksQueue === true &&
706 this.workerNodes[workerNodeKey].usage.tasks.executing <
707 (this.opts.tasksQueueOptions?.concurrency as number))
708 ) {
709 this.executeTask(workerNodeKey, task)
710 } else {
711 this.enqueueTask(workerNodeKey, task)
712 }
713 this.checkAndEmitEvents()
714 })
715 }
716
717 /** @inheritDoc */
718 public async destroy (): Promise<void> {
719 await Promise.all(
720 this.workerNodes.map(async (_, workerNodeKey) => {
721 await this.destroyWorkerNode(workerNodeKey)
722 })
723 )
724 this.emitter?.emit(PoolEvents.destroy)
725 }
726
727 protected async sendKillMessageToWorker (
728 workerNodeKey: number,
729 workerId: number
730 ): Promise<void> {
731 await new Promise<void>((resolve, reject) => {
732 this.registerWorkerMessageListener(workerNodeKey, (message) => {
733 if (message.kill === 'success') {
734 resolve()
735 } else if (message.kill === 'failure') {
736 reject(new Error(`Worker ${workerId} kill message handling failed`))
737 }
738 })
739 this.sendToWorker(workerNodeKey, { kill: true, workerId })
740 })
741 }
742
743 /**
744 * Terminates the worker node given its worker node key.
745 *
746 * @param workerNodeKey - The worker node key.
747 */
748 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
749
750 /**
751 * Setup hook to execute code before worker nodes are created in the abstract constructor.
752 * Can be overridden.
753 *
754 * @virtual
755 */
756 protected setupHook (): void {
757 // Intentionally empty
758 }
759
760 /**
761 * Should return whether the worker is the main worker or not.
762 */
763 protected abstract isMain (): boolean
764
765 /**
766 * Hook executed before the worker task execution.
767 * Can be overridden.
768 *
769 * @param workerNodeKey - The worker node key.
770 * @param task - The task to execute.
771 */
772 protected beforeTaskExecutionHook (
773 workerNodeKey: number,
774 task: Task<Data>
775 ): void {
776 const workerUsage = this.workerNodes[workerNodeKey].usage
777 ++workerUsage.tasks.executing
778 this.updateWaitTimeWorkerUsage(workerUsage, task)
779 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
780 task.name as string
781 ) as WorkerUsage
782 ++taskWorkerUsage.tasks.executing
783 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
784 }
785
786 /**
787 * Hook executed after the worker task execution.
788 * Can be overridden.
789 *
790 * @param workerNodeKey - The worker node key.
791 * @param message - The received message.
792 */
793 protected afterTaskExecutionHook (
794 workerNodeKey: number,
795 message: MessageValue<Response>
796 ): void {
797 const workerUsage = this.workerNodes[workerNodeKey].usage
798 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
799 this.updateRunTimeWorkerUsage(workerUsage, message)
800 this.updateEluWorkerUsage(workerUsage, message)
801 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
802 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
803 ) as WorkerUsage
804 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
805 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
806 this.updateEluWorkerUsage(taskWorkerUsage, message)
807 }
808
809 private updateTaskStatisticsWorkerUsage (
810 workerUsage: WorkerUsage,
811 message: MessageValue<Response>
812 ): void {
813 const workerTaskStatistics = workerUsage.tasks
814 --workerTaskStatistics.executing
815 if (message.taskError == null) {
816 ++workerTaskStatistics.executed
817 } else {
818 ++workerTaskStatistics.failed
819 }
820 }
821
822 private updateRunTimeWorkerUsage (
823 workerUsage: WorkerUsage,
824 message: MessageValue<Response>
825 ): void {
826 updateMeasurementStatistics(
827 workerUsage.runTime,
828 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
829 message.taskPerformance?.runTime ?? 0,
830 workerUsage.tasks.executed
831 )
832 }
833
834 private updateWaitTimeWorkerUsage (
835 workerUsage: WorkerUsage,
836 task: Task<Data>
837 ): void {
838 const timestamp = performance.now()
839 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
840 updateMeasurementStatistics(
841 workerUsage.waitTime,
842 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
843 taskWaitTime,
844 workerUsage.tasks.executed
845 )
846 }
847
848 private updateEluWorkerUsage (
849 workerUsage: WorkerUsage,
850 message: MessageValue<Response>
851 ): void {
852 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
853 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
854 updateMeasurementStatistics(
855 workerUsage.elu.active,
856 eluTaskStatisticsRequirements,
857 message.taskPerformance?.elu?.active ?? 0,
858 workerUsage.tasks.executed
859 )
860 updateMeasurementStatistics(
861 workerUsage.elu.idle,
862 eluTaskStatisticsRequirements,
863 message.taskPerformance?.elu?.idle ?? 0,
864 workerUsage.tasks.executed
865 )
866 if (eluTaskStatisticsRequirements.aggregate) {
867 if (message.taskPerformance?.elu != null) {
868 if (workerUsage.elu.utilization != null) {
869 workerUsage.elu.utilization =
870 (workerUsage.elu.utilization +
871 message.taskPerformance.elu.utilization) /
872 2
873 } else {
874 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
875 }
876 }
877 }
878 }
879
880 /**
881 * Chooses a worker node for the next task.
882 *
883 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
884 *
885 * @returns The chosen worker node key
886 */
887 private chooseWorkerNode (): number {
888 if (this.shallCreateDynamicWorker()) {
889 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
890 if (
891 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
892 ) {
893 return workerNodeKey
894 }
895 }
896 return this.workerChoiceStrategyContext.execute()
897 }
898
899 /**
900 * Conditions for dynamic worker creation.
901 *
902 * @returns Whether to create a dynamic worker or not.
903 */
904 private shallCreateDynamicWorker (): boolean {
905 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
906 }
907
908 /**
909 * Sends a message to worker given its worker node key.
910 *
911 * @param workerNodeKey - The worker node key.
912 * @param message - The message.
913 * @param transferList - The optional array of transferable objects.
914 */
915 protected abstract sendToWorker (
916 workerNodeKey: number,
917 message: MessageValue<Data>,
918 transferList?: TransferListItem[]
919 ): void
920
921 /**
922 * Creates a new worker.
923 *
924 * @returns Newly created worker.
925 */
926 protected abstract createWorker (): Worker
927
928 /**
929 * Creates a new, completely set up worker node.
930 *
931 * @returns New, completely set up worker node key.
932 */
933 protected createAndSetupWorkerNode (): number {
934 const worker = this.createWorker()
935
936 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
937 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
938 worker.on('error', (error) => {
939 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
940 const workerInfo = this.getWorkerInfo(workerNodeKey)
941 workerInfo.ready = false
942 this.workerNodes[workerNodeKey].closeChannel()
943 this.emitter?.emit(PoolEvents.error, error)
944 if (this.opts.restartWorkerOnError === true && !this.starting) {
945 if (workerInfo.dynamic) {
946 this.createAndSetupDynamicWorkerNode()
947 } else {
948 this.createAndSetupWorkerNode()
949 }
950 }
951 if (this.opts.enableTasksQueue === true) {
952 this.redistributeQueuedTasks(workerNodeKey)
953 }
954 })
955 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
956 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
957 worker.once('exit', () => {
958 this.removeWorkerNode(worker)
959 })
960
961 const workerNodeKey = this.addWorkerNode(worker)
962
963 this.afterWorkerNodeSetup(workerNodeKey)
964
965 return workerNodeKey
966 }
967
968 /**
969 * Creates a new, completely set up dynamic worker node.
970 *
971 * @returns New, completely set up dynamic worker node key.
972 */
973 protected createAndSetupDynamicWorkerNode (): number {
974 const workerNodeKey = this.createAndSetupWorkerNode()
975 this.registerWorkerMessageListener(workerNodeKey, (message) => {
976 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
977 message.workerId
978 )
979 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
980 // Kill message received from worker
981 if (
982 isKillBehavior(KillBehaviors.HARD, message.kill) ||
983 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
984 ((this.opts.enableTasksQueue === false &&
985 workerUsage.tasks.executing === 0) ||
986 (this.opts.enableTasksQueue === true &&
987 workerUsage.tasks.executing === 0 &&
988 this.tasksQueueSize(localWorkerNodeKey) === 0)))
989 ) {
990 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
991 this.emitter?.emit(PoolEvents.error, error)
992 })
993 }
994 })
995 const workerInfo = this.getWorkerInfo(workerNodeKey)
996 this.sendToWorker(workerNodeKey, {
997 checkActive: true,
998 workerId: workerInfo.id as number
999 })
1000 workerInfo.dynamic = true
1001 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1002 workerInfo.ready = true
1003 }
1004 return workerNodeKey
1005 }
1006
1007 /**
1008 * Registers a listener callback on the worker given its worker node key.
1009 *
1010 * @param workerNodeKey - The worker node key.
1011 * @param listener - The message listener callback.
1012 */
1013 protected abstract registerWorkerMessageListener<
1014 Message extends Data | Response
1015 >(
1016 workerNodeKey: number,
1017 listener: (message: MessageValue<Message>) => void
1018 ): void
1019
1020 /**
1021 * Method hooked up after a worker node has been newly created.
1022 * Can be overridden.
1023 *
1024 * @param workerNodeKey - The newly created worker node key.
1025 */
1026 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1027 // Listen to worker messages.
1028 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1029 // Send the startup message to worker.
1030 this.sendStartupMessageToWorker(workerNodeKey)
1031 // Send the statistics message to worker.
1032 this.sendStatisticsMessageToWorker(workerNodeKey)
1033 }
1034
1035 /**
1036 * Sends the startup message to worker given its worker node key.
1037 *
1038 * @param workerNodeKey - The worker node key.
1039 */
1040 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1041
1042 /**
1043 * Sends the statistics message to worker given its worker node key.
1044 *
1045 * @param workerNodeKey - The worker node key.
1046 */
1047 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1048 this.sendToWorker(workerNodeKey, {
1049 statistics: {
1050 runTime:
1051 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1052 .runTime.aggregate,
1053 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1054 .elu.aggregate
1055 },
1056 workerId: this.getWorkerInfo(workerNodeKey).id as number
1057 })
1058 }
1059
1060 private redistributeQueuedTasks (workerNodeKey: number): void {
1061 while (this.tasksQueueSize(workerNodeKey) > 0) {
1062 let targetWorkerNodeKey: number = workerNodeKey
1063 let minQueuedTasks = Infinity
1064 let executeTask = false
1065 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1066 const workerInfo = this.getWorkerInfo(workerNodeId)
1067 if (
1068 workerNodeId !== workerNodeKey &&
1069 workerInfo.ready &&
1070 workerNode.usage.tasks.queued === 0
1071 ) {
1072 if (
1073 this.workerNodes[workerNodeId].usage.tasks.executing <
1074 (this.opts.tasksQueueOptions?.concurrency as number)
1075 ) {
1076 executeTask = true
1077 }
1078 targetWorkerNodeKey = workerNodeId
1079 break
1080 }
1081 if (
1082 workerNodeId !== workerNodeKey &&
1083 workerInfo.ready &&
1084 workerNode.usage.tasks.queued < minQueuedTasks
1085 ) {
1086 minQueuedTasks = workerNode.usage.tasks.queued
1087 targetWorkerNodeKey = workerNodeId
1088 }
1089 }
1090 if (executeTask) {
1091 this.executeTask(
1092 targetWorkerNodeKey,
1093 this.dequeueTask(workerNodeKey) as Task<Data>
1094 )
1095 } else {
1096 this.enqueueTask(
1097 targetWorkerNodeKey,
1098 this.dequeueTask(workerNodeKey) as Task<Data>
1099 )
1100 }
1101 }
1102 }
1103
1104 /**
1105 * This method is the listener registered for each worker message.
1106 *
1107 * @returns The listener function to execute when a message is received from a worker.
1108 */
1109 protected workerListener (): (message: MessageValue<Response>) => void {
1110 return (message) => {
1111 this.checkMessageWorkerId(message)
1112 if (message.ready != null) {
1113 // Worker ready response received from worker
1114 this.handleWorkerReadyResponse(message)
1115 } else if (message.taskId != null) {
1116 // Task execution response received from worker
1117 this.handleTaskExecutionResponse(message)
1118 } else if (message.taskFunctions != null) {
1119 // Task functions message received from worker
1120 this.taskFunctions = message.taskFunctions
1121 }
1122 }
1123 }
1124
1125 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1126 this.getWorkerInfo(
1127 this.getWorkerNodeKeyByWorkerId(message.workerId)
1128 ).ready = message.ready as boolean
1129 if (this.emitter != null && this.ready) {
1130 this.emitter.emit(PoolEvents.ready, this.info)
1131 }
1132 }
1133
1134 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1135 const promiseResponse = this.promiseResponseMap.get(
1136 message.taskId as string
1137 )
1138 if (promiseResponse != null) {
1139 if (message.taskError != null) {
1140 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1141 promiseResponse.reject(message.taskError.message)
1142 } else {
1143 promiseResponse.resolve(message.data as Response)
1144 }
1145 const workerNodeKey = promiseResponse.workerNodeKey
1146 this.afterTaskExecutionHook(workerNodeKey, message)
1147 this.promiseResponseMap.delete(message.taskId as string)
1148 if (
1149 this.opts.enableTasksQueue === true &&
1150 this.tasksQueueSize(workerNodeKey) > 0 &&
1151 this.workerNodes[workerNodeKey].usage.tasks.executing <
1152 (this.opts.tasksQueueOptions?.concurrency as number)
1153 ) {
1154 this.executeTask(
1155 workerNodeKey,
1156 this.dequeueTask(workerNodeKey) as Task<Data>
1157 )
1158 }
1159 this.workerChoiceStrategyContext.update(workerNodeKey)
1160 }
1161 }
1162
1163 private checkAndEmitEvents (): void {
1164 if (this.emitter != null) {
1165 if (this.busy) {
1166 this.emitter.emit(PoolEvents.busy, this.info)
1167 }
1168 if (this.type === PoolTypes.dynamic && this.full) {
1169 this.emitter.emit(PoolEvents.full, this.info)
1170 }
1171 }
1172 }
1173
1174 /**
1175 * Gets the worker information given its worker node key.
1176 *
1177 * @param workerNodeKey - The worker node key.
1178 * @returns The worker information.
1179 */
1180 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1181 return this.workerNodes[workerNodeKey].info
1182 }
1183
1184 /**
1185 * Adds the given worker in the pool worker nodes.
1186 *
1187 * @param worker - The worker.
1188 * @returns The added worker node key.
1189 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1190 */
1191 private addWorkerNode (worker: Worker): number {
1192 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1193 // Flag the worker node as ready at pool startup.
1194 if (this.starting) {
1195 workerNode.info.ready = true
1196 }
1197 this.workerNodes.push(workerNode)
1198 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1199 if (workerNodeKey === -1) {
1200 throw new Error('Worker node not found')
1201 }
1202 return workerNodeKey
1203 }
1204
1205 /**
1206 * Removes the given worker from the pool worker nodes.
1207 *
1208 * @param worker - The worker.
1209 */
1210 private removeWorkerNode (worker: Worker): void {
1211 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1212 if (workerNodeKey !== -1) {
1213 this.workerNodes.splice(workerNodeKey, 1)
1214 this.workerChoiceStrategyContext.remove(workerNodeKey)
1215 }
1216 }
1217
1218 /**
1219 * Executes the given task on the worker given its worker node key.
1220 *
1221 * @param workerNodeKey - The worker node key.
1222 * @param task - The task to execute.
1223 */
1224 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1225 this.beforeTaskExecutionHook(workerNodeKey, task)
1226 this.sendToWorker(workerNodeKey, task, task.transferList)
1227 }
1228
1229 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1230 return this.workerNodes[workerNodeKey].enqueueTask(task)
1231 }
1232
1233 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1234 return this.workerNodes[workerNodeKey].dequeueTask()
1235 }
1236
1237 private tasksQueueSize (workerNodeKey: number): number {
1238 return this.workerNodes[workerNodeKey].tasksQueueSize()
1239 }
1240
1241 protected flushTasksQueue (workerNodeKey: number): void {
1242 while (this.tasksQueueSize(workerNodeKey) > 0) {
1243 this.executeTask(
1244 workerNodeKey,
1245 this.dequeueTask(workerNodeKey) as Task<Data>
1246 )
1247 }
1248 this.workerNodes[workerNodeKey].clearTasksQueue()
1249 }
1250
1251 private flushTasksQueues (): void {
1252 for (const [workerNodeKey] of this.workerNodes.entries()) {
1253 this.flushTasksQueue(workerNodeKey)
1254 }
1255 }
1256}