feat: internal messaging strict worker id checking
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions
22 } from './pool'
23 import type {
24 IWorker,
25 IWorkerNode,
26 MessageHandler,
27 Task,
28 WorkerInfo,
29 WorkerType,
30 WorkerUsage
31 } from './worker'
32 import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37 } from './selection-strategies/selection-strategies-types'
38 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39 import { version } from './version'
40 import { WorkerNode } from './worker-node'
41
42 /**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49 export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53 > implements IPool<Worker, Data, Response> {
54 /** @inheritDoc */
55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
56
57 /** @inheritDoc */
58 public readonly emitter?: PoolEmitter
59
60 /**
61 * The execution response promise map.
62 *
63 * - `key`: The message id of each submitted task.
64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
65 *
66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
67 */
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
72
73 /**
74 * Worker choice strategy context referencing a worker choice algorithm implementation.
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
77 Worker,
78 Data,
79 Response
80 >
81
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
87 /**
88 * Constructs a new poolifier pool.
89 *
90 * @param numberOfWorkers - Number of workers that this pool should manage.
91 * @param filePath - Path to the worker file.
92 * @param opts - Options for the pool.
93 */
94 public constructor (
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
98 ) {
99 if (!this.isMain()) {
100 throw new Error('Cannot start a pool from a worker!')
101 }
102 this.checkNumberOfWorkers(this.numberOfWorkers)
103 this.checkFilePath(this.filePath)
104 this.checkPoolOptions(this.opts)
105
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
110
111 if (this.opts.enableEvents === true) {
112 this.emitter = new PoolEmitter()
113 }
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
123
124 this.setupHook()
125
126 while (this.workerNodes.length < this.numberOfWorkers) {
127 this.createAndSetupWorker()
128 }
129
130 this.startTimestamp = performance.now()
131 }
132
133 private checkFilePath (filePath: string): void {
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
148 throw new TypeError(
149 'Cannot instantiate a pool with a non safe integer number of workers'
150 )
151 } else if (numberOfWorkers < 0) {
152 throw new RangeError(
153 'Cannot instantiate a pool with a negative number of workers'
154 )
155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
156 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
157 }
158 }
159
160 protected checkDynamicPoolSize (min: number, max: number): void {
161 if (this.type === PoolTypes.dynamic && min > max) {
162 throw new RangeError(
163 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 )
165 } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) {
166 throw new RangeError(
167 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
168 )
169 } else if (this.type === PoolTypes.dynamic && min === max) {
170 throw new RangeError(
171 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
172 )
173 }
174 }
175
176 private checkPoolOptions (opts: PoolOptions<Worker>): void {
177 if (isPlainObject(opts)) {
178 this.opts.workerChoiceStrategy =
179 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
180 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
181 this.opts.workerChoiceStrategyOptions =
182 opts.workerChoiceStrategyOptions ??
183 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
184 this.checkValidWorkerChoiceStrategyOptions(
185 this.opts.workerChoiceStrategyOptions
186 )
187 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
188 this.opts.enableEvents = opts.enableEvents ?? true
189 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
190 if (this.opts.enableTasksQueue) {
191 this.checkValidTasksQueueOptions(
192 opts.tasksQueueOptions as TasksQueueOptions
193 )
194 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
195 opts.tasksQueueOptions as TasksQueueOptions
196 )
197 }
198 } else {
199 throw new TypeError('Invalid pool options: must be a plain object')
200 }
201 }
202
203 private checkValidWorkerChoiceStrategy (
204 workerChoiceStrategy: WorkerChoiceStrategy
205 ): void {
206 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
207 throw new Error(
208 `Invalid worker choice strategy '${workerChoiceStrategy}'`
209 )
210 }
211 }
212
213 private checkValidWorkerChoiceStrategyOptions (
214 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
215 ): void {
216 if (!isPlainObject(workerChoiceStrategyOptions)) {
217 throw new TypeError(
218 'Invalid worker choice strategy options: must be a plain object'
219 )
220 }
221 if (
222 workerChoiceStrategyOptions.weights != null &&
223 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
224 ) {
225 throw new Error(
226 'Invalid worker choice strategy options: must have a weight for each worker node'
227 )
228 }
229 if (
230 workerChoiceStrategyOptions.measurement != null &&
231 !Object.values(Measurements).includes(
232 workerChoiceStrategyOptions.measurement
233 )
234 ) {
235 throw new Error(
236 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
237 )
238 }
239 }
240
241 private checkValidTasksQueueOptions (
242 tasksQueueOptions: TasksQueueOptions
243 ): void {
244 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
245 throw new TypeError('Invalid tasks queue options: must be a plain object')
246 }
247 if (
248 tasksQueueOptions?.concurrency != null &&
249 !Number.isSafeInteger(tasksQueueOptions.concurrency)
250 ) {
251 throw new TypeError(
252 'Invalid worker tasks concurrency: must be an integer'
253 )
254 }
255 if (
256 tasksQueueOptions?.concurrency != null &&
257 tasksQueueOptions.concurrency <= 0
258 ) {
259 throw new Error(
260 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
261 )
262 }
263 }
264
265 /** @inheritDoc */
266 public get info (): PoolInfo {
267 return {
268 version,
269 type: this.type,
270 worker: this.worker,
271 ready: this.ready,
272 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
273 minSize: this.minSize,
274 maxSize: this.maxSize,
275 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
276 .runTime.aggregate &&
277 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
278 .waitTime.aggregate && { utilization: round(this.utilization) }),
279 workerNodes: this.workerNodes.length,
280 idleWorkerNodes: this.workerNodes.reduce(
281 (accumulator, workerNode) =>
282 workerNode.usage.tasks.executing === 0
283 ? accumulator + 1
284 : accumulator,
285 0
286 ),
287 busyWorkerNodes: this.workerNodes.reduce(
288 (accumulator, workerNode) =>
289 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
290 0
291 ),
292 executedTasks: this.workerNodes.reduce(
293 (accumulator, workerNode) =>
294 accumulator + workerNode.usage.tasks.executed,
295 0
296 ),
297 executingTasks: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 accumulator + workerNode.usage.tasks.executing,
300 0
301 ),
302 queuedTasks: this.workerNodes.reduce(
303 (accumulator, workerNode) =>
304 accumulator + workerNode.usage.tasks.queued,
305 0
306 ),
307 maxQueuedTasks: this.workerNodes.reduce(
308 (accumulator, workerNode) =>
309 accumulator + workerNode.usage.tasks.maxQueued,
310 0
311 ),
312 failedTasks: this.workerNodes.reduce(
313 (accumulator, workerNode) =>
314 accumulator + workerNode.usage.tasks.failed,
315 0
316 ),
317 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
318 .runTime.aggregate && {
319 runTime: {
320 minimum: round(
321 Math.min(
322 ...this.workerNodes.map(
323 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
324 )
325 )
326 ),
327 maximum: round(
328 Math.max(
329 ...this.workerNodes.map(
330 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
331 )
332 )
333 ),
334 average: round(
335 this.workerNodes.reduce(
336 (accumulator, workerNode) =>
337 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
338 0
339 ) /
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + (workerNode.usage.tasks?.executed ?? 0),
343 0
344 )
345 ),
346 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
347 .runTime.median && {
348 median: round(
349 median(
350 this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.median ?? 0
352 )
353 )
354 )
355 })
356 }
357 }),
358 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
359 .waitTime.aggregate && {
360 waitTime: {
361 minimum: round(
362 Math.min(
363 ...this.workerNodes.map(
364 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
365 )
366 )
367 ),
368 maximum: round(
369 Math.max(
370 ...this.workerNodes.map(
371 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
372 )
373 )
374 ),
375 average: round(
376 this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
379 0
380 ) /
381 this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 accumulator + (workerNode.usage.tasks?.executed ?? 0),
384 0
385 )
386 ),
387 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 .waitTime.median && {
389 median: round(
390 median(
391 this.workerNodes.map(
392 workerNode => workerNode.usage.waitTime?.median ?? 0
393 )
394 )
395 )
396 })
397 }
398 })
399 }
400 }
401
402 private get starting (): boolean {
403 return (
404 !this.full ||
405 (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
406 )
407 }
408
409 private get ready (): boolean {
410 return (
411 this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
412 )
413 }
414
415 /**
416 * Gets the approximate pool utilization.
417 *
418 * @returns The pool utilization.
419 */
420 private get utilization (): number {
421 const poolRunTimeCapacity =
422 (performance.now() - this.startTimestamp) * this.maxSize
423 const totalTasksRunTime = this.workerNodes.reduce(
424 (accumulator, workerNode) =>
425 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
426 0
427 )
428 const totalTasksWaitTime = this.workerNodes.reduce(
429 (accumulator, workerNode) =>
430 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
431 0
432 )
433 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
434 }
435
436 /**
437 * Pool type.
438 *
439 * If it is `'dynamic'`, it provides the `max` property.
440 */
441 protected abstract get type (): PoolType
442
443 /**
444 * Gets the worker type.
445 */
446 protected abstract get worker (): WorkerType
447
448 /**
449 * Pool minimum size.
450 */
451 protected abstract get minSize (): number
452
453 /**
454 * Pool maximum size.
455 */
456 protected abstract get maxSize (): number
457
458 /**
459 * Get the worker given its id.
460 *
461 * @param workerId - The worker id.
462 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
463 */
464 private getWorkerById (workerId: number): Worker | undefined {
465 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
466 ?.worker
467 }
468
469 private checkMessageWorkerId (message: MessageValue<Response>): void {
470 if (
471 message.workerId != null &&
472 this.getWorkerById(message.workerId) == null
473 ) {
474 throw new Error(
475 `Worker message received from unknown worker '${message.workerId}'`
476 )
477 }
478 }
479
480 /**
481 * Gets the given worker its worker node key.
482 *
483 * @param worker - The worker.
484 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
485 */
486 private getWorkerNodeKey (worker: Worker): number {
487 return this.workerNodes.findIndex(
488 workerNode => workerNode.worker === worker
489 )
490 }
491
492 /** @inheritDoc */
493 public setWorkerChoiceStrategy (
494 workerChoiceStrategy: WorkerChoiceStrategy,
495 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
496 ): void {
497 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
498 this.opts.workerChoiceStrategy = workerChoiceStrategy
499 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
500 this.opts.workerChoiceStrategy
501 )
502 if (workerChoiceStrategyOptions != null) {
503 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
504 }
505 for (const workerNode of this.workerNodes) {
506 workerNode.resetUsage()
507 this.setWorkerStatistics(workerNode.worker)
508 }
509 }
510
511 /** @inheritDoc */
512 public setWorkerChoiceStrategyOptions (
513 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
514 ): void {
515 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
516 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
517 this.workerChoiceStrategyContext.setOptions(
518 this.opts.workerChoiceStrategyOptions
519 )
520 }
521
522 /** @inheritDoc */
523 public enableTasksQueue (
524 enable: boolean,
525 tasksQueueOptions?: TasksQueueOptions
526 ): void {
527 if (this.opts.enableTasksQueue === true && !enable) {
528 this.flushTasksQueues()
529 }
530 this.opts.enableTasksQueue = enable
531 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
532 }
533
534 /** @inheritDoc */
535 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
536 if (this.opts.enableTasksQueue === true) {
537 this.checkValidTasksQueueOptions(tasksQueueOptions)
538 this.opts.tasksQueueOptions =
539 this.buildTasksQueueOptions(tasksQueueOptions)
540 } else if (this.opts.tasksQueueOptions != null) {
541 delete this.opts.tasksQueueOptions
542 }
543 }
544
545 private buildTasksQueueOptions (
546 tasksQueueOptions: TasksQueueOptions
547 ): TasksQueueOptions {
548 return {
549 concurrency: tasksQueueOptions?.concurrency ?? 1
550 }
551 }
552
553 /**
554 * Whether the pool is full or not.
555 *
556 * The pool filling boolean status.
557 */
558 protected get full (): boolean {
559 return this.workerNodes.length >= this.maxSize
560 }
561
562 /**
563 * Whether the pool is busy or not.
564 *
565 * The pool busyness boolean status.
566 */
567 protected abstract get busy (): boolean
568
569 /**
570 * Whether worker nodes are executing at least one task.
571 *
572 * @returns Worker nodes busyness boolean status.
573 */
574 protected internalBusy (): boolean {
575 return (
576 this.workerNodes.findIndex(workerNode => {
577 return workerNode.usage.tasks.executing === 0
578 }) === -1
579 )
580 }
581
582 /** @inheritDoc */
583 public async execute (data?: Data, name?: string): Promise<Response> {
584 const timestamp = performance.now()
585 const workerNodeKey = this.chooseWorkerNode()
586 const submittedTask: Task<Data> = {
587 name,
588 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
589 data: data ?? ({} as Data),
590 timestamp,
591 workerId: this.getWorkerInfo(workerNodeKey).id as number,
592 id: randomUUID()
593 }
594 const res = new Promise<Response>((resolve, reject) => {
595 this.promiseResponseMap.set(submittedTask.id as string, {
596 resolve,
597 reject,
598 worker: this.workerNodes[workerNodeKey].worker
599 })
600 })
601 if (
602 this.opts.enableTasksQueue === true &&
603 (this.busy ||
604 this.workerNodes[workerNodeKey].usage.tasks.executing >=
605 ((this.opts.tasksQueueOptions as TasksQueueOptions)
606 .concurrency as number))
607 ) {
608 this.enqueueTask(workerNodeKey, submittedTask)
609 } else {
610 this.executeTask(workerNodeKey, submittedTask)
611 }
612 this.checkAndEmitEvents()
613 // eslint-disable-next-line @typescript-eslint/return-await
614 return res
615 }
616
617 /** @inheritDoc */
618 public async destroy (): Promise<void> {
619 await Promise.all(
620 this.workerNodes.map(async (workerNode, workerNodeKey) => {
621 this.flushTasksQueue(workerNodeKey)
622 // FIXME: wait for tasks to be finished
623 const workerExitPromise = new Promise<void>(resolve => {
624 workerNode.worker.on('exit', () => {
625 resolve()
626 })
627 })
628 await this.destroyWorker(workerNode.worker)
629 await workerExitPromise
630 })
631 )
632 }
633
634 /**
635 * Terminates the given worker.
636 *
637 * @param worker - A worker within `workerNodes`.
638 */
639 protected abstract destroyWorker (worker: Worker): void | Promise<void>
640
641 /**
642 * Setup hook to execute code before worker nodes are created in the abstract constructor.
643 * Can be overridden.
644 *
645 * @virtual
646 */
647 protected setupHook (): void {
648 // Intentionally empty
649 }
650
651 /**
652 * Should return whether the worker is the main worker or not.
653 */
654 protected abstract isMain (): boolean
655
656 /**
657 * Hook executed before the worker task execution.
658 * Can be overridden.
659 *
660 * @param workerNodeKey - The worker node key.
661 * @param task - The task to execute.
662 */
663 protected beforeTaskExecutionHook (
664 workerNodeKey: number,
665 task: Task<Data>
666 ): void {
667 const workerUsage = this.workerNodes[workerNodeKey].usage
668 ++workerUsage.tasks.executing
669 this.updateWaitTimeWorkerUsage(workerUsage, task)
670 }
671
672 /**
673 * Hook executed after the worker task execution.
674 * Can be overridden.
675 *
676 * @param worker - The worker.
677 * @param message - The received message.
678 */
679 protected afterTaskExecutionHook (
680 worker: Worker,
681 message: MessageValue<Response>
682 ): void {
683 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
684 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
685 this.updateRunTimeWorkerUsage(workerUsage, message)
686 this.updateEluWorkerUsage(workerUsage, message)
687 }
688
689 private updateTaskStatisticsWorkerUsage (
690 workerUsage: WorkerUsage,
691 message: MessageValue<Response>
692 ): void {
693 const workerTaskStatistics = workerUsage.tasks
694 --workerTaskStatistics.executing
695 if (message.taskError == null) {
696 ++workerTaskStatistics.executed
697 } else {
698 ++workerTaskStatistics.failed
699 }
700 }
701
702 private updateRunTimeWorkerUsage (
703 workerUsage: WorkerUsage,
704 message: MessageValue<Response>
705 ): void {
706 if (
707 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
708 .aggregate
709 ) {
710 const taskRunTime = message.taskPerformance?.runTime ?? 0
711 workerUsage.runTime.aggregate =
712 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
713 workerUsage.runTime.minimum = Math.min(
714 taskRunTime,
715 workerUsage.runTime?.minimum ?? Infinity
716 )
717 workerUsage.runTime.maximum = Math.max(
718 taskRunTime,
719 workerUsage.runTime?.maximum ?? -Infinity
720 )
721 if (
722 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
723 .average &&
724 workerUsage.tasks.executed !== 0
725 ) {
726 workerUsage.runTime.average =
727 workerUsage.runTime.aggregate / workerUsage.tasks.executed
728 }
729 if (
730 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
731 .median &&
732 message.taskPerformance?.runTime != null
733 ) {
734 workerUsage.runTime.history.push(message.taskPerformance.runTime)
735 workerUsage.runTime.median = median(workerUsage.runTime.history)
736 }
737 }
738 }
739
740 private updateWaitTimeWorkerUsage (
741 workerUsage: WorkerUsage,
742 task: Task<Data>
743 ): void {
744 const timestamp = performance.now()
745 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
746 if (
747 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
748 .aggregate
749 ) {
750 workerUsage.waitTime.aggregate =
751 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
752 workerUsage.waitTime.minimum = Math.min(
753 taskWaitTime,
754 workerUsage.waitTime?.minimum ?? Infinity
755 )
756 workerUsage.waitTime.maximum = Math.max(
757 taskWaitTime,
758 workerUsage.waitTime?.maximum ?? -Infinity
759 )
760 if (
761 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
762 .waitTime.average &&
763 workerUsage.tasks.executed !== 0
764 ) {
765 workerUsage.waitTime.average =
766 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
767 }
768 if (
769 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
770 .waitTime.median
771 ) {
772 workerUsage.waitTime.history.push(taskWaitTime)
773 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
774 }
775 }
776 }
777
778 private updateEluWorkerUsage (
779 workerUsage: WorkerUsage,
780 message: MessageValue<Response>
781 ): void {
782 if (
783 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
784 .aggregate
785 ) {
786 if (message.taskPerformance?.elu != null) {
787 workerUsage.elu.idle.aggregate =
788 (workerUsage.elu.idle?.aggregate ?? 0) +
789 message.taskPerformance.elu.idle
790 workerUsage.elu.active.aggregate =
791 (workerUsage.elu.active?.aggregate ?? 0) +
792 message.taskPerformance.elu.active
793 if (workerUsage.elu.utilization != null) {
794 workerUsage.elu.utilization =
795 (workerUsage.elu.utilization +
796 message.taskPerformance.elu.utilization) /
797 2
798 } else {
799 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
800 }
801 workerUsage.elu.idle.minimum = Math.min(
802 message.taskPerformance.elu.idle,
803 workerUsage.elu.idle?.minimum ?? Infinity
804 )
805 workerUsage.elu.idle.maximum = Math.max(
806 message.taskPerformance.elu.idle,
807 workerUsage.elu.idle?.maximum ?? -Infinity
808 )
809 workerUsage.elu.active.minimum = Math.min(
810 message.taskPerformance.elu.active,
811 workerUsage.elu.active?.minimum ?? Infinity
812 )
813 workerUsage.elu.active.maximum = Math.max(
814 message.taskPerformance.elu.active,
815 workerUsage.elu.active?.maximum ?? -Infinity
816 )
817 if (
818 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
819 .average &&
820 workerUsage.tasks.executed !== 0
821 ) {
822 workerUsage.elu.idle.average =
823 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
824 workerUsage.elu.active.average =
825 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
826 }
827 if (
828 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
829 .median
830 ) {
831 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
832 workerUsage.elu.active.history.push(
833 message.taskPerformance.elu.active
834 )
835 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
836 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
837 }
838 }
839 }
840 }
841
842 /**
843 * Chooses a worker node for the next task.
844 *
845 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
846 *
847 * @returns The worker node key
848 */
849 private chooseWorkerNode (): number {
850 if (this.shallCreateDynamicWorker()) {
851 const worker = this.createAndSetupDynamicWorker()
852 if (
853 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
854 ) {
855 return this.getWorkerNodeKey(worker)
856 }
857 }
858 return this.workerChoiceStrategyContext.execute()
859 }
860
861 /**
862 * Conditions for dynamic worker creation.
863 *
864 * @returns Whether to create a dynamic worker or not.
865 */
866 private shallCreateDynamicWorker (): boolean {
867 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
868 }
869
870 /**
871 * Sends a message to the given worker.
872 *
873 * @param worker - The worker which should receive the message.
874 * @param message - The message.
875 */
876 protected abstract sendToWorker (
877 worker: Worker,
878 message: MessageValue<Data>
879 ): void
880
881 /**
882 * Registers a listener callback on the given worker.
883 *
884 * @param worker - The worker which should register a listener.
885 * @param listener - The message listener callback.
886 */
887 private registerWorkerMessageListener<Message extends Data | Response>(
888 worker: Worker,
889 listener: (message: MessageValue<Message>) => void
890 ): void {
891 worker.on('message', listener as MessageHandler<Worker>)
892 }
893
894 /**
895 * Creates a new worker.
896 *
897 * @returns Newly created worker.
898 */
899 protected abstract createWorker (): Worker
900
901 /**
902 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
903 * Can be overridden.
904 *
905 * @param worker - The newly created worker.
906 */
907 protected afterWorkerSetup (worker: Worker): void {
908 // Listen to worker messages.
909 this.registerWorkerMessageListener(worker, this.workerListener())
910 // Send startup message to worker.
911 this.sendToWorker(worker, {
912 ready: false,
913 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
914 })
915 // Setup worker task statistics computation.
916 this.setWorkerStatistics(worker)
917 }
918
919 /**
920 * Creates a new worker and sets it up completely in the pool worker nodes.
921 *
922 * @returns New, completely set up worker.
923 */
924 protected createAndSetupWorker (): Worker {
925 const worker = this.createWorker()
926
927 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
928 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
929 worker.on('error', error => {
930 if (this.emitter != null) {
931 this.emitter.emit(PoolEvents.error, error)
932 }
933 if (this.opts.enableTasksQueue === true) {
934 this.redistributeQueuedTasks(worker)
935 }
936 if (this.opts.restartWorkerOnError === true && !this.starting) {
937 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
938 this.createAndSetupDynamicWorker()
939 } else {
940 this.createAndSetupWorker()
941 }
942 }
943 })
944 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
945 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
946 worker.once('exit', () => {
947 this.removeWorkerNode(worker)
948 })
949
950 this.pushWorkerNode(worker)
951
952 this.afterWorkerSetup(worker)
953
954 return worker
955 }
956
957 private redistributeQueuedTasks (worker: Worker): void {
958 const workerNodeKey = this.getWorkerNodeKey(worker)
959 while (this.tasksQueueSize(workerNodeKey) > 0) {
960 let targetWorkerNodeKey: number = workerNodeKey
961 let minQueuedTasks = Infinity
962 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
963 if (
964 workerNodeId !== workerNodeKey &&
965 workerNode.usage.tasks.queued === 0
966 ) {
967 targetWorkerNodeKey = workerNodeId
968 break
969 }
970 if (
971 workerNodeId !== workerNodeKey &&
972 workerNode.usage.tasks.queued < minQueuedTasks
973 ) {
974 minQueuedTasks = workerNode.usage.tasks.queued
975 targetWorkerNodeKey = workerNodeId
976 }
977 }
978 this.enqueueTask(
979 targetWorkerNodeKey,
980 this.dequeueTask(workerNodeKey) as Task<Data>
981 )
982 }
983 }
984
985 /**
986 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
987 *
988 * @returns New, completely set up dynamic worker.
989 */
990 protected createAndSetupDynamicWorker (): Worker {
991 const worker = this.createAndSetupWorker()
992 this.registerWorkerMessageListener(worker, message => {
993 const workerNodeKey = this.getWorkerNodeKey(worker)
994 if (
995 isKillBehavior(KillBehaviors.HARD, message.kill) ||
996 (message.kill != null &&
997 ((this.opts.enableTasksQueue === false &&
998 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
999 (this.opts.enableTasksQueue === true &&
1000 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
1001 this.tasksQueueSize(workerNodeKey) === 0)))
1002 ) {
1003 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
1004 void (this.destroyWorker(worker) as Promise<void>)
1005 }
1006 })
1007 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
1008 workerInfo.dynamic = true
1009 this.sendToWorker(worker, {
1010 checkAlive: true,
1011 workerId: workerInfo.id as number
1012 })
1013 return worker
1014 }
1015
1016 /**
1017 * This function is the listener registered for each worker message.
1018 *
1019 * @returns The listener function to execute when a message is received from a worker.
1020 */
1021 protected workerListener (): (message: MessageValue<Response>) => void {
1022 return message => {
1023 this.checkMessageWorkerId(message)
1024 if (message.ready != null && message.workerId != null) {
1025 // Worker ready message received
1026 this.handleWorkerReadyMessage(message)
1027 } else if (message.id != null) {
1028 // Task execution response received
1029 this.handleTaskExecutionResponse(message)
1030 }
1031 }
1032 }
1033
1034 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
1035 const worker = this.getWorkerById(message.workerId)
1036 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1037 message.ready as boolean
1038 if (this.emitter != null && this.ready) {
1039 this.emitter.emit(PoolEvents.ready, this.info)
1040 }
1041 }
1042
1043 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1044 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1045 if (promiseResponse != null) {
1046 if (message.taskError != null) {
1047 if (this.emitter != null) {
1048 this.emitter.emit(PoolEvents.taskError, message.taskError)
1049 }
1050 promiseResponse.reject(message.taskError.message)
1051 } else {
1052 promiseResponse.resolve(message.data as Response)
1053 }
1054 this.afterTaskExecutionHook(promiseResponse.worker, message)
1055 this.promiseResponseMap.delete(message.id as string)
1056 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1057 if (
1058 this.opts.enableTasksQueue === true &&
1059 this.tasksQueueSize(workerNodeKey) > 0
1060 ) {
1061 this.executeTask(
1062 workerNodeKey,
1063 this.dequeueTask(workerNodeKey) as Task<Data>
1064 )
1065 }
1066 this.workerChoiceStrategyContext.update(workerNodeKey)
1067 }
1068 }
1069
1070 private checkAndEmitEvents (): void {
1071 if (this.emitter != null) {
1072 if (this.busy) {
1073 this.emitter.emit(PoolEvents.busy, this.info)
1074 }
1075 if (this.type === PoolTypes.dynamic && this.full) {
1076 this.emitter.emit(PoolEvents.full, this.info)
1077 }
1078 }
1079 }
1080
1081 /**
1082 * Gets the worker information.
1083 *
1084 * @param workerNodeKey - The worker node key.
1085 */
1086 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1087 return this.workerNodes[workerNodeKey].info
1088 }
1089
1090 /**
1091 * Pushes the given worker in the pool worker nodes.
1092 *
1093 * @param worker - The worker.
1094 * @returns The worker nodes length.
1095 */
1096 private pushWorkerNode (worker: Worker): number {
1097 return this.workerNodes.push(new WorkerNode(worker, this.worker))
1098 }
1099
1100 /**
1101 * Removes the given worker from the pool worker nodes.
1102 *
1103 * @param worker - The worker.
1104 */
1105 private removeWorkerNode (worker: Worker): void {
1106 const workerNodeKey = this.getWorkerNodeKey(worker)
1107 if (workerNodeKey !== -1) {
1108 this.workerNodes.splice(workerNodeKey, 1)
1109 this.workerChoiceStrategyContext.remove(workerNodeKey)
1110 }
1111 }
1112
1113 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1114 this.beforeTaskExecutionHook(workerNodeKey, task)
1115 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1116 }
1117
1118 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1119 return this.workerNodes[workerNodeKey].enqueueTask(task)
1120 }
1121
1122 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1123 return this.workerNodes[workerNodeKey].dequeueTask()
1124 }
1125
1126 private tasksQueueSize (workerNodeKey: number): number {
1127 return this.workerNodes[workerNodeKey].tasksQueueSize()
1128 }
1129
1130 private flushTasksQueue (workerNodeKey: number): void {
1131 while (this.tasksQueueSize(workerNodeKey) > 0) {
1132 this.executeTask(
1133 workerNodeKey,
1134 this.dequeueTask(workerNodeKey) as Task<Data>
1135 )
1136 }
1137 this.workerNodes[workerNodeKey].clearTasksQueue()
1138 }
1139
1140 private flushTasksQueues (): void {
1141 for (const [workerNodeKey] of this.workerNodes.entries()) {
1142 this.flushTasksQueue(workerNodeKey)
1143 }
1144 }
1145
1146 private setWorkerStatistics (worker: Worker): void {
1147 this.sendToWorker(worker, {
1148 statistics: {
1149 runTime:
1150 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1151 .runTime.aggregate,
1152 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1153 .elu.aggregate
1154 },
1155 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1156 })
1157 }
1158 }