fix: ensure tasks redistribution avoid task execution starvation
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
103 public constructor (
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
107 ) {
108 if (!this.isMain()) {
109 throw new Error('Cannot start a pool from a worker!')
110 }
111 this.checkNumberOfWorkers(this.numberOfWorkers)
112 this.checkFilePath(this.filePath)
113 this.checkPoolOptions(this.opts)
114
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
118 this.dequeueTask = this.dequeueTask.bind(this)
119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
120
121 if (this.opts.enableEvents === true) {
122 this.emitter = new PoolEmitter()
123 }
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
133
134 this.setupHook()
135
136 this.starting = true
137 this.startPool()
138 this.starting = false
139
140 this.startTimestamp = performance.now()
141 }
142
143 private checkFilePath (filePath: string): void {
144 if (
145 filePath == null ||
146 typeof filePath !== 'string' ||
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
149 throw new Error('Please specify a file with a worker implementation')
150 }
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
154 }
155
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
162 throw new TypeError(
163 'Cannot instantiate a pool with a non safe integer number of workers'
164 )
165 } else if (numberOfWorkers < 0) {
166 throw new RangeError(
167 'Cannot instantiate a pool with a negative number of workers'
168 )
169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
175 if (this.type === PoolTypes.dynamic) {
176 if (min > max) {
177 throw new RangeError(
178 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
179 )
180 } else if (max === 0) {
181 throw new RangeError(
182 'Cannot instantiate a dynamic pool with a pool size equal to zero'
183 )
184 } else if (min === max) {
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
187 )
188 }
189 }
190 }
191
192 private checkPoolOptions (opts: PoolOptions<Worker>): void {
193 if (isPlainObject(opts)) {
194 this.opts.workerChoiceStrategy =
195 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
197 this.opts.workerChoiceStrategyOptions =
198 opts.workerChoiceStrategyOptions ??
199 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
200 this.checkValidWorkerChoiceStrategyOptions(
201 this.opts.workerChoiceStrategyOptions
202 )
203 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
204 this.opts.enableEvents = opts.enableEvents ?? true
205 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
206 if (this.opts.enableTasksQueue) {
207 this.checkValidTasksQueueOptions(
208 opts.tasksQueueOptions as TasksQueueOptions
209 )
210 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
211 opts.tasksQueueOptions as TasksQueueOptions
212 )
213 }
214 } else {
215 throw new TypeError('Invalid pool options: must be a plain object')
216 }
217 }
218
219 private checkValidWorkerChoiceStrategy (
220 workerChoiceStrategy: WorkerChoiceStrategy
221 ): void {
222 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
223 throw new Error(
224 `Invalid worker choice strategy '${workerChoiceStrategy}'`
225 )
226 }
227 }
228
229 private checkValidWorkerChoiceStrategyOptions (
230 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
231 ): void {
232 if (!isPlainObject(workerChoiceStrategyOptions)) {
233 throw new TypeError(
234 'Invalid worker choice strategy options: must be a plain object'
235 )
236 }
237 if (
238 workerChoiceStrategyOptions.weights != null &&
239 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
240 ) {
241 throw new Error(
242 'Invalid worker choice strategy options: must have a weight for each worker node'
243 )
244 }
245 if (
246 workerChoiceStrategyOptions.measurement != null &&
247 !Object.values(Measurements).includes(
248 workerChoiceStrategyOptions.measurement
249 )
250 ) {
251 throw new Error(
252 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
253 )
254 }
255 }
256
257 private checkValidTasksQueueOptions (
258 tasksQueueOptions: TasksQueueOptions
259 ): void {
260 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
261 throw new TypeError('Invalid tasks queue options: must be a plain object')
262 }
263 if (
264 tasksQueueOptions?.concurrency != null &&
265 !Number.isSafeInteger(tasksQueueOptions.concurrency)
266 ) {
267 throw new TypeError(
268 'Invalid worker tasks concurrency: must be an integer'
269 )
270 }
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 tasksQueueOptions.concurrency <= 0
274 ) {
275 throw new Error(
276 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
277 )
278 }
279 }
280
281 private startPool (): void {
282 while (
283 this.workerNodes.reduce(
284 (accumulator, workerNode) =>
285 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
286 0
287 ) < this.numberOfWorkers
288 ) {
289 this.createAndSetupWorkerNode()
290 }
291 }
292
293 /** @inheritDoc */
294 public get info (): PoolInfo {
295 return {
296 version,
297 type: this.type,
298 worker: this.worker,
299 ready: this.ready,
300 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
301 minSize: this.minSize,
302 maxSize: this.maxSize,
303 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 .runTime.aggregate &&
305 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
306 .waitTime.aggregate && { utilization: round(this.utilization) }),
307 workerNodes: this.workerNodes.length,
308 idleWorkerNodes: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
310 workerNode.usage.tasks.executing === 0
311 ? accumulator + 1
312 : accumulator,
313 0
314 ),
315 busyWorkerNodes: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
317 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
318 0
319 ),
320 executedTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 accumulator + workerNode.usage.tasks.executed,
323 0
324 ),
325 executingTasks: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
327 accumulator + workerNode.usage.tasks.executing,
328 0
329 ),
330 queuedTasks: this.workerNodes.reduce(
331 (accumulator, workerNode) =>
332 accumulator + workerNode.usage.tasks.queued,
333 0
334 ),
335 maxQueuedTasks: this.workerNodes.reduce(
336 (accumulator, workerNode) =>
337 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
338 0
339 ),
340 failedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + workerNode.usage.tasks.failed,
343 0
344 ),
345 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
346 .runTime.aggregate && {
347 runTime: {
348 minimum: round(
349 Math.min(
350 ...this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
352 )
353 )
354 ),
355 maximum: round(
356 Math.max(
357 ...this.workerNodes.map(
358 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
359 )
360 )
361 ),
362 average: round(
363 this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
366 0
367 ) /
368 this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.executed ?? 0),
371 0
372 )
373 ),
374 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
375 .runTime.median && {
376 median: round(
377 median(
378 this.workerNodes.map(
379 workerNode => workerNode.usage.runTime?.median ?? 0
380 )
381 )
382 )
383 })
384 }
385 }),
386 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
387 .waitTime.aggregate && {
388 waitTime: {
389 minimum: round(
390 Math.min(
391 ...this.workerNodes.map(
392 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
393 )
394 )
395 ),
396 maximum: round(
397 Math.max(
398 ...this.workerNodes.map(
399 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
400 )
401 )
402 ),
403 average: round(
404 this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
407 0
408 ) /
409 this.workerNodes.reduce(
410 (accumulator, workerNode) =>
411 accumulator + (workerNode.usage.tasks?.executed ?? 0),
412 0
413 )
414 ),
415 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 .waitTime.median && {
417 median: round(
418 median(
419 this.workerNodes.map(
420 workerNode => workerNode.usage.waitTime?.median ?? 0
421 )
422 )
423 )
424 })
425 }
426 })
427 }
428 }
429
430 /**
431 * The pool readiness boolean status.
432 */
433 private get ready (): boolean {
434 return (
435 this.workerNodes.reduce(
436 (accumulator, workerNode) =>
437 !workerNode.info.dynamic && workerNode.info.ready
438 ? accumulator + 1
439 : accumulator,
440 0
441 ) >= this.minSize
442 )
443 }
444
445 /**
446 * The approximate pool utilization.
447 *
448 * @returns The pool utilization.
449 */
450 private get utilization (): number {
451 const poolTimeCapacity =
452 (performance.now() - this.startTimestamp) * this.maxSize
453 const totalTasksRunTime = this.workerNodes.reduce(
454 (accumulator, workerNode) =>
455 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
456 0
457 )
458 const totalTasksWaitTime = this.workerNodes.reduce(
459 (accumulator, workerNode) =>
460 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
461 0
462 )
463 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
464 }
465
466 /**
467 * The pool type.
468 *
469 * If it is `'dynamic'`, it provides the `max` property.
470 */
471 protected abstract get type (): PoolType
472
473 /**
474 * The worker type.
475 */
476 protected abstract get worker (): WorkerType
477
478 /**
479 * The pool minimum size.
480 */
481 protected abstract get minSize (): number
482
483 /**
484 * The pool maximum size.
485 */
486 protected abstract get maxSize (): number
487
488 /**
489 * Checks if the worker id sent in the received message from a worker is valid.
490 *
491 * @param message - The received message.
492 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
493 */
494 private checkMessageWorkerId (message: MessageValue<Response>): void {
495 if (
496 message.workerId != null &&
497 this.getWorkerNodeKeyByWorkerId(message.workerId) == null
498 ) {
499 throw new Error(
500 `Worker message received from unknown worker '${message.workerId}'`
501 )
502 }
503 }
504
505 /**
506 * Gets the given worker its worker node key.
507 *
508 * @param worker - The worker.
509 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
510 */
511 private getWorkerNodeKey (worker: Worker): number {
512 return this.workerNodes.findIndex(
513 workerNode => workerNode.worker === worker
514 )
515 }
516
517 /**
518 * Gets the worker node key given its worker id.
519 *
520 * @param workerId - The worker id.
521 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
522 */
523 private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
524 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
525 if (workerNode.info.id === workerId) {
526 return workerNodeKey
527 }
528 }
529 }
530
531 /** @inheritDoc */
532 public setWorkerChoiceStrategy (
533 workerChoiceStrategy: WorkerChoiceStrategy,
534 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
535 ): void {
536 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
537 this.opts.workerChoiceStrategy = workerChoiceStrategy
538 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
539 this.opts.workerChoiceStrategy
540 )
541 if (workerChoiceStrategyOptions != null) {
542 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
543 }
544 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
545 workerNode.resetUsage()
546 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
547 }
548 }
549
550 /** @inheritDoc */
551 public setWorkerChoiceStrategyOptions (
552 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
553 ): void {
554 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
555 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
556 this.workerChoiceStrategyContext.setOptions(
557 this.opts.workerChoiceStrategyOptions
558 )
559 }
560
561 /** @inheritDoc */
562 public enableTasksQueue (
563 enable: boolean,
564 tasksQueueOptions?: TasksQueueOptions
565 ): void {
566 if (this.opts.enableTasksQueue === true && !enable) {
567 this.flushTasksQueues()
568 }
569 this.opts.enableTasksQueue = enable
570 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
571 }
572
573 /** @inheritDoc */
574 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
575 if (this.opts.enableTasksQueue === true) {
576 this.checkValidTasksQueueOptions(tasksQueueOptions)
577 this.opts.tasksQueueOptions =
578 this.buildTasksQueueOptions(tasksQueueOptions)
579 } else if (this.opts.tasksQueueOptions != null) {
580 delete this.opts.tasksQueueOptions
581 }
582 }
583
584 private buildTasksQueueOptions (
585 tasksQueueOptions: TasksQueueOptions
586 ): TasksQueueOptions {
587 return {
588 concurrency: tasksQueueOptions?.concurrency ?? 1
589 }
590 }
591
592 /**
593 * Whether the pool is full or not.
594 *
595 * The pool filling boolean status.
596 */
597 protected get full (): boolean {
598 return this.workerNodes.length >= this.maxSize
599 }
600
601 /**
602 * Whether the pool is busy or not.
603 *
604 * The pool busyness boolean status.
605 */
606 protected abstract get busy (): boolean
607
608 /**
609 * Whether worker nodes are executing at least one task.
610 *
611 * @returns Worker nodes busyness boolean status.
612 */
613 protected internalBusy (): boolean {
614 return (
615 this.workerNodes.findIndex(workerNode => {
616 return workerNode.usage.tasks.executing === 0
617 }) === -1
618 )
619 }
620
621 /** @inheritDoc */
622 public async execute (data?: Data, name?: string): Promise<Response> {
623 return await new Promise<Response>((resolve, reject) => {
624 const timestamp = performance.now()
625 const workerNodeKey = this.chooseWorkerNode()
626 const task: Task<Data> = {
627 name: name ?? DEFAULT_TASK_NAME,
628 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
629 data: data ?? ({} as Data),
630 timestamp,
631 workerId: this.getWorkerInfo(workerNodeKey).id as number,
632 id: randomUUID()
633 }
634 this.promiseResponseMap.set(task.id as string, {
635 resolve,
636 reject,
637 workerNodeKey
638 })
639 if (
640 this.opts.enableTasksQueue === true &&
641 (this.busy ||
642 this.workerNodes[workerNodeKey].usage.tasks.executing >=
643 ((this.opts.tasksQueueOptions as TasksQueueOptions)
644 .concurrency as number))
645 ) {
646 this.enqueueTask(workerNodeKey, task)
647 } else {
648 this.executeTask(workerNodeKey, task)
649 }
650 this.checkAndEmitEvents()
651 })
652 }
653
654 /** @inheritDoc */
655 public async destroy (): Promise<void> {
656 await Promise.all(
657 this.workerNodes.map(async (workerNode, workerNodeKey) => {
658 this.flushTasksQueue(workerNodeKey)
659 // FIXME: wait for tasks to be finished
660 const workerExitPromise = new Promise<void>(resolve => {
661 workerNode.worker.on('exit', () => {
662 resolve()
663 })
664 })
665 await this.destroyWorkerNode(workerNodeKey)
666 await workerExitPromise
667 })
668 )
669 }
670
671 /**
672 * Terminates the worker node given its worker node key.
673 *
674 * @param workerNodeKey - The worker node key.
675 */
676 protected abstract destroyWorkerNode (
677 workerNodeKey: number
678 ): void | Promise<void>
679
680 /**
681 * Setup hook to execute code before worker nodes are created in the abstract constructor.
682 * Can be overridden.
683 *
684 * @virtual
685 */
686 protected setupHook (): void {
687 // Intentionally empty
688 }
689
690 /**
691 * Should return whether the worker is the main worker or not.
692 */
693 protected abstract isMain (): boolean
694
695 /**
696 * Hook executed before the worker task execution.
697 * Can be overridden.
698 *
699 * @param workerNodeKey - The worker node key.
700 * @param task - The task to execute.
701 */
702 protected beforeTaskExecutionHook (
703 workerNodeKey: number,
704 task: Task<Data>
705 ): void {
706 const workerUsage = this.workerNodes[workerNodeKey].usage
707 ++workerUsage.tasks.executing
708 this.updateWaitTimeWorkerUsage(workerUsage, task)
709 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
710 task.name as string
711 ) as WorkerUsage
712 ++taskWorkerUsage.tasks.executing
713 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
714 }
715
716 /**
717 * Hook executed after the worker task execution.
718 * Can be overridden.
719 *
720 * @param workerNodeKey - The worker node key.
721 * @param message - The received message.
722 */
723 protected afterTaskExecutionHook (
724 workerNodeKey: number,
725 message: MessageValue<Response>
726 ): void {
727 const workerUsage = this.workerNodes[workerNodeKey].usage
728 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
729 this.updateRunTimeWorkerUsage(workerUsage, message)
730 this.updateEluWorkerUsage(workerUsage, message)
731 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
732 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
733 ) as WorkerUsage
734 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
735 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
736 this.updateEluWorkerUsage(taskWorkerUsage, message)
737 }
738
739 private updateTaskStatisticsWorkerUsage (
740 workerUsage: WorkerUsage,
741 message: MessageValue<Response>
742 ): void {
743 const workerTaskStatistics = workerUsage.tasks
744 --workerTaskStatistics.executing
745 if (message.taskError == null) {
746 ++workerTaskStatistics.executed
747 } else {
748 ++workerTaskStatistics.failed
749 }
750 }
751
752 private updateRunTimeWorkerUsage (
753 workerUsage: WorkerUsage,
754 message: MessageValue<Response>
755 ): void {
756 updateMeasurementStatistics(
757 workerUsage.runTime,
758 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
759 message.taskPerformance?.runTime ?? 0,
760 workerUsage.tasks.executed
761 )
762 }
763
764 private updateWaitTimeWorkerUsage (
765 workerUsage: WorkerUsage,
766 task: Task<Data>
767 ): void {
768 const timestamp = performance.now()
769 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
770 updateMeasurementStatistics(
771 workerUsage.waitTime,
772 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
773 taskWaitTime,
774 workerUsage.tasks.executed
775 )
776 }
777
778 private updateEluWorkerUsage (
779 workerUsage: WorkerUsage,
780 message: MessageValue<Response>
781 ): void {
782 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
783 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
784 updateMeasurementStatistics(
785 workerUsage.elu.active,
786 eluTaskStatisticsRequirements,
787 message.taskPerformance?.elu?.active ?? 0,
788 workerUsage.tasks.executed
789 )
790 updateMeasurementStatistics(
791 workerUsage.elu.idle,
792 eluTaskStatisticsRequirements,
793 message.taskPerformance?.elu?.idle ?? 0,
794 workerUsage.tasks.executed
795 )
796 if (eluTaskStatisticsRequirements.aggregate) {
797 if (message.taskPerformance?.elu != null) {
798 if (workerUsage.elu.utilization != null) {
799 workerUsage.elu.utilization =
800 (workerUsage.elu.utilization +
801 message.taskPerformance.elu.utilization) /
802 2
803 } else {
804 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
805 }
806 }
807 }
808 }
809
810 /**
811 * Chooses a worker node for the next task.
812 *
813 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
814 *
815 * @returns The chosen worker node key
816 */
817 private chooseWorkerNode (): number {
818 if (this.shallCreateDynamicWorker()) {
819 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
820 if (
821 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
822 ) {
823 return workerNodeKey
824 }
825 }
826 return this.workerChoiceStrategyContext.execute()
827 }
828
829 /**
830 * Conditions for dynamic worker creation.
831 *
832 * @returns Whether to create a dynamic worker or not.
833 */
834 private shallCreateDynamicWorker (): boolean {
835 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
836 }
837
838 /**
839 * Sends a message to worker given its worker node key.
840 *
841 * @param workerNodeKey - The worker node key.
842 * @param message - The message.
843 */
844 protected abstract sendToWorker (
845 workerNodeKey: number,
846 message: MessageValue<Data>
847 ): void
848
849 /**
850 * Creates a new worker.
851 *
852 * @returns Newly created worker.
853 */
854 protected abstract createWorker (): Worker
855
856 /**
857 * Creates a new, completely set up worker node.
858 *
859 * @returns New, completely set up worker node key.
860 */
861 protected createAndSetupWorkerNode (): number {
862 const worker = this.createWorker()
863
864 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
865 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
866 worker.on('error', error => {
867 const workerNodeKey = this.getWorkerNodeKey(worker)
868 const workerInfo = this.getWorkerInfo(workerNodeKey)
869 workerInfo.ready = false
870 this.workerNodes[workerNodeKey].closeChannel()
871 this.emitter?.emit(PoolEvents.error, error)
872 if (this.opts.restartWorkerOnError === true && !this.starting) {
873 if (workerInfo.dynamic) {
874 this.createAndSetupDynamicWorkerNode()
875 } else {
876 this.createAndSetupWorkerNode()
877 }
878 }
879 if (this.opts.enableTasksQueue === true) {
880 this.redistributeQueuedTasks(workerNodeKey)
881 }
882 })
883 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
884 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
885 worker.once('exit', () => {
886 this.removeWorkerNode(worker)
887 })
888
889 const workerNodeKey = this.addWorkerNode(worker)
890
891 this.afterWorkerNodeSetup(workerNodeKey)
892
893 return workerNodeKey
894 }
895
896 /**
897 * Creates a new, completely set up dynamic worker node.
898 *
899 * @returns New, completely set up dynamic worker node key.
900 */
901 protected createAndSetupDynamicWorkerNode (): number {
902 const workerNodeKey = this.createAndSetupWorkerNode()
903 this.registerWorkerMessageListener(workerNodeKey, message => {
904 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
905 message.workerId
906 ) as number
907 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
908 if (
909 isKillBehavior(KillBehaviors.HARD, message.kill) ||
910 (message.kill != null &&
911 ((this.opts.enableTasksQueue === false &&
912 workerUsage.tasks.executing === 0) ||
913 (this.opts.enableTasksQueue === true &&
914 workerUsage.tasks.executing === 0 &&
915 this.tasksQueueSize(localWorkerNodeKey) === 0)))
916 ) {
917 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
918 const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
919 if (isAsyncFunction(destroyWorkerNodeBounded)) {
920 (
921 destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
922 )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
923 } else {
924 (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
925 localWorkerNodeKey
926 )
927 }
928 }
929 })
930 const workerInfo = this.getWorkerInfo(workerNodeKey)
931 workerInfo.dynamic = true
932 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
933 workerInfo.ready = true
934 }
935 this.sendToWorker(workerNodeKey, {
936 checkActive: true,
937 workerId: workerInfo.id as number
938 })
939 return workerNodeKey
940 }
941
942 /**
943 * Registers a listener callback on the worker given its worker node key.
944 *
945 * @param workerNodeKey - The worker node key.
946 * @param listener - The message listener callback.
947 */
948 protected abstract registerWorkerMessageListener<
949 Message extends Data | Response
950 >(
951 workerNodeKey: number,
952 listener: (message: MessageValue<Message>) => void
953 ): void
954
955 /**
956 * Method hooked up after a worker node has been newly created.
957 * Can be overridden.
958 *
959 * @param workerNodeKey - The newly created worker node key.
960 */
961 protected afterWorkerNodeSetup (workerNodeKey: number): void {
962 // Listen to worker messages.
963 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
964 // Send the startup message to worker.
965 this.sendStartupMessageToWorker(workerNodeKey)
966 // Send the worker statistics message to worker.
967 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
968 }
969
970 /**
971 * Sends the startup message to worker given its worker node key.
972 *
973 * @param workerNodeKey - The worker node key.
974 */
975 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
976
977 /**
978 * Sends the worker statistics message to worker given its worker node key.
979 *
980 * @param workerNodeKey - The worker node key.
981 */
982 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
983 this.sendToWorker(workerNodeKey, {
984 statistics: {
985 runTime:
986 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
987 .runTime.aggregate,
988 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
989 .elu.aggregate
990 },
991 workerId: this.getWorkerInfo(workerNodeKey).id as number
992 })
993 }
994
995 private redistributeQueuedTasks (workerNodeKey: number): void {
996 while (this.tasksQueueSize(workerNodeKey) > 0) {
997 let targetWorkerNodeKey: number = workerNodeKey
998 let minQueuedTasks = Infinity
999 let executeTask = false
1000 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1001 const workerInfo = this.getWorkerInfo(workerNodeId)
1002 if (
1003 workerNodeId !== workerNodeKey &&
1004 workerInfo.ready &&
1005 workerNode.usage.tasks.queued === 0
1006 ) {
1007 if (workerNode.usage.tasks.executing === 0) {
1008 executeTask = true
1009 }
1010 targetWorkerNodeKey = workerNodeId
1011 break
1012 }
1013 if (
1014 workerNodeId !== workerNodeKey &&
1015 workerInfo.ready &&
1016 workerNode.usage.tasks.queued < minQueuedTasks
1017 ) {
1018 minQueuedTasks = workerNode.usage.tasks.queued
1019 targetWorkerNodeKey = workerNodeId
1020 }
1021 }
1022 if (executeTask) {
1023 this.executeTask(
1024 targetWorkerNodeKey,
1025 this.dequeueTask(workerNodeKey) as Task<Data>
1026 )
1027 } else {
1028 this.enqueueTask(
1029 targetWorkerNodeKey,
1030 this.dequeueTask(workerNodeKey) as Task<Data>
1031 )
1032 }
1033 }
1034 }
1035
1036 /**
1037 * This method is the listener registered for each worker message.
1038 *
1039 * @returns The listener function to execute when a message is received from a worker.
1040 */
1041 protected workerListener (): (message: MessageValue<Response>) => void {
1042 return message => {
1043 this.checkMessageWorkerId(message)
1044 if (message.ready != null) {
1045 // Worker ready response received
1046 this.handleWorkerReadyResponse(message)
1047 } else if (message.id != null) {
1048 // Task execution response received
1049 this.handleTaskExecutionResponse(message)
1050 }
1051 }
1052 }
1053
1054 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1055 this.getWorkerInfo(
1056 this.getWorkerNodeKeyByWorkerId(message.workerId) as number
1057 ).ready = message.ready as boolean
1058 if (this.emitter != null && this.ready) {
1059 this.emitter.emit(PoolEvents.ready, this.info)
1060 }
1061 }
1062
1063 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1064 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1065 if (promiseResponse != null) {
1066 if (message.taskError != null) {
1067 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1068 promiseResponse.reject(message.taskError.message)
1069 } else {
1070 promiseResponse.resolve(message.data as Response)
1071 }
1072 const workerNodeKey = promiseResponse.workerNodeKey
1073 this.afterTaskExecutionHook(workerNodeKey, message)
1074 this.promiseResponseMap.delete(message.id as string)
1075 if (
1076 this.opts.enableTasksQueue === true &&
1077 this.tasksQueueSize(workerNodeKey) > 0
1078 ) {
1079 this.executeTask(
1080 workerNodeKey,
1081 this.dequeueTask(workerNodeKey) as Task<Data>
1082 )
1083 }
1084 this.workerChoiceStrategyContext.update(workerNodeKey)
1085 }
1086 }
1087
1088 private checkAndEmitEvents (): void {
1089 if (this.emitter != null) {
1090 if (this.busy) {
1091 this.emitter.emit(PoolEvents.busy, this.info)
1092 }
1093 if (this.type === PoolTypes.dynamic && this.full) {
1094 this.emitter.emit(PoolEvents.full, this.info)
1095 }
1096 }
1097 }
1098
1099 /**
1100 * Gets the worker information given its worker node key.
1101 *
1102 * @param workerNodeKey - The worker node key.
1103 * @returns The worker information.
1104 */
1105 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1106 return this.workerNodes[workerNodeKey].info
1107 }
1108
1109 /**
1110 * Adds the given worker in the pool worker nodes.
1111 *
1112 * @param worker - The worker.
1113 * @returns The added worker node key.
1114 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1115 */
1116 private addWorkerNode (worker: Worker): number {
1117 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1118 // Flag the worker node as ready at pool startup.
1119 if (this.starting) {
1120 workerNode.info.ready = true
1121 }
1122 this.workerNodes.push(workerNode)
1123 const workerNodeKey = this.getWorkerNodeKey(worker)
1124 if (workerNodeKey === -1) {
1125 throw new Error('Worker node not found')
1126 }
1127 return workerNodeKey
1128 }
1129
1130 /**
1131 * Removes the given worker from the pool worker nodes.
1132 *
1133 * @param worker - The worker.
1134 */
1135 private removeWorkerNode (worker: Worker): void {
1136 const workerNodeKey = this.getWorkerNodeKey(worker)
1137 if (workerNodeKey !== -1) {
1138 this.workerNodes.splice(workerNodeKey, 1)
1139 this.workerChoiceStrategyContext.remove(workerNodeKey)
1140 }
1141 }
1142
1143 /**
1144 * Executes the given task on the worker given its worker node key.
1145 *
1146 * @param workerNodeKey - The worker node key.
1147 * @param task - The task to execute.
1148 */
1149 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1150 this.beforeTaskExecutionHook(workerNodeKey, task)
1151 this.sendToWorker(workerNodeKey, task)
1152 }
1153
1154 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1155 return this.workerNodes[workerNodeKey].enqueueTask(task)
1156 }
1157
1158 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1159 return this.workerNodes[workerNodeKey].dequeueTask()
1160 }
1161
1162 private tasksQueueSize (workerNodeKey: number): number {
1163 return this.workerNodes[workerNodeKey].tasksQueueSize()
1164 }
1165
1166 private flushTasksQueue (workerNodeKey: number): void {
1167 while (this.tasksQueueSize(workerNodeKey) > 0) {
1168 this.executeTask(
1169 workerNodeKey,
1170 this.dequeueTask(workerNodeKey) as Task<Data>
1171 )
1172 }
1173 this.workerNodes[workerNodeKey].clearTasksQueue()
1174 }
1175
1176 private flushTasksQueues (): void {
1177 for (const [workerNodeKey] of this.workerNodes.entries()) {
1178 this.flushTasksQueue(workerNodeKey)
1179 }
1180 }
1181 }