feat: sync task function names in pool
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95 /**
96 * The task function names.
97 */
98 private taskFunctions!: string[]
99
100 /**
101 * Constructs a new poolifier pool.
102 *
103 * @param numberOfWorkers - Number of workers that this pool should manage.
104 * @param filePath - Path to the worker file.
105 * @param opts - Options for the pool.
106 */
107 public constructor (
108 protected readonly numberOfWorkers: number,
109 protected readonly filePath: string,
110 protected readonly opts: PoolOptions<Worker>
111 ) {
112 if (!this.isMain()) {
113 throw new Error(
114 'Cannot start a pool from a worker with the same type as the pool'
115 )
116 }
117 this.checkNumberOfWorkers(this.numberOfWorkers)
118 this.checkFilePath(this.filePath)
119 this.checkPoolOptions(this.opts)
120
121 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
122 this.executeTask = this.executeTask.bind(this)
123 this.enqueueTask = this.enqueueTask.bind(this)
124 this.dequeueTask = this.dequeueTask.bind(this)
125 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
126
127 if (this.opts.enableEvents === true) {
128 this.emitter = new PoolEmitter()
129 }
130 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
131 Worker,
132 Data,
133 Response
134 >(
135 this,
136 this.opts.workerChoiceStrategy,
137 this.opts.workerChoiceStrategyOptions
138 )
139
140 this.setupHook()
141
142 this.starting = true
143 this.startPool()
144 this.starting = false
145
146 this.startTimestamp = performance.now()
147 }
148
149 private checkFilePath (filePath: string): void {
150 if (
151 filePath == null ||
152 typeof filePath !== 'string' ||
153 (typeof filePath === 'string' && filePath.trim().length === 0)
154 ) {
155 throw new Error('Please specify a file with a worker implementation')
156 }
157 if (!existsSync(filePath)) {
158 throw new Error(`Cannot find the worker file '${filePath}'`)
159 }
160 }
161
162 private checkNumberOfWorkers (numberOfWorkers: number): void {
163 if (numberOfWorkers == null) {
164 throw new Error(
165 'Cannot instantiate a pool without specifying the number of workers'
166 )
167 } else if (!Number.isSafeInteger(numberOfWorkers)) {
168 throw new TypeError(
169 'Cannot instantiate a pool with a non safe integer number of workers'
170 )
171 } else if (numberOfWorkers < 0) {
172 throw new RangeError(
173 'Cannot instantiate a pool with a negative number of workers'
174 )
175 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
176 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
177 }
178 }
179
180 protected checkDynamicPoolSize (min: number, max: number): void {
181 if (this.type === PoolTypes.dynamic) {
182 if (max == null) {
183 throw new Error(
184 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
185 )
186 } else if (!Number.isSafeInteger(max)) {
187 throw new TypeError(
188 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
189 )
190 } else if (min > max) {
191 throw new RangeError(
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
193 )
194 } else if (max === 0) {
195 throw new RangeError(
196 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
197 )
198 } else if (min === max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
201 )
202 }
203 }
204 }
205
206 private checkPoolOptions (opts: PoolOptions<Worker>): void {
207 if (isPlainObject(opts)) {
208 this.opts.workerChoiceStrategy =
209 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
211 this.opts.workerChoiceStrategyOptions =
212 opts.workerChoiceStrategyOptions ??
213 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
230 }
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
237 throw new Error(
238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
239 )
240 }
241 }
242
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions.weights != null &&
253 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
254 ) {
255 throw new Error(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.measurement != null &&
261 !Object.values(Measurements).includes(
262 workerChoiceStrategyOptions.measurement
263 )
264 ) {
265 throw new Error(
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
267 )
268 }
269 }
270
271 private checkValidTasksQueueOptions (
272 tasksQueueOptions: TasksQueueOptions
273 ): void {
274 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
275 throw new TypeError('Invalid tasks queue options: must be a plain object')
276 }
277 if (
278 tasksQueueOptions?.concurrency != null &&
279 !Number.isSafeInteger(tasksQueueOptions.concurrency)
280 ) {
281 throw new TypeError(
282 'Invalid worker tasks concurrency: must be an integer'
283 )
284 }
285 if (
286 tasksQueueOptions?.concurrency != null &&
287 tasksQueueOptions.concurrency <= 0
288 ) {
289 throw new Error(
290 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
291 )
292 }
293 }
294
295 private startPool (): void {
296 while (
297 this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
300 0
301 ) < this.numberOfWorkers
302 ) {
303 this.createAndSetupWorkerNode()
304 }
305 }
306
307 /** @inheritDoc */
308 public get info (): PoolInfo {
309 return {
310 version,
311 type: this.type,
312 worker: this.worker,
313 ready: this.ready,
314 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
315 minSize: this.minSize,
316 maxSize: this.maxSize,
317 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
318 .runTime.aggregate &&
319 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 .waitTime.aggregate && { utilization: round(this.utilization) }),
321 workerNodes: this.workerNodes.length,
322 idleWorkerNodes: this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 workerNode.usage.tasks.executing === 0
325 ? accumulator + 1
326 : accumulator,
327 0
328 ),
329 busyWorkerNodes: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
331 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
332 0
333 ),
334 executedTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 accumulator + workerNode.usage.tasks.executed,
337 0
338 ),
339 executingTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 accumulator + workerNode.usage.tasks.executing,
342 0
343 ),
344 ...(this.opts.enableTasksQueue === true && {
345 queuedTasks: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
347 accumulator + workerNode.usage.tasks.queued,
348 0
349 )
350 }),
351 ...(this.opts.enableTasksQueue === true && {
352 maxQueuedTasks: this.workerNodes.reduce(
353 (accumulator, workerNode) =>
354 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
355 0
356 )
357 }),
358 failedTasks: this.workerNodes.reduce(
359 (accumulator, workerNode) =>
360 accumulator + workerNode.usage.tasks.failed,
361 0
362 ),
363 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
364 .runTime.aggregate && {
365 runTime: {
366 minimum: round(
367 Math.min(
368 ...this.workerNodes.map(
369 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
370 )
371 )
372 ),
373 maximum: round(
374 Math.max(
375 ...this.workerNodes.map(
376 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
377 )
378 )
379 ),
380 average: round(
381 this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
384 0
385 ) /
386 this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + (workerNode.usage.tasks?.executed ?? 0),
389 0
390 )
391 ),
392 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
393 .runTime.median && {
394 median: round(
395 median(
396 this.workerNodes.map(
397 (workerNode) => workerNode.usage.runTime?.median ?? 0
398 )
399 )
400 )
401 })
402 }
403 }),
404 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
405 .waitTime.aggregate && {
406 waitTime: {
407 minimum: round(
408 Math.min(
409 ...this.workerNodes.map(
410 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
411 )
412 )
413 ),
414 maximum: round(
415 Math.max(
416 ...this.workerNodes.map(
417 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
418 )
419 )
420 ),
421 average: round(
422 this.workerNodes.reduce(
423 (accumulator, workerNode) =>
424 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
425 0
426 ) /
427 this.workerNodes.reduce(
428 (accumulator, workerNode) =>
429 accumulator + (workerNode.usage.tasks?.executed ?? 0),
430 0
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .waitTime.median && {
435 median: round(
436 median(
437 this.workerNodes.map(
438 (workerNode) => workerNode.usage.waitTime?.median ?? 0
439 )
440 )
441 )
442 })
443 }
444 })
445 }
446 }
447
448 /**
449 * The pool readiness boolean status.
450 */
451 private get ready (): boolean {
452 return (
453 this.workerNodes.reduce(
454 (accumulator, workerNode) =>
455 !workerNode.info.dynamic && workerNode.info.ready
456 ? accumulator + 1
457 : accumulator,
458 0
459 ) >= this.minSize
460 )
461 }
462
463 /**
464 * The approximate pool utilization.
465 *
466 * @returns The pool utilization.
467 */
468 private get utilization (): number {
469 const poolTimeCapacity =
470 (performance.now() - this.startTimestamp) * this.maxSize
471 const totalTasksRunTime = this.workerNodes.reduce(
472 (accumulator, workerNode) =>
473 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
474 0
475 )
476 const totalTasksWaitTime = this.workerNodes.reduce(
477 (accumulator, workerNode) =>
478 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
479 0
480 )
481 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
482 }
483
484 /**
485 * The pool type.
486 *
487 * If it is `'dynamic'`, it provides the `max` property.
488 */
489 protected abstract get type (): PoolType
490
491 /**
492 * The worker type.
493 */
494 protected abstract get worker (): WorkerType
495
496 /**
497 * The pool minimum size.
498 */
499 protected abstract get minSize (): number
500
501 /**
502 * The pool maximum size.
503 */
504 protected abstract get maxSize (): number
505
506 /**
507 * Checks if the worker id sent in the received message from a worker is valid.
508 *
509 * @param message - The received message.
510 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
511 */
512 private checkMessageWorkerId (message: MessageValue<Response>): void {
513 if (
514 message.workerId != null &&
515 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
516 ) {
517 throw new Error(
518 `Worker message received from unknown worker '${message.workerId}'`
519 )
520 }
521 }
522
523 /**
524 * Gets the given worker its worker node key.
525 *
526 * @param worker - The worker.
527 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
528 */
529 private getWorkerNodeKeyByWorker (worker: Worker): number {
530 return this.workerNodes.findIndex(
531 (workerNode) => workerNode.worker === worker
532 )
533 }
534
535 /**
536 * Gets the worker node key given its worker id.
537 *
538 * @param workerId - The worker id.
539 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
540 */
541 private getWorkerNodeKeyByWorkerId (workerId: number): number {
542 return this.workerNodes.findIndex(
543 (workerNode) => workerNode.info.id === workerId
544 )
545 }
546
547 /** @inheritDoc */
548 public setWorkerChoiceStrategy (
549 workerChoiceStrategy: WorkerChoiceStrategy,
550 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
551 ): void {
552 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
553 this.opts.workerChoiceStrategy = workerChoiceStrategy
554 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
555 this.opts.workerChoiceStrategy
556 )
557 if (workerChoiceStrategyOptions != null) {
558 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
559 }
560 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
561 workerNode.resetUsage()
562 this.sendStatisticsMessageToWorker(workerNodeKey)
563 }
564 }
565
566 /** @inheritDoc */
567 public setWorkerChoiceStrategyOptions (
568 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
569 ): void {
570 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
571 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
572 this.workerChoiceStrategyContext.setOptions(
573 this.opts.workerChoiceStrategyOptions
574 )
575 }
576
577 /** @inheritDoc */
578 public enableTasksQueue (
579 enable: boolean,
580 tasksQueueOptions?: TasksQueueOptions
581 ): void {
582 if (this.opts.enableTasksQueue === true && !enable) {
583 this.flushTasksQueues()
584 }
585 this.opts.enableTasksQueue = enable
586 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
587 }
588
589 /** @inheritDoc */
590 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
591 if (this.opts.enableTasksQueue === true) {
592 this.checkValidTasksQueueOptions(tasksQueueOptions)
593 this.opts.tasksQueueOptions =
594 this.buildTasksQueueOptions(tasksQueueOptions)
595 } else if (this.opts.tasksQueueOptions != null) {
596 delete this.opts.tasksQueueOptions
597 }
598 }
599
600 private buildTasksQueueOptions (
601 tasksQueueOptions: TasksQueueOptions
602 ): TasksQueueOptions {
603 return {
604 concurrency: tasksQueueOptions?.concurrency ?? 1
605 }
606 }
607
608 /**
609 * Whether the pool is full or not.
610 *
611 * The pool filling boolean status.
612 */
613 protected get full (): boolean {
614 return this.workerNodes.length >= this.maxSize
615 }
616
617 /**
618 * Whether the pool is busy or not.
619 *
620 * The pool busyness boolean status.
621 */
622 protected abstract get busy (): boolean
623
624 /**
625 * Whether worker nodes are executing concurrently their tasks quota or not.
626 *
627 * @returns Worker nodes busyness boolean status.
628 */
629 protected internalBusy (): boolean {
630 if (this.opts.enableTasksQueue === true) {
631 return (
632 this.workerNodes.findIndex(
633 (workerNode) =>
634 workerNode.info.ready &&
635 workerNode.usage.tasks.executing <
636 (this.opts.tasksQueueOptions?.concurrency as number)
637 ) === -1
638 )
639 } else {
640 return (
641 this.workerNodes.findIndex(
642 (workerNode) =>
643 workerNode.info.ready && workerNode.usage.tasks.executing === 0
644 ) === -1
645 )
646 }
647 }
648
649 /** @inheritDoc */
650 public listTaskFunctions (): string[] {
651 if (this.taskFunctions != null) {
652 return this.taskFunctions
653 } else {
654 return []
655 }
656 }
657
658 /** @inheritDoc */
659 public async execute (
660 data?: Data,
661 name?: string,
662 transferList?: TransferListItem[]
663 ): Promise<Response> {
664 return await new Promise<Response>((resolve, reject) => {
665 if (name != null && typeof name !== 'string') {
666 reject(new TypeError('name argument must be a string'))
667 }
668 if (
669 name != null &&
670 typeof name === 'string' &&
671 name.trim().length === 0
672 ) {
673 reject(new Error('name argument must not be an empty string'))
674 }
675 if (name != null && !this.taskFunctions.includes(name)) {
676 reject(
677 new Error(`Task function '${name}' is not registered in the pool`)
678 )
679 }
680 if (transferList != null && !Array.isArray(transferList)) {
681 reject(new TypeError('transferList argument must be an array'))
682 }
683 const timestamp = performance.now()
684 const workerNodeKey = this.chooseWorkerNode()
685 const task: Task<Data> = {
686 name: name ?? DEFAULT_TASK_NAME,
687 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
688 data: data ?? ({} as Data),
689 transferList,
690 timestamp,
691 workerId: this.getWorkerInfo(workerNodeKey).id as number,
692 taskId: randomUUID()
693 }
694 this.promiseResponseMap.set(task.taskId as string, {
695 resolve,
696 reject,
697 workerNodeKey
698 })
699 if (
700 this.opts.enableTasksQueue === false ||
701 (this.opts.enableTasksQueue === true &&
702 this.workerNodes[workerNodeKey].usage.tasks.executing <
703 (this.opts.tasksQueueOptions?.concurrency as number))
704 ) {
705 this.executeTask(workerNodeKey, task)
706 } else {
707 this.enqueueTask(workerNodeKey, task)
708 }
709 this.checkAndEmitEvents()
710 })
711 }
712
713 /** @inheritDoc */
714 public async destroy (): Promise<void> {
715 await Promise.all(
716 this.workerNodes.map(async (_, workerNodeKey) => {
717 await this.destroyWorkerNode(workerNodeKey)
718 })
719 )
720 }
721
722 protected async sendKillMessageToWorker (
723 workerNodeKey: number,
724 workerId: number
725 ): Promise<void> {
726 await new Promise<void>((resolve, reject) => {
727 this.registerWorkerMessageListener(workerNodeKey, (message) => {
728 if (message.kill === 'success') {
729 resolve()
730 } else if (message.kill === 'failure') {
731 reject(new Error(`Worker ${workerId} kill message handling failed`))
732 }
733 })
734 this.sendToWorker(workerNodeKey, { kill: true, workerId })
735 })
736 }
737
738 /**
739 * Terminates the worker node given its worker node key.
740 *
741 * @param workerNodeKey - The worker node key.
742 */
743 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
744
745 /**
746 * Setup hook to execute code before worker nodes are created in the abstract constructor.
747 * Can be overridden.
748 *
749 * @virtual
750 */
751 protected setupHook (): void {
752 // Intentionally empty
753 }
754
755 /**
756 * Should return whether the worker is the main worker or not.
757 */
758 protected abstract isMain (): boolean
759
760 /**
761 * Hook executed before the worker task execution.
762 * Can be overridden.
763 *
764 * @param workerNodeKey - The worker node key.
765 * @param task - The task to execute.
766 */
767 protected beforeTaskExecutionHook (
768 workerNodeKey: number,
769 task: Task<Data>
770 ): void {
771 const workerUsage = this.workerNodes[workerNodeKey].usage
772 ++workerUsage.tasks.executing
773 this.updateWaitTimeWorkerUsage(workerUsage, task)
774 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
775 task.name as string
776 ) as WorkerUsage
777 ++taskWorkerUsage.tasks.executing
778 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
779 }
780
781 /**
782 * Hook executed after the worker task execution.
783 * Can be overridden.
784 *
785 * @param workerNodeKey - The worker node key.
786 * @param message - The received message.
787 */
788 protected afterTaskExecutionHook (
789 workerNodeKey: number,
790 message: MessageValue<Response>
791 ): void {
792 const workerUsage = this.workerNodes[workerNodeKey].usage
793 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
794 this.updateRunTimeWorkerUsage(workerUsage, message)
795 this.updateEluWorkerUsage(workerUsage, message)
796 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
797 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
798 ) as WorkerUsage
799 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
800 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
801 this.updateEluWorkerUsage(taskWorkerUsage, message)
802 }
803
804 private updateTaskStatisticsWorkerUsage (
805 workerUsage: WorkerUsage,
806 message: MessageValue<Response>
807 ): void {
808 const workerTaskStatistics = workerUsage.tasks
809 --workerTaskStatistics.executing
810 if (message.taskError == null) {
811 ++workerTaskStatistics.executed
812 } else {
813 ++workerTaskStatistics.failed
814 }
815 }
816
817 private updateRunTimeWorkerUsage (
818 workerUsage: WorkerUsage,
819 message: MessageValue<Response>
820 ): void {
821 updateMeasurementStatistics(
822 workerUsage.runTime,
823 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
824 message.taskPerformance?.runTime ?? 0,
825 workerUsage.tasks.executed
826 )
827 }
828
829 private updateWaitTimeWorkerUsage (
830 workerUsage: WorkerUsage,
831 task: Task<Data>
832 ): void {
833 const timestamp = performance.now()
834 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
835 updateMeasurementStatistics(
836 workerUsage.waitTime,
837 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
838 taskWaitTime,
839 workerUsage.tasks.executed
840 )
841 }
842
843 private updateEluWorkerUsage (
844 workerUsage: WorkerUsage,
845 message: MessageValue<Response>
846 ): void {
847 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
848 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
849 updateMeasurementStatistics(
850 workerUsage.elu.active,
851 eluTaskStatisticsRequirements,
852 message.taskPerformance?.elu?.active ?? 0,
853 workerUsage.tasks.executed
854 )
855 updateMeasurementStatistics(
856 workerUsage.elu.idle,
857 eluTaskStatisticsRequirements,
858 message.taskPerformance?.elu?.idle ?? 0,
859 workerUsage.tasks.executed
860 )
861 if (eluTaskStatisticsRequirements.aggregate) {
862 if (message.taskPerformance?.elu != null) {
863 if (workerUsage.elu.utilization != null) {
864 workerUsage.elu.utilization =
865 (workerUsage.elu.utilization +
866 message.taskPerformance.elu.utilization) /
867 2
868 } else {
869 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
870 }
871 }
872 }
873 }
874
875 /**
876 * Chooses a worker node for the next task.
877 *
878 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
879 *
880 * @returns The chosen worker node key
881 */
882 private chooseWorkerNode (): number {
883 if (this.shallCreateDynamicWorker()) {
884 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
885 if (
886 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
887 ) {
888 return workerNodeKey
889 }
890 }
891 return this.workerChoiceStrategyContext.execute()
892 }
893
894 /**
895 * Conditions for dynamic worker creation.
896 *
897 * @returns Whether to create a dynamic worker or not.
898 */
899 private shallCreateDynamicWorker (): boolean {
900 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
901 }
902
903 /**
904 * Sends a message to worker given its worker node key.
905 *
906 * @param workerNodeKey - The worker node key.
907 * @param message - The message.
908 * @param transferList - The optional array of transferable objects.
909 */
910 protected abstract sendToWorker (
911 workerNodeKey: number,
912 message: MessageValue<Data>,
913 transferList?: TransferListItem[]
914 ): void
915
916 /**
917 * Creates a new worker.
918 *
919 * @returns Newly created worker.
920 */
921 protected abstract createWorker (): Worker
922
923 /**
924 * Creates a new, completely set up worker node.
925 *
926 * @returns New, completely set up worker node key.
927 */
928 protected createAndSetupWorkerNode (): number {
929 const worker = this.createWorker()
930
931 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
932 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
933 worker.on('error', (error) => {
934 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
935 const workerInfo = this.getWorkerInfo(workerNodeKey)
936 workerInfo.ready = false
937 this.workerNodes[workerNodeKey].closeChannel()
938 this.emitter?.emit(PoolEvents.error, error)
939 if (this.opts.restartWorkerOnError === true && !this.starting) {
940 if (workerInfo.dynamic) {
941 this.createAndSetupDynamicWorkerNode()
942 } else {
943 this.createAndSetupWorkerNode()
944 }
945 }
946 if (this.opts.enableTasksQueue === true) {
947 this.redistributeQueuedTasks(workerNodeKey)
948 }
949 })
950 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
951 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
952 worker.once('exit', () => {
953 this.removeWorkerNode(worker)
954 })
955
956 const workerNodeKey = this.addWorkerNode(worker)
957
958 this.afterWorkerNodeSetup(workerNodeKey)
959
960 return workerNodeKey
961 }
962
963 /**
964 * Creates a new, completely set up dynamic worker node.
965 *
966 * @returns New, completely set up dynamic worker node key.
967 */
968 protected createAndSetupDynamicWorkerNode (): number {
969 const workerNodeKey = this.createAndSetupWorkerNode()
970 this.registerWorkerMessageListener(workerNodeKey, (message) => {
971 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
972 message.workerId
973 )
974 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
975 // Kill message received from worker
976 if (
977 isKillBehavior(KillBehaviors.HARD, message.kill) ||
978 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
979 ((this.opts.enableTasksQueue === false &&
980 workerUsage.tasks.executing === 0) ||
981 (this.opts.enableTasksQueue === true &&
982 workerUsage.tasks.executing === 0 &&
983 this.tasksQueueSize(localWorkerNodeKey) === 0)))
984 ) {
985 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
986 this.emitter?.emit(PoolEvents.error, error)
987 })
988 }
989 })
990 const workerInfo = this.getWorkerInfo(workerNodeKey)
991 this.sendToWorker(workerNodeKey, {
992 checkActive: true,
993 workerId: workerInfo.id as number
994 })
995 workerInfo.dynamic = true
996 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
997 workerInfo.ready = true
998 }
999 return workerNodeKey
1000 }
1001
1002 /**
1003 * Registers a listener callback on the worker given its worker node key.
1004 *
1005 * @param workerNodeKey - The worker node key.
1006 * @param listener - The message listener callback.
1007 */
1008 protected abstract registerWorkerMessageListener<
1009 Message extends Data | Response
1010 >(
1011 workerNodeKey: number,
1012 listener: (message: MessageValue<Message>) => void
1013 ): void
1014
1015 /**
1016 * Method hooked up after a worker node has been newly created.
1017 * Can be overridden.
1018 *
1019 * @param workerNodeKey - The newly created worker node key.
1020 */
1021 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1022 // Listen to worker messages.
1023 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1024 // Send the startup message to worker.
1025 this.sendStartupMessageToWorker(workerNodeKey)
1026 // Send the statistics message to worker.
1027 this.sendStatisticsMessageToWorker(workerNodeKey)
1028 }
1029
1030 /**
1031 * Sends the startup message to worker given its worker node key.
1032 *
1033 * @param workerNodeKey - The worker node key.
1034 */
1035 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1036
1037 /**
1038 * Sends the statistics message to worker given its worker node key.
1039 *
1040 * @param workerNodeKey - The worker node key.
1041 */
1042 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1043 this.sendToWorker(workerNodeKey, {
1044 statistics: {
1045 runTime:
1046 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1047 .runTime.aggregate,
1048 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1049 .elu.aggregate
1050 },
1051 workerId: this.getWorkerInfo(workerNodeKey).id as number
1052 })
1053 }
1054
1055 private redistributeQueuedTasks (workerNodeKey: number): void {
1056 while (this.tasksQueueSize(workerNodeKey) > 0) {
1057 let targetWorkerNodeKey: number = workerNodeKey
1058 let minQueuedTasks = Infinity
1059 let executeTask = false
1060 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1061 const workerInfo = this.getWorkerInfo(workerNodeId)
1062 if (
1063 workerNodeId !== workerNodeKey &&
1064 workerInfo.ready &&
1065 workerNode.usage.tasks.queued === 0
1066 ) {
1067 if (
1068 this.workerNodes[workerNodeId].usage.tasks.executing <
1069 (this.opts.tasksQueueOptions?.concurrency as number)
1070 ) {
1071 executeTask = true
1072 }
1073 targetWorkerNodeKey = workerNodeId
1074 break
1075 }
1076 if (
1077 workerNodeId !== workerNodeKey &&
1078 workerInfo.ready &&
1079 workerNode.usage.tasks.queued < minQueuedTasks
1080 ) {
1081 minQueuedTasks = workerNode.usage.tasks.queued
1082 targetWorkerNodeKey = workerNodeId
1083 }
1084 }
1085 if (executeTask) {
1086 this.executeTask(
1087 targetWorkerNodeKey,
1088 this.dequeueTask(workerNodeKey) as Task<Data>
1089 )
1090 } else {
1091 this.enqueueTask(
1092 targetWorkerNodeKey,
1093 this.dequeueTask(workerNodeKey) as Task<Data>
1094 )
1095 }
1096 }
1097 }
1098
1099 /**
1100 * This method is the listener registered for each worker message.
1101 *
1102 * @returns The listener function to execute when a message is received from a worker.
1103 */
1104 protected workerListener (): (message: MessageValue<Response>) => void {
1105 return (message) => {
1106 this.checkMessageWorkerId(message)
1107 if (message.ready != null) {
1108 // Worker ready response received from worker
1109 this.handleWorkerReadyResponse(message)
1110 } else if (message.taskId != null) {
1111 // Task execution response received from worker
1112 this.handleTaskExecutionResponse(message)
1113 } else if (message.taskFunctions != null) {
1114 // Task functions message received from worker
1115 this.taskFunctions = message.taskFunctions
1116 }
1117 }
1118 }
1119
1120 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1121 this.getWorkerInfo(
1122 this.getWorkerNodeKeyByWorkerId(message.workerId)
1123 ).ready = message.ready as boolean
1124 if (this.emitter != null && this.ready) {
1125 this.emitter.emit(PoolEvents.ready, this.info)
1126 }
1127 }
1128
1129 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1130 const promiseResponse = this.promiseResponseMap.get(
1131 message.taskId as string
1132 )
1133 if (promiseResponse != null) {
1134 if (message.taskError != null) {
1135 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1136 promiseResponse.reject(message.taskError.message)
1137 } else {
1138 promiseResponse.resolve(message.data as Response)
1139 }
1140 const workerNodeKey = promiseResponse.workerNodeKey
1141 this.afterTaskExecutionHook(workerNodeKey, message)
1142 this.promiseResponseMap.delete(message.taskId as string)
1143 if (
1144 this.opts.enableTasksQueue === true &&
1145 this.tasksQueueSize(workerNodeKey) > 0 &&
1146 this.workerNodes[workerNodeKey].usage.tasks.executing <
1147 (this.opts.tasksQueueOptions?.concurrency as number)
1148 ) {
1149 this.executeTask(
1150 workerNodeKey,
1151 this.dequeueTask(workerNodeKey) as Task<Data>
1152 )
1153 }
1154 this.workerChoiceStrategyContext.update(workerNodeKey)
1155 }
1156 }
1157
1158 private checkAndEmitEvents (): void {
1159 if (this.emitter != null) {
1160 if (this.busy) {
1161 this.emitter.emit(PoolEvents.busy, this.info)
1162 }
1163 if (this.type === PoolTypes.dynamic && this.full) {
1164 this.emitter.emit(PoolEvents.full, this.info)
1165 }
1166 }
1167 }
1168
1169 /**
1170 * Gets the worker information given its worker node key.
1171 *
1172 * @param workerNodeKey - The worker node key.
1173 * @returns The worker information.
1174 */
1175 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1176 return this.workerNodes[workerNodeKey].info
1177 }
1178
1179 /**
1180 * Adds the given worker in the pool worker nodes.
1181 *
1182 * @param worker - The worker.
1183 * @returns The added worker node key.
1184 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1185 */
1186 private addWorkerNode (worker: Worker): number {
1187 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1188 // Flag the worker node as ready at pool startup.
1189 if (this.starting) {
1190 workerNode.info.ready = true
1191 }
1192 this.workerNodes.push(workerNode)
1193 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1194 if (workerNodeKey === -1) {
1195 throw new Error('Worker node not found')
1196 }
1197 return workerNodeKey
1198 }
1199
1200 /**
1201 * Removes the given worker from the pool worker nodes.
1202 *
1203 * @param worker - The worker.
1204 */
1205 private removeWorkerNode (worker: Worker): void {
1206 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1207 if (workerNodeKey !== -1) {
1208 this.workerNodes.splice(workerNodeKey, 1)
1209 this.workerChoiceStrategyContext.remove(workerNodeKey)
1210 }
1211 }
1212
1213 /**
1214 * Executes the given task on the worker given its worker node key.
1215 *
1216 * @param workerNodeKey - The worker node key.
1217 * @param task - The task to execute.
1218 */
1219 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1220 this.beforeTaskExecutionHook(workerNodeKey, task)
1221 this.sendToWorker(workerNodeKey, task, task.transferList)
1222 }
1223
1224 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1225 return this.workerNodes[workerNodeKey].enqueueTask(task)
1226 }
1227
1228 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1229 return this.workerNodes[workerNodeKey].dequeueTask()
1230 }
1231
1232 private tasksQueueSize (workerNodeKey: number): number {
1233 return this.workerNodes[workerNodeKey].tasksQueueSize()
1234 }
1235
1236 protected flushTasksQueue (workerNodeKey: number): void {
1237 while (this.tasksQueueSize(workerNodeKey) > 0) {
1238 this.executeTask(
1239 workerNodeKey,
1240 this.dequeueTask(workerNodeKey) as Task<Data>
1241 )
1242 }
1243 this.workerNodes[workerNodeKey].clearTasksQueue()
1244 }
1245
1246 private flushTasksQueues (): void {
1247 for (const [workerNodeKey] of this.workerNodes.entries()) {
1248 this.flushTasksQueue(workerNodeKey)
1249 }
1250 }
1251 }