fix: fix worker task functions handling
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round
17 } from '../utils'
18 import { KillBehaviors } from '../worker/worker-options'
19 import {
20 type IPool,
21 PoolEmitter,
22 PoolEvents,
23 type PoolInfo,
24 type PoolOptions,
25 type PoolType,
26 PoolTypes,
27 type TasksQueueOptions
28 } from './pool'
29 import type {
30 IWorker,
31 IWorkerNode,
32 MessageHandler,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 Measurements,
39 WorkerChoiceStrategies,
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
42 } from './selection-strategies/selection-strategies-types'
43 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
44 import { version } from './version'
45 import { WorkerNode } from './worker-node'
46
47 /**
48 * Base class that implements some shared logic for all poolifier pools.
49 *
50 * @typeParam Worker - Type of worker which manages this pool.
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
53 */
54 export abstract class AbstractPool<
55 Worker extends IWorker,
56 Data = unknown,
57 Response = unknown
58 > implements IPool<Worker, Data, Response> {
59 /** @inheritDoc */
60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
61
62 /** @inheritDoc */
63 public readonly emitter?: PoolEmitter
64
65 /**
66 * The execution response promise map.
67 *
68 * - `key`: The message id of each submitted task.
69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
70 *
71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
72 */
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
103 public constructor (
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
107 ) {
108 if (!this.isMain()) {
109 throw new Error('Cannot start a pool from a worker!')
110 }
111 this.checkNumberOfWorkers(this.numberOfWorkers)
112 this.checkFilePath(this.filePath)
113 this.checkPoolOptions(this.opts)
114
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
118 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
119
120 if (this.opts.enableEvents === true) {
121 this.emitter = new PoolEmitter()
122 }
123 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
124 Worker,
125 Data,
126 Response
127 >(
128 this,
129 this.opts.workerChoiceStrategy,
130 this.opts.workerChoiceStrategyOptions
131 )
132
133 this.setupHook()
134
135 this.starting = true
136 while (
137 this.workerNodes.reduce(
138 (accumulator, workerNode) =>
139 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
140 0
141 ) < this.numberOfWorkers
142 ) {
143 this.createAndSetupWorker()
144 }
145 this.starting = false
146
147 this.startTimestamp = performance.now()
148 }
149
150 private checkFilePath (filePath: string): void {
151 if (
152 filePath == null ||
153 typeof filePath !== 'string' ||
154 (typeof filePath === 'string' && filePath.trim().length === 0)
155 ) {
156 throw new Error('Please specify a file with a worker implementation')
157 }
158 if (!existsSync(filePath)) {
159 throw new Error(`Cannot find the worker file '${filePath}'`)
160 }
161 }
162
163 private checkNumberOfWorkers (numberOfWorkers: number): void {
164 if (numberOfWorkers == null) {
165 throw new Error(
166 'Cannot instantiate a pool without specifying the number of workers'
167 )
168 } else if (!Number.isSafeInteger(numberOfWorkers)) {
169 throw new TypeError(
170 'Cannot instantiate a pool with a non safe integer number of workers'
171 )
172 } else if (numberOfWorkers < 0) {
173 throw new RangeError(
174 'Cannot instantiate a pool with a negative number of workers'
175 )
176 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
177 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
178 }
179 }
180
181 protected checkDynamicPoolSize (min: number, max: number): void {
182 if (this.type === PoolTypes.dynamic) {
183 if (min > max) {
184 throw new RangeError(
185 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
186 )
187 } else if (max === 0) {
188 throw new RangeError(
189 'Cannot instantiate a dynamic pool with a pool size equal to zero'
190 )
191 } else if (min === max) {
192 throw new RangeError(
193 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
194 )
195 }
196 }
197 }
198
199 private checkPoolOptions (opts: PoolOptions<Worker>): void {
200 if (isPlainObject(opts)) {
201 this.opts.workerChoiceStrategy =
202 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
203 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
204 this.opts.workerChoiceStrategyOptions =
205 opts.workerChoiceStrategyOptions ??
206 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
207 this.checkValidWorkerChoiceStrategyOptions(
208 this.opts.workerChoiceStrategyOptions
209 )
210 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
211 this.opts.enableEvents = opts.enableEvents ?? true
212 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
213 if (this.opts.enableTasksQueue) {
214 this.checkValidTasksQueueOptions(
215 opts.tasksQueueOptions as TasksQueueOptions
216 )
217 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
218 opts.tasksQueueOptions as TasksQueueOptions
219 )
220 }
221 } else {
222 throw new TypeError('Invalid pool options: must be a plain object')
223 }
224 }
225
226 private checkValidWorkerChoiceStrategy (
227 workerChoiceStrategy: WorkerChoiceStrategy
228 ): void {
229 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
230 throw new Error(
231 `Invalid worker choice strategy '${workerChoiceStrategy}'`
232 )
233 }
234 }
235
236 private checkValidWorkerChoiceStrategyOptions (
237 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
238 ): void {
239 if (!isPlainObject(workerChoiceStrategyOptions)) {
240 throw new TypeError(
241 'Invalid worker choice strategy options: must be a plain object'
242 )
243 }
244 if (
245 workerChoiceStrategyOptions.weights != null &&
246 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
247 ) {
248 throw new Error(
249 'Invalid worker choice strategy options: must have a weight for each worker node'
250 )
251 }
252 if (
253 workerChoiceStrategyOptions.measurement != null &&
254 !Object.values(Measurements).includes(
255 workerChoiceStrategyOptions.measurement
256 )
257 ) {
258 throw new Error(
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
260 )
261 }
262 }
263
264 private checkValidTasksQueueOptions (
265 tasksQueueOptions: TasksQueueOptions
266 ): void {
267 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
268 throw new TypeError('Invalid tasks queue options: must be a plain object')
269 }
270 if (
271 tasksQueueOptions?.concurrency != null &&
272 !Number.isSafeInteger(tasksQueueOptions.concurrency)
273 ) {
274 throw new TypeError(
275 'Invalid worker tasks concurrency: must be an integer'
276 )
277 }
278 if (
279 tasksQueueOptions?.concurrency != null &&
280 tasksQueueOptions.concurrency <= 0
281 ) {
282 throw new Error(
283 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
284 )
285 }
286 }
287
288 /** @inheritDoc */
289 public get info (): PoolInfo {
290 return {
291 version,
292 type: this.type,
293 worker: this.worker,
294 ready: this.ready,
295 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
296 minSize: this.minSize,
297 maxSize: this.maxSize,
298 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
299 .runTime.aggregate &&
300 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
301 .waitTime.aggregate && { utilization: round(this.utilization) }),
302 workerNodes: this.workerNodes.length,
303 idleWorkerNodes: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 workerNode.usage.tasks.executing === 0
306 ? accumulator + 1
307 : accumulator,
308 0
309 ),
310 busyWorkerNodes: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
312 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
313 0
314 ),
315 executedTasks: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
317 accumulator + workerNode.usage.tasks.executed,
318 0
319 ),
320 executingTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 accumulator + workerNode.usage.tasks.executing,
323 0
324 ),
325 queuedTasks: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
327 accumulator + workerNode.usage.tasks.queued,
328 0
329 ),
330 maxQueuedTasks: this.workerNodes.reduce(
331 (accumulator, workerNode) =>
332 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
333 0
334 ),
335 failedTasks: this.workerNodes.reduce(
336 (accumulator, workerNode) =>
337 accumulator + workerNode.usage.tasks.failed,
338 0
339 ),
340 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
341 .runTime.aggregate && {
342 runTime: {
343 minimum: round(
344 Math.min(
345 ...this.workerNodes.map(
346 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
347 )
348 )
349 ),
350 maximum: round(
351 Math.max(
352 ...this.workerNodes.map(
353 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
354 )
355 )
356 ),
357 average: round(
358 this.workerNodes.reduce(
359 (accumulator, workerNode) =>
360 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
361 0
362 ) /
363 this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + (workerNode.usage.tasks?.executed ?? 0),
366 0
367 )
368 ),
369 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
370 .runTime.median && {
371 median: round(
372 median(
373 this.workerNodes.map(
374 workerNode => workerNode.usage.runTime?.median ?? 0
375 )
376 )
377 )
378 })
379 }
380 }),
381 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
382 .waitTime.aggregate && {
383 waitTime: {
384 minimum: round(
385 Math.min(
386 ...this.workerNodes.map(
387 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
388 )
389 )
390 ),
391 maximum: round(
392 Math.max(
393 ...this.workerNodes.map(
394 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
395 )
396 )
397 ),
398 average: round(
399 this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
402 0
403 ) /
404 this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + (workerNode.usage.tasks?.executed ?? 0),
407 0
408 )
409 ),
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .waitTime.median && {
412 median: round(
413 median(
414 this.workerNodes.map(
415 workerNode => workerNode.usage.waitTime?.median ?? 0
416 )
417 )
418 )
419 })
420 }
421 })
422 }
423 }
424
425 private get ready (): boolean {
426 return (
427 this.workerNodes.reduce(
428 (accumulator, workerNode) =>
429 !workerNode.info.dynamic && workerNode.info.ready
430 ? accumulator + 1
431 : accumulator,
432 0
433 ) >= this.minSize
434 )
435 }
436
437 /**
438 * Gets the approximate pool utilization.
439 *
440 * @returns The pool utilization.
441 */
442 private get utilization (): number {
443 const poolTimeCapacity =
444 (performance.now() - this.startTimestamp) * this.maxSize
445 const totalTasksRunTime = this.workerNodes.reduce(
446 (accumulator, workerNode) =>
447 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
448 0
449 )
450 const totalTasksWaitTime = this.workerNodes.reduce(
451 (accumulator, workerNode) =>
452 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
453 0
454 )
455 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
456 }
457
458 /**
459 * Pool type.
460 *
461 * If it is `'dynamic'`, it provides the `max` property.
462 */
463 protected abstract get type (): PoolType
464
465 /**
466 * Gets the worker type.
467 */
468 protected abstract get worker (): WorkerType
469
470 /**
471 * Pool minimum size.
472 */
473 protected abstract get minSize (): number
474
475 /**
476 * Pool maximum size.
477 */
478 protected abstract get maxSize (): number
479
480 /**
481 * Get the worker given its id.
482 *
483 * @param workerId - The worker id.
484 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
485 */
486 private getWorkerById (workerId: number): Worker | undefined {
487 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
488 ?.worker
489 }
490
491 /**
492 * Checks if the worker id sent in the received message from a worker is valid.
493 *
494 * @param message - The received message.
495 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
496 */
497 private checkMessageWorkerId (message: MessageValue<Response>): void {
498 if (
499 message.workerId != null &&
500 this.getWorkerById(message.workerId) == null
501 ) {
502 throw new Error(
503 `Worker message received from unknown worker '${message.workerId}'`
504 )
505 }
506 }
507
508 /**
509 * Gets the given worker its worker node key.
510 *
511 * @param worker - The worker.
512 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
513 */
514 private getWorkerNodeKey (worker: Worker): number {
515 return this.workerNodes.findIndex(
516 workerNode => workerNode.worker === worker
517 )
518 }
519
520 /** @inheritDoc */
521 public setWorkerChoiceStrategy (
522 workerChoiceStrategy: WorkerChoiceStrategy,
523 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
524 ): void {
525 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
526 this.opts.workerChoiceStrategy = workerChoiceStrategy
527 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
528 this.opts.workerChoiceStrategy
529 )
530 if (workerChoiceStrategyOptions != null) {
531 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
532 }
533 for (const workerNode of this.workerNodes) {
534 workerNode.resetUsage()
535 this.setWorkerStatistics(workerNode.worker)
536 }
537 }
538
539 /** @inheritDoc */
540 public setWorkerChoiceStrategyOptions (
541 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
542 ): void {
543 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
544 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
545 this.workerChoiceStrategyContext.setOptions(
546 this.opts.workerChoiceStrategyOptions
547 )
548 }
549
550 /** @inheritDoc */
551 public enableTasksQueue (
552 enable: boolean,
553 tasksQueueOptions?: TasksQueueOptions
554 ): void {
555 if (this.opts.enableTasksQueue === true && !enable) {
556 this.flushTasksQueues()
557 }
558 this.opts.enableTasksQueue = enable
559 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
560 }
561
562 /** @inheritDoc */
563 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
564 if (this.opts.enableTasksQueue === true) {
565 this.checkValidTasksQueueOptions(tasksQueueOptions)
566 this.opts.tasksQueueOptions =
567 this.buildTasksQueueOptions(tasksQueueOptions)
568 } else if (this.opts.tasksQueueOptions != null) {
569 delete this.opts.tasksQueueOptions
570 }
571 }
572
573 private buildTasksQueueOptions (
574 tasksQueueOptions: TasksQueueOptions
575 ): TasksQueueOptions {
576 return {
577 concurrency: tasksQueueOptions?.concurrency ?? 1
578 }
579 }
580
581 /**
582 * Whether the pool is full or not.
583 *
584 * The pool filling boolean status.
585 */
586 protected get full (): boolean {
587 return this.workerNodes.length >= this.maxSize
588 }
589
590 /**
591 * Whether the pool is busy or not.
592 *
593 * The pool busyness boolean status.
594 */
595 protected abstract get busy (): boolean
596
597 /**
598 * Whether worker nodes are executing at least one task.
599 *
600 * @returns Worker nodes busyness boolean status.
601 */
602 protected internalBusy (): boolean {
603 return (
604 this.workerNodes.findIndex(workerNode => {
605 return workerNode.usage.tasks.executing === 0
606 }) === -1
607 )
608 }
609
610 /** @inheritDoc */
611 public async execute (data?: Data, name?: string): Promise<Response> {
612 const timestamp = performance.now()
613 const workerNodeKey = this.chooseWorkerNode()
614 const submittedTask: Task<Data> = {
615 name: name ?? DEFAULT_TASK_NAME,
616 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
617 data: data ?? ({} as Data),
618 timestamp,
619 workerId: this.getWorkerInfo(workerNodeKey).id as number,
620 id: randomUUID()
621 }
622 const res = new Promise<Response>((resolve, reject) => {
623 this.promiseResponseMap.set(submittedTask.id as string, {
624 resolve,
625 reject,
626 worker: this.workerNodes[workerNodeKey].worker
627 })
628 })
629 if (
630 this.opts.enableTasksQueue === true &&
631 (this.busy ||
632 this.workerNodes[workerNodeKey].usage.tasks.executing >=
633 ((this.opts.tasksQueueOptions as TasksQueueOptions)
634 .concurrency as number))
635 ) {
636 this.enqueueTask(workerNodeKey, submittedTask)
637 } else {
638 this.executeTask(workerNodeKey, submittedTask)
639 }
640 this.checkAndEmitEvents()
641 // eslint-disable-next-line @typescript-eslint/return-await
642 return res
643 }
644
645 /** @inheritDoc */
646 public async destroy (): Promise<void> {
647 await Promise.all(
648 this.workerNodes.map(async (workerNode, workerNodeKey) => {
649 this.flushTasksQueue(workerNodeKey)
650 // FIXME: wait for tasks to be finished
651 const workerExitPromise = new Promise<void>(resolve => {
652 workerNode.worker.on('exit', () => {
653 resolve()
654 })
655 })
656 await this.destroyWorker(workerNode.worker)
657 await workerExitPromise
658 })
659 )
660 }
661
662 /**
663 * Terminates the given worker.
664 *
665 * @param worker - A worker within `workerNodes`.
666 */
667 protected abstract destroyWorker (worker: Worker): void | Promise<void>
668
669 /**
670 * Setup hook to execute code before worker nodes are created in the abstract constructor.
671 * Can be overridden.
672 *
673 * @virtual
674 */
675 protected setupHook (): void {
676 // Intentionally empty
677 }
678
679 /**
680 * Should return whether the worker is the main worker or not.
681 */
682 protected abstract isMain (): boolean
683
684 /**
685 * Hook executed before the worker task execution.
686 * Can be overridden.
687 *
688 * @param workerNodeKey - The worker node key.
689 * @param task - The task to execute.
690 */
691 protected beforeTaskExecutionHook (
692 workerNodeKey: number,
693 task: Task<Data>
694 ): void {
695 const workerUsage = this.workerNodes[workerNodeKey].usage
696 ++workerUsage.tasks.executing
697 this.updateWaitTimeWorkerUsage(workerUsage, task)
698 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
699 task.name as string
700 ) as WorkerUsage
701 ++taskWorkerUsage.tasks.executing
702 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
703 }
704
705 /**
706 * Hook executed after the worker task execution.
707 * Can be overridden.
708 *
709 * @param worker - The worker.
710 * @param message - The received message.
711 */
712 protected afterTaskExecutionHook (
713 worker: Worker,
714 message: MessageValue<Response>
715 ): void {
716 const workerNodeKey = this.getWorkerNodeKey(worker)
717 const workerUsage = this.workerNodes[workerNodeKey].usage
718 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
719 this.updateRunTimeWorkerUsage(workerUsage, message)
720 this.updateEluWorkerUsage(workerUsage, message)
721 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
722 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
723 ) as WorkerUsage
724 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
725 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
726 this.updateEluWorkerUsage(taskWorkerUsage, message)
727 }
728
729 private updateTaskStatisticsWorkerUsage (
730 workerUsage: WorkerUsage,
731 message: MessageValue<Response>
732 ): void {
733 const workerTaskStatistics = workerUsage.tasks
734 --workerTaskStatistics.executing
735 if (message.taskError == null) {
736 ++workerTaskStatistics.executed
737 } else {
738 ++workerTaskStatistics.failed
739 }
740 }
741
742 private updateRunTimeWorkerUsage (
743 workerUsage: WorkerUsage,
744 message: MessageValue<Response>
745 ): void {
746 if (
747 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
748 .aggregate
749 ) {
750 const taskRunTime = message.taskPerformance?.runTime ?? 0
751 workerUsage.runTime.aggregate =
752 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
753 workerUsage.runTime.minimum = Math.min(
754 taskRunTime,
755 workerUsage.runTime?.minimum ?? Infinity
756 )
757 workerUsage.runTime.maximum = Math.max(
758 taskRunTime,
759 workerUsage.runTime?.maximum ?? -Infinity
760 )
761 if (
762 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
763 .average &&
764 workerUsage.tasks.executed !== 0
765 ) {
766 workerUsage.runTime.average =
767 workerUsage.runTime.aggregate / workerUsage.tasks.executed
768 }
769 if (
770 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
771 .median &&
772 message.taskPerformance?.runTime != null
773 ) {
774 workerUsage.runTime.history.push(message.taskPerformance.runTime)
775 workerUsage.runTime.median = median(workerUsage.runTime.history)
776 }
777 }
778 }
779
780 private updateWaitTimeWorkerUsage (
781 workerUsage: WorkerUsage,
782 task: Task<Data>
783 ): void {
784 const timestamp = performance.now()
785 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
786 if (
787 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
788 .aggregate
789 ) {
790 workerUsage.waitTime.aggregate =
791 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
792 workerUsage.waitTime.minimum = Math.min(
793 taskWaitTime,
794 workerUsage.waitTime?.minimum ?? Infinity
795 )
796 workerUsage.waitTime.maximum = Math.max(
797 taskWaitTime,
798 workerUsage.waitTime?.maximum ?? -Infinity
799 )
800 if (
801 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
802 .waitTime.average &&
803 workerUsage.tasks.executed !== 0
804 ) {
805 workerUsage.waitTime.average =
806 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
807 }
808 if (
809 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
810 .waitTime.median
811 ) {
812 workerUsage.waitTime.history.push(taskWaitTime)
813 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
814 }
815 }
816 }
817
818 private updateEluWorkerUsage (
819 workerUsage: WorkerUsage,
820 message: MessageValue<Response>
821 ): void {
822 if (
823 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
824 .aggregate
825 ) {
826 if (message.taskPerformance?.elu != null) {
827 workerUsage.elu.idle.aggregate =
828 (workerUsage.elu.idle?.aggregate ?? 0) +
829 message.taskPerformance.elu.idle
830 workerUsage.elu.active.aggregate =
831 (workerUsage.elu.active?.aggregate ?? 0) +
832 message.taskPerformance.elu.active
833 if (workerUsage.elu.utilization != null) {
834 workerUsage.elu.utilization =
835 (workerUsage.elu.utilization +
836 message.taskPerformance.elu.utilization) /
837 2
838 } else {
839 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
840 }
841 workerUsage.elu.idle.minimum = Math.min(
842 message.taskPerformance.elu.idle,
843 workerUsage.elu.idle?.minimum ?? Infinity
844 )
845 workerUsage.elu.idle.maximum = Math.max(
846 message.taskPerformance.elu.idle,
847 workerUsage.elu.idle?.maximum ?? -Infinity
848 )
849 workerUsage.elu.active.minimum = Math.min(
850 message.taskPerformance.elu.active,
851 workerUsage.elu.active?.minimum ?? Infinity
852 )
853 workerUsage.elu.active.maximum = Math.max(
854 message.taskPerformance.elu.active,
855 workerUsage.elu.active?.maximum ?? -Infinity
856 )
857 if (
858 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
859 .average &&
860 workerUsage.tasks.executed !== 0
861 ) {
862 workerUsage.elu.idle.average =
863 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
864 workerUsage.elu.active.average =
865 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
866 }
867 if (
868 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
869 .median
870 ) {
871 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
872 workerUsage.elu.active.history.push(
873 message.taskPerformance.elu.active
874 )
875 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
876 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
877 }
878 }
879 }
880 }
881
882 /**
883 * Chooses a worker node for the next task.
884 *
885 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
886 *
887 * @returns The worker node key
888 */
889 private chooseWorkerNode (): number {
890 if (this.shallCreateDynamicWorker()) {
891 const worker = this.createAndSetupDynamicWorker()
892 if (
893 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
894 ) {
895 return this.getWorkerNodeKey(worker)
896 }
897 }
898 return this.workerChoiceStrategyContext.execute()
899 }
900
901 /**
902 * Conditions for dynamic worker creation.
903 *
904 * @returns Whether to create a dynamic worker or not.
905 */
906 private shallCreateDynamicWorker (): boolean {
907 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
908 }
909
910 /**
911 * Sends a message to the given worker.
912 *
913 * @param worker - The worker which should receive the message.
914 * @param message - The message.
915 */
916 protected abstract sendToWorker (
917 worker: Worker,
918 message: MessageValue<Data>
919 ): void
920
921 /**
922 * Creates a new worker.
923 *
924 * @returns Newly created worker.
925 */
926 protected abstract createWorker (): Worker
927
928 /**
929 * Creates a new worker and sets it up completely in the pool worker nodes.
930 *
931 * @returns New, completely set up worker.
932 */
933 protected createAndSetupWorker (): Worker {
934 const worker = this.createWorker()
935
936 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
937 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
938 worker.on('error', error => {
939 const workerNodeKey = this.getWorkerNodeKey(worker)
940 const workerInfo = this.getWorkerInfo(workerNodeKey)
941 workerInfo.ready = false
942 this.emitter?.emit(PoolEvents.error, error)
943 if (this.opts.restartWorkerOnError === true && !this.starting) {
944 if (workerInfo.dynamic) {
945 this.createAndSetupDynamicWorker()
946 } else {
947 this.createAndSetupWorker()
948 }
949 }
950 if (this.opts.enableTasksQueue === true) {
951 this.redistributeQueuedTasks(workerNodeKey)
952 }
953 })
954 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
955 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
956 worker.once('exit', () => {
957 this.removeWorkerNode(worker)
958 })
959
960 this.addWorkerNode(worker)
961
962 this.afterWorkerSetup(worker)
963
964 return worker
965 }
966
967 /**
968 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
969 *
970 * @returns New, completely set up dynamic worker.
971 */
972 protected createAndSetupDynamicWorker (): Worker {
973 const worker = this.createAndSetupWorker()
974 this.registerWorkerMessageListener(worker, message => {
975 const workerNodeKey = this.getWorkerNodeKey(worker)
976 if (
977 isKillBehavior(KillBehaviors.HARD, message.kill) ||
978 (message.kill != null &&
979 ((this.opts.enableTasksQueue === false &&
980 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
981 (this.opts.enableTasksQueue === true &&
982 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
983 this.tasksQueueSize(workerNodeKey) === 0)))
984 ) {
985 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
986 void (this.destroyWorker(worker) as Promise<void>)
987 }
988 })
989 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
990 workerInfo.dynamic = true
991 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
992 workerInfo.ready = true
993 }
994 this.sendToWorker(worker, {
995 checkActive: true,
996 workerId: workerInfo.id as number
997 })
998 return worker
999 }
1000
1001 /**
1002 * Registers a listener callback on the given worker.
1003 *
1004 * @param worker - The worker which should register a listener.
1005 * @param listener - The message listener callback.
1006 */
1007 private registerWorkerMessageListener<Message extends Data | Response>(
1008 worker: Worker,
1009 listener: (message: MessageValue<Message>) => void
1010 ): void {
1011 worker.on('message', listener as MessageHandler<Worker>)
1012 }
1013
1014 /**
1015 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1016 * Can be overridden.
1017 *
1018 * @param worker - The newly created worker.
1019 */
1020 protected afterWorkerSetup (worker: Worker): void {
1021 // Listen to worker messages.
1022 this.registerWorkerMessageListener(worker, this.workerListener())
1023 // Send startup message to worker.
1024 this.sendWorkerStartupMessage(worker)
1025 // Setup worker task statistics computation.
1026 this.setWorkerStatistics(worker)
1027 }
1028
1029 private sendWorkerStartupMessage (worker: Worker): void {
1030 this.sendToWorker(worker, {
1031 ready: false,
1032 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1033 })
1034 }
1035
1036 private redistributeQueuedTasks (workerNodeKey: number): void {
1037 while (this.tasksQueueSize(workerNodeKey) > 0) {
1038 let targetWorkerNodeKey: number = workerNodeKey
1039 let minQueuedTasks = Infinity
1040 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1041 const workerInfo = this.getWorkerInfo(workerNodeId)
1042 if (
1043 workerNodeId !== workerNodeKey &&
1044 workerInfo.ready &&
1045 workerNode.usage.tasks.queued === 0
1046 ) {
1047 targetWorkerNodeKey = workerNodeId
1048 break
1049 }
1050 if (
1051 workerNodeId !== workerNodeKey &&
1052 workerInfo.ready &&
1053 workerNode.usage.tasks.queued < minQueuedTasks
1054 ) {
1055 minQueuedTasks = workerNode.usage.tasks.queued
1056 targetWorkerNodeKey = workerNodeId
1057 }
1058 }
1059 this.enqueueTask(
1060 targetWorkerNodeKey,
1061 this.dequeueTask(workerNodeKey) as Task<Data>
1062 )
1063 }
1064 }
1065
1066 /**
1067 * This function is the listener registered for each worker message.
1068 *
1069 * @returns The listener function to execute when a message is received from a worker.
1070 */
1071 protected workerListener (): (message: MessageValue<Response>) => void {
1072 return message => {
1073 this.checkMessageWorkerId(message)
1074 if (message.ready != null) {
1075 // Worker ready response received
1076 this.handleWorkerReadyResponse(message)
1077 } else if (message.id != null) {
1078 // Task execution response received
1079 this.handleTaskExecutionResponse(message)
1080 }
1081 }
1082 }
1083
1084 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1085 const worker = this.getWorkerById(message.workerId)
1086 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1087 message.ready as boolean
1088 if (this.emitter != null && this.ready) {
1089 this.emitter.emit(PoolEvents.ready, this.info)
1090 }
1091 }
1092
1093 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1094 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1095 if (promiseResponse != null) {
1096 if (message.taskError != null) {
1097 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1098 promiseResponse.reject(message.taskError.message)
1099 } else {
1100 promiseResponse.resolve(message.data as Response)
1101 }
1102 this.afterTaskExecutionHook(promiseResponse.worker, message)
1103 this.promiseResponseMap.delete(message.id as string)
1104 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1105 if (
1106 this.opts.enableTasksQueue === true &&
1107 this.tasksQueueSize(workerNodeKey) > 0
1108 ) {
1109 this.executeTask(
1110 workerNodeKey,
1111 this.dequeueTask(workerNodeKey) as Task<Data>
1112 )
1113 }
1114 this.workerChoiceStrategyContext.update(workerNodeKey)
1115 }
1116 }
1117
1118 private checkAndEmitEvents (): void {
1119 if (this.emitter != null) {
1120 if (this.busy) {
1121 this.emitter.emit(PoolEvents.busy, this.info)
1122 }
1123 if (this.type === PoolTypes.dynamic && this.full) {
1124 this.emitter.emit(PoolEvents.full, this.info)
1125 }
1126 }
1127 }
1128
1129 /**
1130 * Gets the worker information.
1131 *
1132 * @param workerNodeKey - The worker node key.
1133 */
1134 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1135 return this.workerNodes[workerNodeKey].info
1136 }
1137
1138 /**
1139 * Adds the given worker in the pool worker nodes.
1140 *
1141 * @param worker - The worker.
1142 * @returns The worker nodes length.
1143 */
1144 private addWorkerNode (worker: Worker): number {
1145 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1146 // Flag the worker node as ready at pool startup.
1147 if (this.starting) {
1148 workerNode.info.ready = true
1149 }
1150 return this.workerNodes.push(workerNode)
1151 }
1152
1153 /**
1154 * Removes the given worker from the pool worker nodes.
1155 *
1156 * @param worker - The worker.
1157 */
1158 private removeWorkerNode (worker: Worker): void {
1159 const workerNodeKey = this.getWorkerNodeKey(worker)
1160 if (workerNodeKey !== -1) {
1161 this.workerNodes.splice(workerNodeKey, 1)
1162 this.workerChoiceStrategyContext.remove(workerNodeKey)
1163 }
1164 }
1165
1166 /**
1167 * Executes the given task on the given worker.
1168 *
1169 * @param worker - The worker.
1170 * @param task - The task to execute.
1171 */
1172 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1173 this.beforeTaskExecutionHook(workerNodeKey, task)
1174 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1175 }
1176
1177 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1178 return this.workerNodes[workerNodeKey].enqueueTask(task)
1179 }
1180
1181 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1182 return this.workerNodes[workerNodeKey].dequeueTask()
1183 }
1184
1185 private tasksQueueSize (workerNodeKey: number): number {
1186 return this.workerNodes[workerNodeKey].tasksQueueSize()
1187 }
1188
1189 private flushTasksQueue (workerNodeKey: number): void {
1190 while (this.tasksQueueSize(workerNodeKey) > 0) {
1191 this.executeTask(
1192 workerNodeKey,
1193 this.dequeueTask(workerNodeKey) as Task<Data>
1194 )
1195 }
1196 this.workerNodes[workerNodeKey].clearTasksQueue()
1197 }
1198
1199 private flushTasksQueues (): void {
1200 for (const [workerNodeKey] of this.workerNodes.entries()) {
1201 this.flushTasksQueue(workerNodeKey)
1202 }
1203 }
1204
1205 private setWorkerStatistics (worker: Worker): void {
1206 this.sendToWorker(worker, {
1207 statistics: {
1208 runTime:
1209 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1210 .runTime.aggregate,
1211 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1212 .elu.aggregate
1213 },
1214 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1215 })
1216 }
1217 }