fix: ensure the task concurrency is respected at queued task
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
103 public constructor (
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
107 ) {
108 if (!this.isMain()) {
109 throw new Error('Cannot start a pool from a worker!')
110 }
111 this.checkNumberOfWorkers(this.numberOfWorkers)
112 this.checkFilePath(this.filePath)
113 this.checkPoolOptions(this.opts)
114
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
118 this.dequeueTask = this.dequeueTask.bind(this)
119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
120
121 if (this.opts.enableEvents === true) {
122 this.emitter = new PoolEmitter()
123 }
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
133
134 this.setupHook()
135
136 this.starting = true
137 this.startPool()
138 this.starting = false
139
140 this.startTimestamp = performance.now()
141 }
142
143 private checkFilePath (filePath: string): void {
144 if (
145 filePath == null ||
146 typeof filePath !== 'string' ||
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
149 throw new Error('Please specify a file with a worker implementation')
150 }
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
154 }
155
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
162 throw new TypeError(
163 'Cannot instantiate a pool with a non safe integer number of workers'
164 )
165 } else if (numberOfWorkers < 0) {
166 throw new RangeError(
167 'Cannot instantiate a pool with a negative number of workers'
168 )
169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
175 if (this.type === PoolTypes.dynamic) {
176 if (max == null) {
177 throw new Error(
178 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
179 )
180 } else if (!Number.isSafeInteger(max)) {
181 throw new TypeError(
182 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
183 )
184 } else if (min > max) {
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
187 )
188 } else if (max === 0) {
189 throw new RangeError(
190 'Cannot instantiate a dynamic pool with a pool size equal to zero'
191 )
192 } else if (min === max) {
193 throw new RangeError(
194 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
195 )
196 }
197 }
198 }
199
200 private checkPoolOptions (opts: PoolOptions<Worker>): void {
201 if (isPlainObject(opts)) {
202 this.opts.workerChoiceStrategy =
203 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
204 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
205 this.opts.workerChoiceStrategyOptions =
206 opts.workerChoiceStrategyOptions ??
207 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
208 this.checkValidWorkerChoiceStrategyOptions(
209 this.opts.workerChoiceStrategyOptions
210 )
211 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
212 this.opts.enableEvents = opts.enableEvents ?? true
213 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
214 if (this.opts.enableTasksQueue) {
215 this.checkValidTasksQueueOptions(
216 opts.tasksQueueOptions as TasksQueueOptions
217 )
218 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
219 opts.tasksQueueOptions as TasksQueueOptions
220 )
221 }
222 } else {
223 throw new TypeError('Invalid pool options: must be a plain object')
224 }
225 }
226
227 private checkValidWorkerChoiceStrategy (
228 workerChoiceStrategy: WorkerChoiceStrategy
229 ): void {
230 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
231 throw new Error(
232 `Invalid worker choice strategy '${workerChoiceStrategy}'`
233 )
234 }
235 }
236
237 private checkValidWorkerChoiceStrategyOptions (
238 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
239 ): void {
240 if (!isPlainObject(workerChoiceStrategyOptions)) {
241 throw new TypeError(
242 'Invalid worker choice strategy options: must be a plain object'
243 )
244 }
245 if (
246 workerChoiceStrategyOptions.weights != null &&
247 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
248 ) {
249 throw new Error(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 }
253 if (
254 workerChoiceStrategyOptions.measurement != null &&
255 !Object.values(Measurements).includes(
256 workerChoiceStrategyOptions.measurement
257 )
258 ) {
259 throw new Error(
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
261 )
262 }
263 }
264
265 private checkValidTasksQueueOptions (
266 tasksQueueOptions: TasksQueueOptions
267 ): void {
268 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
269 throw new TypeError('Invalid tasks queue options: must be a plain object')
270 }
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 !Number.isSafeInteger(tasksQueueOptions.concurrency)
274 ) {
275 throw new TypeError(
276 'Invalid worker tasks concurrency: must be an integer'
277 )
278 }
279 if (
280 tasksQueueOptions?.concurrency != null &&
281 tasksQueueOptions.concurrency <= 0
282 ) {
283 throw new Error(
284 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
285 )
286 }
287 }
288
289 private startPool (): void {
290 while (
291 this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
294 0
295 ) < this.numberOfWorkers
296 ) {
297 this.createAndSetupWorkerNode()
298 }
299 }
300
301 /** @inheritDoc */
302 public get info (): PoolInfo {
303 return {
304 version,
305 type: this.type,
306 worker: this.worker,
307 ready: this.ready,
308 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
309 minSize: this.minSize,
310 maxSize: this.maxSize,
311 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
312 .runTime.aggregate &&
313 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
314 .waitTime.aggregate && { utilization: round(this.utilization) }),
315 workerNodes: this.workerNodes.length,
316 idleWorkerNodes: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 workerNode.usage.tasks.executing === 0
319 ? accumulator + 1
320 : accumulator,
321 0
322 ),
323 busyWorkerNodes: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
326 0
327 ),
328 executedTasks: this.workerNodes.reduce(
329 (accumulator, workerNode) =>
330 accumulator + workerNode.usage.tasks.executed,
331 0
332 ),
333 executingTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + workerNode.usage.tasks.executing,
336 0
337 ),
338 queuedTasks: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 accumulator + workerNode.usage.tasks.queued,
341 0
342 ),
343 maxQueuedTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
345 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
346 0
347 ),
348 failedTasks: this.workerNodes.reduce(
349 (accumulator, workerNode) =>
350 accumulator + workerNode.usage.tasks.failed,
351 0
352 ),
353 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
354 .runTime.aggregate && {
355 runTime: {
356 minimum: round(
357 Math.min(
358 ...this.workerNodes.map(
359 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
360 )
361 )
362 ),
363 maximum: round(
364 Math.max(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
367 )
368 )
369 ),
370 average: round(
371 this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
374 0
375 ) /
376 this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + (workerNode.usage.tasks?.executed ?? 0),
379 0
380 )
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.median && {
384 median: round(
385 median(
386 this.workerNodes.map(
387 workerNode => workerNode.usage.runTime?.median ?? 0
388 )
389 )
390 )
391 })
392 }
393 }),
394 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
395 .waitTime.aggregate && {
396 waitTime: {
397 minimum: round(
398 Math.min(
399 ...this.workerNodes.map(
400 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
401 )
402 )
403 ),
404 maximum: round(
405 Math.max(
406 ...this.workerNodes.map(
407 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
408 )
409 )
410 ),
411 average: round(
412 this.workerNodes.reduce(
413 (accumulator, workerNode) =>
414 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
415 0
416 ) /
417 this.workerNodes.reduce(
418 (accumulator, workerNode) =>
419 accumulator + (workerNode.usage.tasks?.executed ?? 0),
420 0
421 )
422 ),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.median && {
425 median: round(
426 median(
427 this.workerNodes.map(
428 workerNode => workerNode.usage.waitTime?.median ?? 0
429 )
430 )
431 )
432 })
433 }
434 })
435 }
436 }
437
438 /**
439 * The pool readiness boolean status.
440 */
441 private get ready (): boolean {
442 return (
443 this.workerNodes.reduce(
444 (accumulator, workerNode) =>
445 !workerNode.info.dynamic && workerNode.info.ready
446 ? accumulator + 1
447 : accumulator,
448 0
449 ) >= this.minSize
450 )
451 }
452
453 /**
454 * The approximate pool utilization.
455 *
456 * @returns The pool utilization.
457 */
458 private get utilization (): number {
459 const poolTimeCapacity =
460 (performance.now() - this.startTimestamp) * this.maxSize
461 const totalTasksRunTime = this.workerNodes.reduce(
462 (accumulator, workerNode) =>
463 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
464 0
465 )
466 const totalTasksWaitTime = this.workerNodes.reduce(
467 (accumulator, workerNode) =>
468 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
469 0
470 )
471 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
472 }
473
474 /**
475 * The pool type.
476 *
477 * If it is `'dynamic'`, it provides the `max` property.
478 */
479 protected abstract get type (): PoolType
480
481 /**
482 * The worker type.
483 */
484 protected abstract get worker (): WorkerType
485
486 /**
487 * The pool minimum size.
488 */
489 protected abstract get minSize (): number
490
491 /**
492 * The pool maximum size.
493 */
494 protected abstract get maxSize (): number
495
496 /**
497 * Checks if the worker id sent in the received message from a worker is valid.
498 *
499 * @param message - The received message.
500 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 */
502 private checkMessageWorkerId (message: MessageValue<Response>): void {
503 if (
504 message.workerId != null &&
505 this.getWorkerNodeKeyByWorkerId(message.workerId) == null
506 ) {
507 throw new Error(
508 `Worker message received from unknown worker '${message.workerId}'`
509 )
510 }
511 }
512
513 /**
514 * Gets the given worker its worker node key.
515 *
516 * @param worker - The worker.
517 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
518 */
519 private getWorkerNodeKey (worker: Worker): number {
520 return this.workerNodes.findIndex(
521 workerNode => workerNode.worker === worker
522 )
523 }
524
525 /**
526 * Gets the worker node key given its worker id.
527 *
528 * @param workerId - The worker id.
529 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
530 */
531 private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
532 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
533 if (workerNode.info.id === workerId) {
534 return workerNodeKey
535 }
536 }
537 }
538
539 /** @inheritDoc */
540 public setWorkerChoiceStrategy (
541 workerChoiceStrategy: WorkerChoiceStrategy,
542 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
543 ): void {
544 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
545 this.opts.workerChoiceStrategy = workerChoiceStrategy
546 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
547 this.opts.workerChoiceStrategy
548 )
549 if (workerChoiceStrategyOptions != null) {
550 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
551 }
552 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
553 workerNode.resetUsage()
554 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
555 }
556 }
557
558 /** @inheritDoc */
559 public setWorkerChoiceStrategyOptions (
560 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
561 ): void {
562 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
563 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
564 this.workerChoiceStrategyContext.setOptions(
565 this.opts.workerChoiceStrategyOptions
566 )
567 }
568
569 /** @inheritDoc */
570 public enableTasksQueue (
571 enable: boolean,
572 tasksQueueOptions?: TasksQueueOptions
573 ): void {
574 if (this.opts.enableTasksQueue === true && !enable) {
575 this.flushTasksQueues()
576 }
577 this.opts.enableTasksQueue = enable
578 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
579 }
580
581 /** @inheritDoc */
582 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
583 if (this.opts.enableTasksQueue === true) {
584 this.checkValidTasksQueueOptions(tasksQueueOptions)
585 this.opts.tasksQueueOptions =
586 this.buildTasksQueueOptions(tasksQueueOptions)
587 } else if (this.opts.tasksQueueOptions != null) {
588 delete this.opts.tasksQueueOptions
589 }
590 }
591
592 private buildTasksQueueOptions (
593 tasksQueueOptions: TasksQueueOptions
594 ): TasksQueueOptions {
595 return {
596 concurrency: tasksQueueOptions?.concurrency ?? 1
597 }
598 }
599
600 /**
601 * Whether the pool is full or not.
602 *
603 * The pool filling boolean status.
604 */
605 protected get full (): boolean {
606 return this.workerNodes.length >= this.maxSize
607 }
608
609 /**
610 * Whether the pool is busy or not.
611 *
612 * The pool busyness boolean status.
613 */
614 protected abstract get busy (): boolean
615
616 /**
617 * Whether worker nodes are executing at least one task.
618 *
619 * @returns Worker nodes busyness boolean status.
620 */
621 protected internalBusy (): boolean {
622 return (
623 this.workerNodes.findIndex(workerNode => {
624 return workerNode.usage.tasks.executing === 0
625 }) === -1
626 )
627 }
628
629 /** @inheritDoc */
630 public async execute (data?: Data, name?: string): Promise<Response> {
631 return await new Promise<Response>((resolve, reject) => {
632 const timestamp = performance.now()
633 const workerNodeKey = this.chooseWorkerNode()
634 const task: Task<Data> = {
635 name: name ?? DEFAULT_TASK_NAME,
636 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
637 data: data ?? ({} as Data),
638 timestamp,
639 workerId: this.getWorkerInfo(workerNodeKey).id as number,
640 id: randomUUID()
641 }
642 this.promiseResponseMap.set(task.id as string, {
643 resolve,
644 reject,
645 workerNodeKey
646 })
647 if (
648 this.opts.enableTasksQueue === true &&
649 (this.busy ||
650 this.workerNodes[workerNodeKey].usage.tasks.executing >=
651 ((this.opts.tasksQueueOptions as TasksQueueOptions)
652 .concurrency as number))
653 ) {
654 this.enqueueTask(workerNodeKey, task)
655 } else {
656 this.executeTask(workerNodeKey, task)
657 }
658 this.checkAndEmitEvents()
659 })
660 }
661
662 /** @inheritDoc */
663 public async destroy (): Promise<void> {
664 await Promise.all(
665 this.workerNodes.map(async (workerNode, workerNodeKey) => {
666 this.flushTasksQueue(workerNodeKey)
667 // FIXME: wait for tasks to be finished
668 const workerExitPromise = new Promise<void>(resolve => {
669 workerNode.worker.on('exit', () => {
670 resolve()
671 })
672 })
673 await this.destroyWorkerNode(workerNodeKey)
674 await workerExitPromise
675 })
676 )
677 }
678
679 /**
680 * Terminates the worker node given its worker node key.
681 *
682 * @param workerNodeKey - The worker node key.
683 */
684 protected abstract destroyWorkerNode (
685 workerNodeKey: number
686 ): void | Promise<void>
687
688 /**
689 * Setup hook to execute code before worker nodes are created in the abstract constructor.
690 * Can be overridden.
691 *
692 * @virtual
693 */
694 protected setupHook (): void {
695 // Intentionally empty
696 }
697
698 /**
699 * Should return whether the worker is the main worker or not.
700 */
701 protected abstract isMain (): boolean
702
703 /**
704 * Hook executed before the worker task execution.
705 * Can be overridden.
706 *
707 * @param workerNodeKey - The worker node key.
708 * @param task - The task to execute.
709 */
710 protected beforeTaskExecutionHook (
711 workerNodeKey: number,
712 task: Task<Data>
713 ): void {
714 const workerUsage = this.workerNodes[workerNodeKey].usage
715 ++workerUsage.tasks.executing
716 this.updateWaitTimeWorkerUsage(workerUsage, task)
717 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
718 task.name as string
719 ) as WorkerUsage
720 ++taskWorkerUsage.tasks.executing
721 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
722 }
723
724 /**
725 * Hook executed after the worker task execution.
726 * Can be overridden.
727 *
728 * @param workerNodeKey - The worker node key.
729 * @param message - The received message.
730 */
731 protected afterTaskExecutionHook (
732 workerNodeKey: number,
733 message: MessageValue<Response>
734 ): void {
735 const workerUsage = this.workerNodes[workerNodeKey].usage
736 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
737 this.updateRunTimeWorkerUsage(workerUsage, message)
738 this.updateEluWorkerUsage(workerUsage, message)
739 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
740 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
741 ) as WorkerUsage
742 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
743 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
744 this.updateEluWorkerUsage(taskWorkerUsage, message)
745 }
746
747 private updateTaskStatisticsWorkerUsage (
748 workerUsage: WorkerUsage,
749 message: MessageValue<Response>
750 ): void {
751 const workerTaskStatistics = workerUsage.tasks
752 --workerTaskStatistics.executing
753 if (message.taskError == null) {
754 ++workerTaskStatistics.executed
755 } else {
756 ++workerTaskStatistics.failed
757 }
758 }
759
760 private updateRunTimeWorkerUsage (
761 workerUsage: WorkerUsage,
762 message: MessageValue<Response>
763 ): void {
764 updateMeasurementStatistics(
765 workerUsage.runTime,
766 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
767 message.taskPerformance?.runTime ?? 0,
768 workerUsage.tasks.executed
769 )
770 }
771
772 private updateWaitTimeWorkerUsage (
773 workerUsage: WorkerUsage,
774 task: Task<Data>
775 ): void {
776 const timestamp = performance.now()
777 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
778 updateMeasurementStatistics(
779 workerUsage.waitTime,
780 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
781 taskWaitTime,
782 workerUsage.tasks.executed
783 )
784 }
785
786 private updateEluWorkerUsage (
787 workerUsage: WorkerUsage,
788 message: MessageValue<Response>
789 ): void {
790 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
791 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
792 updateMeasurementStatistics(
793 workerUsage.elu.active,
794 eluTaskStatisticsRequirements,
795 message.taskPerformance?.elu?.active ?? 0,
796 workerUsage.tasks.executed
797 )
798 updateMeasurementStatistics(
799 workerUsage.elu.idle,
800 eluTaskStatisticsRequirements,
801 message.taskPerformance?.elu?.idle ?? 0,
802 workerUsage.tasks.executed
803 )
804 if (eluTaskStatisticsRequirements.aggregate) {
805 if (message.taskPerformance?.elu != null) {
806 if (workerUsage.elu.utilization != null) {
807 workerUsage.elu.utilization =
808 (workerUsage.elu.utilization +
809 message.taskPerformance.elu.utilization) /
810 2
811 } else {
812 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
813 }
814 }
815 }
816 }
817
818 /**
819 * Chooses a worker node for the next task.
820 *
821 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
822 *
823 * @returns The chosen worker node key
824 */
825 private chooseWorkerNode (): number {
826 if (this.shallCreateDynamicWorker()) {
827 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
828 if (
829 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
830 ) {
831 return workerNodeKey
832 }
833 }
834 return this.workerChoiceStrategyContext.execute()
835 }
836
837 /**
838 * Conditions for dynamic worker creation.
839 *
840 * @returns Whether to create a dynamic worker or not.
841 */
842 private shallCreateDynamicWorker (): boolean {
843 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
844 }
845
846 /**
847 * Sends a message to worker given its worker node key.
848 *
849 * @param workerNodeKey - The worker node key.
850 * @param message - The message.
851 */
852 protected abstract sendToWorker (
853 workerNodeKey: number,
854 message: MessageValue<Data>
855 ): void
856
857 /**
858 * Creates a new worker.
859 *
860 * @returns Newly created worker.
861 */
862 protected abstract createWorker (): Worker
863
864 /**
865 * Creates a new, completely set up worker node.
866 *
867 * @returns New, completely set up worker node key.
868 */
869 protected createAndSetupWorkerNode (): number {
870 const worker = this.createWorker()
871
872 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
873 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
874 worker.on('error', error => {
875 const workerNodeKey = this.getWorkerNodeKey(worker)
876 const workerInfo = this.getWorkerInfo(workerNodeKey)
877 workerInfo.ready = false
878 this.workerNodes[workerNodeKey].closeChannel()
879 this.emitter?.emit(PoolEvents.error, error)
880 if (this.opts.restartWorkerOnError === true && !this.starting) {
881 if (workerInfo.dynamic) {
882 this.createAndSetupDynamicWorkerNode()
883 } else {
884 this.createAndSetupWorkerNode()
885 }
886 }
887 if (this.opts.enableTasksQueue === true) {
888 this.redistributeQueuedTasks(workerNodeKey)
889 }
890 })
891 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
892 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
893 worker.once('exit', () => {
894 this.removeWorkerNode(worker)
895 })
896
897 const workerNodeKey = this.addWorkerNode(worker)
898
899 this.afterWorkerNodeSetup(workerNodeKey)
900
901 return workerNodeKey
902 }
903
904 /**
905 * Creates a new, completely set up dynamic worker node.
906 *
907 * @returns New, completely set up dynamic worker node key.
908 */
909 protected createAndSetupDynamicWorkerNode (): number {
910 const workerNodeKey = this.createAndSetupWorkerNode()
911 this.registerWorkerMessageListener(workerNodeKey, message => {
912 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
913 message.workerId
914 ) as number
915 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
916 if (
917 isKillBehavior(KillBehaviors.HARD, message.kill) ||
918 (message.kill != null &&
919 ((this.opts.enableTasksQueue === false &&
920 workerUsage.tasks.executing === 0) ||
921 (this.opts.enableTasksQueue === true &&
922 workerUsage.tasks.executing === 0 &&
923 this.tasksQueueSize(localWorkerNodeKey) === 0)))
924 ) {
925 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
926 const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
927 if (isAsyncFunction(destroyWorkerNodeBounded)) {
928 (
929 destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
930 )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
931 } else {
932 (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
933 localWorkerNodeKey
934 )
935 }
936 }
937 })
938 const workerInfo = this.getWorkerInfo(workerNodeKey)
939 workerInfo.dynamic = true
940 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
941 workerInfo.ready = true
942 }
943 this.sendToWorker(workerNodeKey, {
944 checkActive: true,
945 workerId: workerInfo.id as number
946 })
947 return workerNodeKey
948 }
949
950 /**
951 * Registers a listener callback on the worker given its worker node key.
952 *
953 * @param workerNodeKey - The worker node key.
954 * @param listener - The message listener callback.
955 */
956 protected abstract registerWorkerMessageListener<
957 Message extends Data | Response
958 >(
959 workerNodeKey: number,
960 listener: (message: MessageValue<Message>) => void
961 ): void
962
963 /**
964 * Method hooked up after a worker node has been newly created.
965 * Can be overridden.
966 *
967 * @param workerNodeKey - The newly created worker node key.
968 */
969 protected afterWorkerNodeSetup (workerNodeKey: number): void {
970 // Listen to worker messages.
971 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
972 // Send the startup message to worker.
973 this.sendStartupMessageToWorker(workerNodeKey)
974 // Send the worker statistics message to worker.
975 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
976 }
977
978 /**
979 * Sends the startup message to worker given its worker node key.
980 *
981 * @param workerNodeKey - The worker node key.
982 */
983 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
984
985 /**
986 * Sends the worker statistics message to worker given its worker node key.
987 *
988 * @param workerNodeKey - The worker node key.
989 */
990 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
991 this.sendToWorker(workerNodeKey, {
992 statistics: {
993 runTime:
994 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
995 .runTime.aggregate,
996 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
997 .elu.aggregate
998 },
999 workerId: this.getWorkerInfo(workerNodeKey).id as number
1000 })
1001 }
1002
1003 private redistributeQueuedTasks (workerNodeKey: number): void {
1004 while (this.tasksQueueSize(workerNodeKey) > 0) {
1005 let targetWorkerNodeKey: number = workerNodeKey
1006 let minQueuedTasks = Infinity
1007 let executeTask = false
1008 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1009 const workerInfo = this.getWorkerInfo(workerNodeId)
1010 if (
1011 workerNodeId !== workerNodeKey &&
1012 workerInfo.ready &&
1013 workerNode.usage.tasks.queued === 0
1014 ) {
1015 if (
1016 this.workerNodes[workerNodeId].usage.tasks.executing <
1017 (this.opts.tasksQueueOptions?.concurrency as number)
1018 ) {
1019 executeTask = true
1020 }
1021 targetWorkerNodeKey = workerNodeId
1022 break
1023 }
1024 if (
1025 workerNodeId !== workerNodeKey &&
1026 workerInfo.ready &&
1027 workerNode.usage.tasks.queued < minQueuedTasks
1028 ) {
1029 minQueuedTasks = workerNode.usage.tasks.queued
1030 targetWorkerNodeKey = workerNodeId
1031 }
1032 }
1033 if (executeTask) {
1034 this.executeTask(
1035 targetWorkerNodeKey,
1036 this.dequeueTask(workerNodeKey) as Task<Data>
1037 )
1038 } else {
1039 this.enqueueTask(
1040 targetWorkerNodeKey,
1041 this.dequeueTask(workerNodeKey) as Task<Data>
1042 )
1043 }
1044 }
1045 }
1046
1047 /**
1048 * This method is the listener registered for each worker message.
1049 *
1050 * @returns The listener function to execute when a message is received from a worker.
1051 */
1052 protected workerListener (): (message: MessageValue<Response>) => void {
1053 return message => {
1054 this.checkMessageWorkerId(message)
1055 if (message.ready != null) {
1056 // Worker ready response received
1057 this.handleWorkerReadyResponse(message)
1058 } else if (message.id != null) {
1059 // Task execution response received
1060 this.handleTaskExecutionResponse(message)
1061 }
1062 }
1063 }
1064
1065 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1066 this.getWorkerInfo(
1067 this.getWorkerNodeKeyByWorkerId(message.workerId) as number
1068 ).ready = message.ready as boolean
1069 if (this.emitter != null && this.ready) {
1070 this.emitter.emit(PoolEvents.ready, this.info)
1071 }
1072 }
1073
1074 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1075 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1076 if (promiseResponse != null) {
1077 if (message.taskError != null) {
1078 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1079 promiseResponse.reject(message.taskError.message)
1080 } else {
1081 promiseResponse.resolve(message.data as Response)
1082 }
1083 const workerNodeKey = promiseResponse.workerNodeKey
1084 this.afterTaskExecutionHook(workerNodeKey, message)
1085 this.promiseResponseMap.delete(message.id as string)
1086 if (
1087 this.opts.enableTasksQueue === true &&
1088 this.tasksQueueSize(workerNodeKey) > 0
1089 ) {
1090 this.executeTask(
1091 workerNodeKey,
1092 this.dequeueTask(workerNodeKey) as Task<Data>
1093 )
1094 }
1095 this.workerChoiceStrategyContext.update(workerNodeKey)
1096 }
1097 }
1098
1099 private checkAndEmitEvents (): void {
1100 if (this.emitter != null) {
1101 if (this.busy) {
1102 this.emitter.emit(PoolEvents.busy, this.info)
1103 }
1104 if (this.type === PoolTypes.dynamic && this.full) {
1105 this.emitter.emit(PoolEvents.full, this.info)
1106 }
1107 }
1108 }
1109
1110 /**
1111 * Gets the worker information given its worker node key.
1112 *
1113 * @param workerNodeKey - The worker node key.
1114 * @returns The worker information.
1115 */
1116 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1117 return this.workerNodes[workerNodeKey].info
1118 }
1119
1120 /**
1121 * Adds the given worker in the pool worker nodes.
1122 *
1123 * @param worker - The worker.
1124 * @returns The added worker node key.
1125 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1126 */
1127 private addWorkerNode (worker: Worker): number {
1128 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1129 // Flag the worker node as ready at pool startup.
1130 if (this.starting) {
1131 workerNode.info.ready = true
1132 }
1133 this.workerNodes.push(workerNode)
1134 const workerNodeKey = this.getWorkerNodeKey(worker)
1135 if (workerNodeKey === -1) {
1136 throw new Error('Worker node not found')
1137 }
1138 return workerNodeKey
1139 }
1140
1141 /**
1142 * Removes the given worker from the pool worker nodes.
1143 *
1144 * @param worker - The worker.
1145 */
1146 private removeWorkerNode (worker: Worker): void {
1147 const workerNodeKey = this.getWorkerNodeKey(worker)
1148 if (workerNodeKey !== -1) {
1149 this.workerNodes.splice(workerNodeKey, 1)
1150 this.workerChoiceStrategyContext.remove(workerNodeKey)
1151 }
1152 }
1153
1154 /**
1155 * Executes the given task on the worker given its worker node key.
1156 *
1157 * @param workerNodeKey - The worker node key.
1158 * @param task - The task to execute.
1159 */
1160 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1161 this.beforeTaskExecutionHook(workerNodeKey, task)
1162 this.sendToWorker(workerNodeKey, task)
1163 }
1164
1165 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1166 return this.workerNodes[workerNodeKey].enqueueTask(task)
1167 }
1168
1169 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1170 return this.workerNodes[workerNodeKey].dequeueTask()
1171 }
1172
1173 private tasksQueueSize (workerNodeKey: number): number {
1174 return this.workerNodes[workerNodeKey].tasksQueueSize()
1175 }
1176
1177 private flushTasksQueue (workerNodeKey: number): void {
1178 while (this.tasksQueueSize(workerNodeKey) > 0) {
1179 this.executeTask(
1180 workerNodeKey,
1181 this.dequeueTask(workerNodeKey) as Task<Data>
1182 )
1183 }
1184 this.workerNodes[workerNodeKey].clearTasksQueue()
1185 }
1186
1187 private flushTasksQueues (): void {
1188 for (const [workerNodeKey] of this.workerNodes.entries()) {
1189 this.flushTasksQueue(workerNodeKey)
1190 }
1191 }
1192 }