feat: add task statistics to pool info: runTime and waitTime
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerInfo,
32 WorkerNode,
33 WorkerUsage
34 } from './worker'
35 import {
36 Measurements,
37 WorkerChoiceStrategies,
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
40 } from './selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
42 import { version } from './version'
43
44 /**
45 * Base class that implements some shared logic for all poolifier pools.
46 *
47 * @typeParam Worker - Type of worker which manages this pool.
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
50 */
51 export abstract class AbstractPool<
52 Worker extends IWorker,
53 Data = unknown,
54 Response = unknown
55 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry bound to its message id is looked up
 * to settle the matching promise.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
88
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook,
 * creates worker nodes until `numberOfWorkers` of them exist and finally records the pool
 * start timestamp.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed around as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
134
/**
 * Checks that a usable worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`/`undefined`, is not a string, or is blank.
 */
private checkFilePath (filePath: string): void {
  if (
    filePath == null ||
    // Callers from plain JavaScript can pass anything: reject non-strings here
    // rather than letting them fail later at worker creation time.
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  ) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
143
/**
 * Checks that the requested number of workers is valid for this pool.
 *
 * @param numberOfWorkers - Number of workers requested.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
161
/**
 * Validates the pool options and fills `this.opts` with defaults for the missing ones.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
188
/**
 * Checks that the given strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
198
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement`
 * is not one of the values of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
226
/**
 * Checks the tasks queue options. Missing options (`null`/`undefined`) are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
250
/**
 * Builds a snapshot of the pool state: sizes, worker node counts and task counters
 * summed over all worker nodes.
 *
 * `utilization`, `runTime` and `waitTime` are only present when the matching
 * `aggregate` task statistics requirements are enabled by the worker choice strategy.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums one numeric value per worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage.runTime.minimum
          )
        ),
        maximum: Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage.runTime.maximum
          )
        )
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage.waitTime.minimum
          )
        ),
        maximum: Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage.waitTime.maximum
          )
        )
      }
    })
  }
}
333
/**
 * Gets the approximate pool utilization: the summed tasks run time and wait time of all
 * worker nodes divided by the run time capacity of the pool (time elapsed since the pool
 * start timestamp multiplied by `maxSize`).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
354
/**
 * The pool type, one of the `PoolTypes` values.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The type of worker managed by the pool, one of the `WorkerTypes` values.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum size of the pool.
 */
protected abstract get minSize (): number

/**
 * The maximum size of the pool.
 */
protected abstract get maxSize (): number
376
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
387
/**
 * Gets the key (index in the pool worker nodes) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
399
/**
 * Switches the worker choice strategy, optionally with new options, then resets the
 * usage of every worker node and sends the statistics requirements to its worker again.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach(workerNode => {
    const { worker } = workerNode
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getInitialWorkerUsage(worker)
    )
    this.setWorkerStatistics(worker)
  })
}
421
/**
 * Validates the given worker choice strategy options, stores them in the pool options
 * and forwards them to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
432
/**
 * Enables or disables the tasks queue. When an enabled queue gets disabled, the tasks
 * queues are flushed first.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
444
/**
 * Sets the tasks queue options: validated and built when the tasks queue is enabled,
 * removed from the pool options otherwise.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any leftover options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
455
/**
 * Builds the effective tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly missing.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
463
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes has
 * reached `maxSize`.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
472
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean

/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
492
/**
 * Submits a task: chooses a worker node, registers the response promise callbacks under
 * a fresh task id, then either enqueues the task or executes it right away.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or when the
  // chosen worker node already executes as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
526
/**
 * Destroys every worker node: flushes its tasks queue, asks for the worker destruction
 * and waits for the worker `'exit'` event.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const workerNodeDestructions = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      const { worker } = workerNode
      // Listen for the exit before requesting the destruction so the event cannot be missed.
      const workerExited = new Promise<void>(resolve => {
        worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(worker)
      await workerExited
    }
  )
  await Promise.all(workerNodeDestructions)
}
543
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>

/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}

/**
 * Should return whether the worker is the main worker or not.
 */
protected abstract isMain (): boolean
565
/**
 * Hook executed before the worker task execution: counts the task as executing on the
 * worker node and updates the worker node wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
581
/**
 * Hook executed after the worker task execution: updates the task counters, the run
 * time usage and the ELU usage of the worker node holding the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // Task counters first: the statistics updates below read them.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
598
/**
 * Updates the task counters of a worker usage once a task response is received:
 * one task less executing, one more executed and, on task error, one more failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const taskFailed = message.taskError != null
  if (taskFailed) {
    tasks.failed += 1
  }
}
610
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 * Average and median are only maintained when individually required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the task performance if any.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  // A response without task performance counts as a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  workerUsage.runTime.aggregate += taskRunTime
  // NOTE(review): the initial usage built in this file sets `minimum` to 0, so the
  // `?? Infinity` fallback never applies and the minimum stays at 0 - confirm intent.
  workerUsage.runTime.minimum = Math.min(
    taskRunTime,
    workerUsage.runTime.minimum ?? Infinity
  )
  workerUsage.runTime.maximum = Math.max(
    taskRunTime,
    workerUsage.runTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Skip it while that count
  // is zero (e.g. every executed task failed) to avoid a NaN or Infinity average.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    workerUsage.runTime.history.push(message.taskPerformance.runTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
648
/**
 * Updates the wait time statistics of a worker usage when a task is about to be executed.
 *
 * The task wait time is the time elapsed since the task timestamp (zero when the task
 * has no timestamp). Nothing is updated unless the worker choice strategy requires the
 * wait time aggregate. Average and median are only maintained when individually required.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // NOTE(review): the initial usage built in this file sets `minimum` to 0, so the
  // `?? Infinity` fallback never applies and the minimum stays at 0 - confirm intent.
  workerUsage.waitTime.minimum = Math.min(
    taskWaitTime,
    workerUsage.waitTime.minimum ?? Infinity
  )
  workerUsage.waitTime.maximum = Math.max(
    taskWaitTime,
    workerUsage.waitTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Skip it while that count
  // is zero (e.g. every executed task failed) to avoid a NaN or Infinity average.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  // `taskWaitTime` is always a number here, so only the requirement is checked.
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
687
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU aggregate.
 * Average and median are only maintained when individually required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the task performance if any.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The stored utilization is the mean of the previous value and the task one,
    // or the task one when there is no previous value.
    if (workerUsage.elu.utilization != null) {
      workerUsage.elu.utilization =
        (workerUsage.elu.utilization + taskElu.utilization) / 2
    } else {
      workerUsage.elu.utilization = taskElu.utilization
    }
    workerUsage.elu.idle.minimum = Math.min(
      taskElu.idle,
      workerUsage.elu.idle.minimum ?? Infinity
    )
    workerUsage.elu.idle.maximum = Math.max(
      taskElu.idle,
      workerUsage.elu.idle.maximum ?? -Infinity
    )
    workerUsage.elu.active.minimum = Math.min(
      taskElu.active,
      workerUsage.elu.active.minimum ?? Infinity
    )
    workerUsage.elu.active.maximum = Math.max(
      taskElu.active,
      workerUsage.elu.active.maximum ?? -Infinity
    )
  }
  // The averages are taken over the tasks that did not fail. Skip them while that count
  // is zero (e.g. every executed task failed) to avoid NaN or Infinity averages.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
748
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and, if the strategy
 * policy says to use dynamic workers, its worker node is chosen. In every other case
 * the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
767
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and all
 * of its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
776
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void

/**
 * Registers a listener callback for the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
800
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker

/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * By default, it registers the pool worker listener on the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
818
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * registers the user and pool event handlers, pushes the worker node, sends the
 * statistics requirements to the worker and calls `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      // Move the tasks queued on the failing worker node to the other worker nodes.
      const workerNodeKey = this.getWorkerNodeKey(worker)
      while (this.tasksQueueSize(workerNodeKey) > 0) {
        let targetWorkerNodeKey = -1
        let minQueuedTasks = Infinity
        for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
          if (workerNodeId === workerNodeKey) {
            continue
          }
          const queuedTasks = workerNode.usage.tasks.queued
          if (queuedTasks === 0) {
            // An empty queue cannot be beaten: take it.
            targetWorkerNodeKey = workerNodeId
            break
          }
          if (queuedTasks < minQueuedTasks) {
            minQueuedTasks = queuedTasks
            targetWorkerNodeKey = workerNodeId
          }
        }
        if (targetWorkerNodeKey === -1) {
          // No other worker node: re-enqueueing on the same queue would never empty it
          // and would loop forever, so the tasks are left where they are.
          break
        }
        this.enqueueTask(
          targetWorkerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
    }
    if (this.opts.restartWorkerOnError === true) {
      // Replace the failing worker by a worker of the same kind.
      if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
882
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * The worker node is flagged as dynamic and a message listener destroys the worker when
 * it sends a kill message, under the conditions detailed in the listener.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const shallDestroyWorker = (): boolean => {
      // A hard kill behavior always destroys the worker.
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      // Otherwise the worker must have nothing left to do.
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shallDestroyWorker()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
908
/**
 * This function is the listener registered for each worker message. It dispatches
 * worker started messages and task execution responses to their handlers and ignores
 * any other message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
925
/**
 * Handles a worker started message: stores the `started` status in the info of the
 * worker node whose worker id matches the message.
 *
 * @param message - The worker started message.
 * @throws Error if no worker of the pool has the message worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  // Worker started message received
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  workerNode.info.started = message.started as boolean
}
940
/**
 * Handles a task execution response: settles the promise registered under the message id,
 * runs the after task execution hook, executes the next queued task of the worker node
 * if any and notifies the worker choice strategy context.
 *
 * Responses whose id is not in the promise response map are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, message.taskError)
    }
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
967
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a dynamic
 * pool is full, each with the pool info. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
978
/**
 * Replaces the usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}

/**
 * Gets the information of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1000
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * The worker node is first pushed with a usage not bound to any worker. Once it is in
 * the worker nodes, its usage is replaced by one bound to the worker, whose queue
 * counters look the worker node up.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: this.getInitialWorkerInfo(worker),
    usage: this.getInitialWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const pushedWorkerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  this.setWorkerNodeTasksUsage(
    pushedWorkerNode,
    this.getInitialWorkerUsage(worker)
  )
  return this.workerNodes.length
}
1020
/**
 * Gets the worker id: the `threadId` of a thread worker, the `id` of a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
1034
1035 // /**
1036 // * Sets the given worker in the pool worker nodes.
1037 // *
1038 // * @param workerNodeKey - The worker node key.
1039 // * @param worker - The worker.
1040 // * @param workerInfo - The worker info.
1041 // * @param workerUsage - The worker usage.
1042 // * @param tasksQueue - The worker task queue.
1043 // */
1044 // private setWorkerNode (
1045 // workerNodeKey: number,
1046 // worker: Worker,
1047 // workerInfo: WorkerInfo,
1048 // workerUsage: WorkerUsage,
1049 // tasksQueue: Queue<Task<Data>>
1050 // ): void {
1051 // this.workerNodes[workerNodeKey] = {
1052 // worker,
1053 // info: workerInfo,
1054 // usage: workerUsage,
1055 // tasksQueue
1056 // }
1057 // }
1058
/**
 * Removes the given worker from the pool worker nodes and tells the worker choice
 * strategy context about the removed key. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1071
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}

/**
 * Enqueues a task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}

/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}

/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}

/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1092
/**
 * Flushes the tasks queue of the given worker node: every queued task is executed
 * on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let pendingTasks = this.tasksQueueSize(workerNodeKey)
  while (pendingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1102
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1108
/**
 * Sends to the given worker which task statistics are required, taken from the
 * `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
1120
/**
 * Builds a zeroed worker usage.
 *
 * The `queued` and `maxQueued` task counters are getters: with a worker, they read the
 * tasks queue of the worker node holding that worker each time they are accessed;
 * without a worker, they return `0`.
 *
 * @param worker - The worker the usage is bound to, if any.
 * @returns The initial worker usage.
 */
private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
  const queuedTasks = (): number =>
    worker != null ? this.tasksQueueSize(this.getWorkerNodeKey(worker)) : 0
  const maxQueuedTasks = (): number =>
    worker != null ? this.tasksMaxQueueSize(this.getWorkerNodeKey(worker)) : 0
  // Each measurement gets its own statistics object and its own history.
  const newMeasurementStatistics = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    maximum: 0,
    minimum: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: newMeasurementStatistics(),
    waitTime: newMeasurementStatistics(),
    elu: {
      idle: newMeasurementStatistics(),
      active: newMeasurementStatistics()
    }
  }
}
1180
/**
 * Builds the initial information of a worker: its id, not dynamic, started.
 *
 * @param worker - The worker.
 * @returns The initial worker information.
 */
private getInitialWorkerInfo (worker: Worker): WorkerInfo {
  const id = this.getWorkerId(worker)
  return { id, dynamic: false, started: true }
}
1184 }