Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import {
32 type IWorker,
33 type IWorkerNode,
34 type WorkerInfo,
35 type WorkerType,
36 WorkerTypes,
37 type WorkerUsage
38 } from './worker'
39 import {
40 type MeasurementStatisticsRequirements,
41 Measurements,
42 WorkerChoiceStrategies,
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
45 } from './selection-strategies/selection-strategies-types'
46 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
47 import { version } from './version'
48 import { WorkerNode } from './worker-node'
49
50 /**
51 * Base class that implements some shared logic for all poolifier pools.
52 *
53 * @typeParam Worker - Type of worker which manages this pool.
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
56 */
57 export abstract class AbstractPool<
58 Worker extends IWorker,
59 Data = unknown,
60 Response = unknown
61 > implements IPool<Worker, Data, Response> {
62 /** @inheritDoc */
63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
64
65 /** @inheritDoc */
66 public readonly emitter?: PoolEmitter
67
68 /**
69 * The task execution response promise map.
70 *
71 * - `key`: The message id of each submitted task.
72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
73 *
74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
75 */
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
78
79 /**
80 * Worker choice strategy context referencing a worker choice algorithm implementation.
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
83 Worker,
84 Data,
85 Response
86 >
87
88 /**
89 * Whether the pool is starting or not.
90 */
91 private readonly starting: boolean
92 /**
93 * The start timestamp of the pool.
94 */
95 private readonly startTimestamp
96
/**
 * Builds the pool: validates the inputs, wires the worker choice strategy
 * context and creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be instantiated from the main context.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate the inputs; checkPoolOptions() also fills in the options defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed over as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
143
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not exist on the file system.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
156
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
174
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, lower than or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
200
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
227
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
237
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When given, weights must hold exactly one entry per possible worker node.
  const hasWeightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWeightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  // When given, the measurement must be one of the `Measurements` values.
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
265
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  // Nothing more to check when no concurrency is specified.
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
289
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
301
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node numeric value over all the pool worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + valueOf(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Average of an aggregated measurement per executed task.
  // Yields 0 instead of NaN (0 / 0) or Infinity (x / 0) while no task has been executed.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? aggregate / executedTasks : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
442
/**
 * Whether the pool is ready or not: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
457
/**
 * The approximate pool utilization: the aggregated tasks run and wait times
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let tasksRunTime = 0
  let tasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    tasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    tasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (tasksRunTime + tasksWaitTime) / poolTimeCapacity
}
478
/**
 * The type of the pool, provided by the concrete pool implementation.
 *
 * It is compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by this class.
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
485
/**
 * The type of worker used by the pool, provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
490
/**
 * The minimum size of the pool, provided by the concrete pool implementation.
 */
protected abstract get minSize (): number
495
/**
 * The maximum size of the pool, provided by the concrete pool implementation.
 */
protected abstract get maxSize (): number
500
/**
 * Ensures that the worker id carried by a received message, if any, belongs to a pool worker node.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  // Messages without a worker id are not checked.
  if (workerId == null) {
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
529
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  // Options are only replaced when new ones are given.
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and resend it the worker statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: appliedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(appliedOptions)
}
571
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when switching from enabled to disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
583
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
594
/**
 * Builds the effective tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns The tasks queue options, with a concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
602
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
611
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
618
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it executes
 * fewer tasks than the configured concurrency; otherwise while it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
643
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: return explicitly so that an
    // invalid call never chooses a worker node nor dispatches a task.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Submission timestamp, later used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Register the promise callbacks so the worker response can settle the promise.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen worker
    // node already executes its tasks concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
686
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the destruction of every worker node, then wait for all of them.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
}
695
/**
 * Terminates the worker node identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled by the concrete pool implementation.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
702
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup
}
712
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to create a pool when this returns `false`.
 */
protected abstract isMain (): boolean
717
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  const nodeUsage = workerNode.usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Usage of the worker node for that task name.
  const taskUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskUsage, task)
}
738
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, first on the worker node usage,
 * then on its usage for the task name carried by the message.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  const nodeUsage = workerNode.usage
  this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
  this.updateRunTimeWorkerUsage(nodeUsage, message)
  this.updateEluWorkerUsage(nodeUsage, message)
  // Usage of the worker node for that task name.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const taskUsage = workerNode.getTaskWorkerUsage(taskName) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(taskUsage, message)
  this.updateRunTimeWorkerUsage(taskUsage, message)
  this.updateEluWorkerUsage(taskUsage, message)
}
761
/**
 * Updates the tasks counters of a worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
774
/**
 * Updates the run time statistics of a worker usage with the task run time carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // A missing task performance counts as a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
786
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp counts as a zero wait time.
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt,
    workerUsage.tasks.executed
  )
}
800
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with the ELU carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active and idle ELU values count as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // The utilization is only tracked when the ELU aggregate is required and reported.
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // Mean of the previous utilization and the new one, or the new one on first report.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
832
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node gets created and the strategy policy uses dynamic workers,
 * that new worker node is chosen.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
851
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
860
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
873
/**
 * Creates a new worker. Implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
880
/**
 * Creates a new worker, registers its event handlers and adds it to the pool as a worker node.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, falling back to a no-op.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failed worker by one of the same kind, except while the pool is starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestartWorker) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Hand the tasks queued on the failed worker node over to other worker nodes.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(newWorkerNodeKey)

  return newWorkerNodeKey
}
920
/**
 * Creates a new worker node, registers the kill message handling on it and flags it as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The worker node key is resolved again at message time from the worker id.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker
    const isHardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    // Any other kill message is only honored once the worker node executes no task
    // and, with the tasks queue enabled, has no queued task left.
    const isKillOfIdleWorker =
      message.kill != null &&
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    if (isHardKill || isKillOfIdleWorker) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Strategies using dynamic workers get a worker node flagged ready right away.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
957
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
970
/**
 * Hook executed right after a worker node has been created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Start listening to the worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // 2. Send the startup message.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Send the worker statistics message.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
985
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
992
/**
 * Sends to the worker identified by its worker node key the message telling
 * whether the run time and ELU aggregates are required by the worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId
  })
}
1010
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node without queued tasks is
 * preferred: the task is executed on it if it runs fewer tasks than the configured
 * concurrency, enqueued on it otherwise. Failing that, the task is enqueued on the
 * other ready worker node with the fewest queued tasks.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks stay on the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible worker node: moving the task would put it straight back on the
      // same queue (assuming dequeueTask()/enqueueTask() act on that queue), so the
      // queue would never drain and this loop would never end.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1054
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then handles either a worker ready
 * response or a task execution response. Other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1072
/**
 * Records the readiness status sent by a worker and emits the pool `ready`
 * event once the pool reports itself ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1081
/**
 * Settles the promise bound to an executed task, runs the after task
 * execution hook, then starts the next queued task on the same worker node
 * when the tasks queue is enabled and its concurrency limit allows it.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1110
/**
 * Emits the pool `busy` event when the pool is busy and, for dynamic pools
 * only, the `full` event when the pool is full. Does nothing without emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1121
/**
 * Looks up the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1131
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created while the pool is starting are flagged as ready.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
1152
/**
 * Drops the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context. Unknown workers are ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1165
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node. The task transfer list is forwarded for thread
 * workers only.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const forwardTransferList =
    this.worker === WorkerTypes.thread && task.transferList != null
  const transferList = forwardTransferList ? task.transferList : undefined
  this.sendToWorker(workerNodeKey, task, transferList)
}
1182
/**
 * Appends the task to the tasks queue of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1186
/**
 * Takes the next task out of the tasks queue of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1190
/**
 * Reads the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1194
/**
 * Executes every task queued on the worker node at the given key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1204
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1210 }