fix: test the task function name only if the task functions were received from workers
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task execution responses, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The promise `resolve`/`reject` callbacks of that task, together with the key of the worker node it was assigned to.
 *
 * The entry is looked up by message id when a task execution response is received from a worker.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Response>
> = new Map()

/**
 * Context wrapping the worker choice strategy currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `true` while the constructor is creating the initial worker nodes, `false` afterwards.
 */
private readonly starting: boolean
/**
 * The `performance.now()` timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
/**
 * The task function names, set when a worker sends its task functions list.
 */
private taskFunctions!: string[]
99
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal callbacks, builds the worker
 * choice strategy context, runs the setup hook and then creates the initial
 * worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not started from the main worker, or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that may be handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag brackets the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
148
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
161
/**
 * Checks that the number of workers is a safe, non negative integer,
 * and non zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
179
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
205
/**
 * Validates the pool options and stores the defaulted values in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
232
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When provided, there must be exactly one weight per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
270
/**
 * Checks the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
294
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
306
/** @inheritDoc */
public get info (): PoolInfo {
  // Sums a numeric value picked from each worker node.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWhere = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => sumOver((workerNode) => (matches(workerNode) ? 1 : 0))
  const context = this.workerChoiceStrategyContext
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(context.getTaskStatisticsRequirements().runTime.aggregate &&
      context.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOver((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sumOver((workerNode) => workerNode.usage.tasks.executing),
    // Queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOver((workerNode) => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOver(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOver((workerNode) => workerNode.usage.tasks.failed),
    ...(context.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(context.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(context.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(context.getTaskStatisticsRequirements().waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
447
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyWorkerNodes.length >= this.minSize
}
462
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the elapsed time since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime = totalTasksRunTime + (workerNode.usage.runTime?.aggregate ?? 0)
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime = totalTasksWaitTime + (workerNode.usage.waitTime?.aggregate ?? 0)
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
483
/**
 * The pool type, to be provided by the concrete pool.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
490
/**
 * The worker type, to be provided by the concrete pool.
 */
protected abstract get worker (): WorkerType
495
/**
 * The pool minimum size, to be provided by the concrete pool.
 */
protected abstract get minSize (): number
500
/**
 * The pool maximum size, to be provided by the concrete pool.
 */
protected abstract get maxSize (): number
505
/**
 * Checks that the worker id carried by a message received from a worker
 * belongs to one of the pool worker nodes. A message without worker id is accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
522
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
534
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
546
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and resend the statistics message
  // to each worker.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
565
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
576
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue is being switched off.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop any previously stored tasks queue options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
599
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options provided by the user, possibly `null` or `undefined`.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
607
/**
 * The pool filling boolean status: `true` once the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
616
/**
 * The pool busyness boolean status, to be provided by the concrete pool.
 */
protected abstract get busy (): boolean
623
/**
 * Whether no ready worker node is able to accept a task right away.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the configured concurrency; otherwise it is
 * available only while it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const isAvailable =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(isAvailable)
}
648
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // Empty list until a task functions list has been stored by the worker listener.
  return this.taskFunctions ?? []
}
657
/**
 * @inheritDoc
 *
 * The returned promise is rejected, and no task is submitted to a worker, when:
 * - `name` is provided and is not a string or is blank (`TypeError`);
 * - `name` is provided, the task functions list is known and does not include it (`Error`);
 * - `transferList` is provided and is not an array (`TypeError`).
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Each argument check returns right after rejecting: reject() alone does
    // not stop the executor, and an invalid task must not be dispatched.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    // The name is only tested when the task functions list is known.
    if (
      name != null &&
      this.taskFunctions != null &&
      !this.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the task response can settle the promise.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already runs its tasks concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
716
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the termination of every worker node, then wait for all of them.
  const terminations = this.workerNodes.map(async (_, workerNodeKey) => {
    await this.destroyWorkerNode(workerNodeKey)
  })
  await Promise.all(terminations)
}
725
/**
 * Sends a kill message to a worker and waits for its kill response.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill response and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
741
/**
 * Terminates the worker node identified by the given worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
748
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * The default implementation does nothing; it can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup.
}
758
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
763
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const trackTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  trackTaskStart(workerNode.usage)
  trackTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
784
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, first on the worker node
 * usage, then on its usage for the task name carried by the message
 * (the default task name when the message carries none).
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const trackTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  trackTaskEnd(workerNode.usage)
  trackTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
807
/**
 * Updates the tasks counters of a worker usage once a task has ended:
 * one less executing task, and one more failed or executed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
820
/**
 * Updates the run time statistics of a worker usage with the task run time
 * carried by the message (`0` when the message carries none).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
832
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp (`0` when the task has no timestamp).
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceSubmission = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    elapsedSinceSubmission,
    workerUsage.tasks.executed
  )
}
846
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage
 * with the ELU carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is averaged with the new one, or initialized with it.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
878
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and is chosen
 * if the strategy policy uses dynamic workers. In every other case the choice
 * is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
897
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
906
/**
 * Sends a message to the worker identified by the given worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
919
/**
 * Creates a new worker. To be implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
926
/**
 * Creates a new worker, registers its event handlers, adds it to the pool
 * worker nodes and runs the worker node setup.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User provided handlers default to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling, registered after the user error handler.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker is created while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
966
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the
 * worker node when it receives a hard kill message, or a soft kill message
 * while the worker node has no executing task and, if the tasks queue is
 * enabled, no queued task.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The worker node key is resolved again from the worker id of each message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Evaluated lazily: the tasks queue size is only read when needed.
    const hasNoPendingTask = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingTask())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Marked ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
1005
/**
 * Registers a message listener callback on the worker identified by the
 * given worker node key. To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1018
/**
 * Method hooked up after a worker node has been newly created: registers the
 * pool message listener, then sends the startup and statistics messages.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // Send the startup message, then the statistics message, to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1033
/**
 * Sends the startup message to the worker identified by the given worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1040
/**
 * Sends the statistics message to the worker identified by the given worker
 * node key. The message carries the run time and ELU `aggregate` flags of
 * the task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId
  })
}
1058
/**
 * Moves the queued tasks of a worker node to the other ready worker nodes.
 *
 * For each queued task the target is the first other ready worker node with
 * an empty queue, or else the other ready worker node with the fewest queued
 * tasks. The task is executed right away when the target has an empty queue
 * and runs fewer tasks than the configured concurrency; otherwise it is
 * enqueued on the target.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks stay in the source worker node queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      const workerInfo = this.getWorkerInfo(workerNodeId)
      if (
        workerNodeId !== workerNodeKey &&
        workerInfo.ready &&
        workerNode.usage.tasks.queued === 0
      ) {
        // Best candidate: empty queue, no need to look any further.
        if (
          this.workerNodes[workerNodeId].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        ) {
          executeTask = true
        }
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (
        workerNodeId !== workerNodeKey &&
        workerInfo.ready &&
        workerNode.usage.tasks.queued < minQueuedTasks
      ) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No other ready worker node: re-enqueueing on the source worker node
      // would leave its queue size unchanged and loop forever.
      break
    }
    if (executeTask) {
      this.executeTask(
        targetWorkerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    } else {
      this.enqueueTask(
        targetWorkerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }
}
1102
/**
 * Builds the listener registered for each worker message.
 *
 * The returned listener first checks the message worker id, then dispatches on
 * the first field found set, in this order: `ready`, `taskId`, `taskFunctions`.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response.
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response.
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message: remember the received task functions.
      this.taskFunctions = message.taskFunctions
    }
  }
}
1123
/**
 * Handles a worker ready response: stores the received `ready` flag in the
 * information of the worker node matching the message worker id, then emits the
 * pool `ready` event if an emitter exists and the pool reports itself ready.
 *
 * @param message - The message received from the worker.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1132
/**
 * Handles a task execution response received from a worker.
 *
 * Nothing is done if no pending promise is registered for the message task id.
 * Otherwise the promise is settled (rejected with the task error message if a
 * task error is present, resolved with the message data if not), the after task
 * execution hook is run, the promise is unregistered, a queued task is started
 * on the same worker node when the tasks queue is enabled, non-empty and the
 * concurrency limit is not reached, and finally the worker choice strategy
 * context is updated for that worker node.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const pendingResponse = this.promiseResponseMap.get(message.taskId as string)
  if (pendingResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(message.taskId as string)
  // Evaluated after the hook, exactly where the original condition was checked.
  const canRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1161
/**
 * Emits the pool state events when an emitter exists:
 * `busy` if the pool is busy, `full` if the pool is dynamic and full.
 * Each event carries the pool information.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicAndFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1172
/**
 * Returns the information object of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1182
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 * While the pool is starting, the new worker node is flagged as ready up front.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Pool startup: consider the worker node ready immediately.
    addedNode.info.ready = true
  }
  this.workerNodes.push(addedNode)
  const addedNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedNodeKey !== -1) {
    return addedNodeKey
  }
  throw new Error('Worker node not found')
}
1203
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedNodeKey)
}
1216
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, then sends the task to the worker along
 * with the task transfer list.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list only once the hook has run.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1227
/**
 * Enqueues the given task on the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1231
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1235
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1239
/**
 * Flushes the tasks queue of the worker node stored at the given key: every
 * queued task is dequeued and executed on that same worker node, then the
 * worker node tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remaining = this.tasksQueueSize(workerNodeKey)
  while (remaining > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    // Re-read the size after each execution rather than counting down.
    remaining = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1249
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1255 }