bfd0268aebff7a880c59a1bb77ec67603a16e3f7
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerInfo,
32 WorkerNode,
33 WorkerUsage
34 } from './worker'
35 import {
36 Measurements,
37 WorkerChoiceStrategies,
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
40 } from './selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
42 import { version } from './version'
43
44 /**
45 * Base class that implements some shared logic for all poolifier pools.
46 *
47 * @typeParam Worker - Type of worker which manages this pool.
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
50 */
51 export abstract class AbstractPool<
52 Worker extends IWorker,
53 Data = unknown,
54 Response = unknown
55 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry bound to its message id is looked up
 * to settle the matching promise, then deleted.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * The start timestamp of the pool: the `performance.now()` value taken at the end of the constructor.
 * Explicitly typed so that the type does not depend on constructor assignment inference.
 */
private readonly startTimestamp: number
88
/**
 * Builds a pool: validates the arguments, sets up the optional event emitter and the
 * worker choice strategy context, runs the setup hook, then creates worker nodes until
 * `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool. Missing options are defaulted in place.
 * @throws Error if the pool is not instantiated from the main worker, or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation; checkPoolOptions() also writes the defaults into the options.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods so that they keep the pool as `this` when passed around.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
134
/**
 * Checks that a usable worker file path has been given.
 *
 * Values that are not strings (including `null` and `undefined`) and blank strings are
 * rejected here, instead of failing later when the worker is created.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the file path is not a non-blank string.
 */
private checkFilePath (filePath: string): void {
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
143
/**
 * Checks the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
161
/**
 * Validates the pool options and writes the default values of the missing ones into `this.opts`.
 *
 * Defaults: round robin strategy, default strategy options, restart worker on error,
 * events enabled, tasks queue disabled.
 *
 * @param opts - The pool options.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
188
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
198
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size,
 * or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
226
/**
 * Checks the tasks queue options. Missing options and a missing concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are given but are not a plain object,
 * or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
250
/**
 * Snapshot of the pool: static properties plus counters aggregated over the worker nodes.
 * The `utilization` property is only present when both the run time and the wait time
 * aggregates are required by the worker choice strategy.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Number of worker nodes whose tasks counters match the predicate.
  const countWorkerNodes = (
    matches: (tasks: WorkerUsage['tasks']) => boolean
  ): number =>
    this.workerNodes.filter(workerNode => matches(workerNode.usage.tasks))
      .length
  // Sum of one tasks counter over all the worker nodes.
  const sumTasks = (pick: (tasks: WorkerUsage['tasks']) => number): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode.usage.tasks),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(tasks => tasks.executing === 0),
    busyWorkerNodes: countWorkerNodes(tasks => tasks.executing > 0),
    executedTasks: sumTasks(tasks => tasks.executed),
    executingTasks: sumTasks(tasks => tasks.executing),
    queuedTasks: sumTasks(tasks => tasks.queued),
    maxQueuedTasks: sumTasks(tasks => tasks.maxQueued),
    failedTasks: sumTasks(tasks => tasks.failed)
  }
}
303
/**
 * Approximate pool utilization: the aggregated tasks run time plus wait time of all the
 * worker nodes, divided by the time elapsed since the pool start multiplied by the pool
 * maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
324
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * This class compares it against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 */
protected abstract get type (): PoolType

/**
 * Gets the worker type.
 *
 * This class compares it against `WorkerTypes.thread` and `WorkerTypes.cluster`
 * to pick the worker id property.
 */
protected abstract get worker (): WorkerType

/**
 * Pool minimum size. Reported as `minSize` in the pool information.
 */
protected abstract get minSize (): number

/**
 * Pool maximum size.
 *
 * Used by this class to tell whether the pool is full, to compute the pool utilization
 * and to check the number of weights in the worker choice strategy options.
 */
protected abstract get maxSize (): number
346
/**
 * Looks up a worker by the id stored in its worker node information.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const { info, worker } of this.workerNodes) {
    if (info.id === workerId) {
      return worker
    }
  }
  return undefined
}
357
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
369
/**
 * Switches the worker choice strategy, optionally with new strategy options.
 * Every worker node usage is then reset to its initial value and the statistics
 * requirements are sent again to every worker.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    this.setWorkerNodeTasksUsage(workerNode, this.getInitialWorkerUsage(worker))
    this.setWorkerStatistics(worker)
  }
}
391
/**
 * Validates the given worker choice strategy options, stores them in the pool options
 * and hands them over to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
402
/**
 * Enables or disables the worker tasks queues. When a previously enabled queue gets
 * disabled, the queued tasks are flushed first.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
414
/**
 * Sets the tasks queue options: validated and normalized when the tasks queue is enabled,
 * removed from the pool options otherwise.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
425
/**
 * Builds normalized tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly missing.
 * @returns Tasks queue options whose concurrency defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
433
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
442
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 * Read by this class to decide whether a submitted task is queued (when the tasks queue
 * is enabled) and whether the `busy` pool event is emitted.
 */
protected abstract get busy (): boolean
449
/**
 * Whether no worker node is idle, i.e. every worker node has at least one executing task.
 * A pool without any worker node is reported as busy.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
462
/**
 * Submits a task: a worker node is chosen, the response promise callbacks are registered
 * under a fresh task id, then the task is either queued or sent right away to the worker.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const workerNode = this.workerNodes[workerNodeKey]
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // The promise executor runs synchronously: the map entry exists before the task is sent.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: workerNode.worker
    })
  })
  // With the tasks queue enabled, queue when the pool is busy or the chosen worker node
  // already executes as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      workerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
496
/**
 * Destroys every worker node: its tasks queue is flushed, the worker is destroyed and
 * its `'exit'` event is awaited.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The 'exit' listener is registered before the worker destruction is requested.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      await destroyWorkerNode(workerNode, workerNodeKey)
    })
  )
}
513
/**
 * Terminates the given worker.
 *
 * May complete synchronously or return a promise; `destroy()` awaits the result.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>

/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}

/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
535
/**
 * Hook executed before the worker task execution: increments the executing tasks counter
 * of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
551
/**
 * Hook executed after the worker task execution: updates, in that order, the tasks
 * counters, the run time usage and the ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
568
/**
 * Updates the tasks counters once a task response has been received: one less executing
 * task, one more executed task and, when the message carries a task error, one more failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
580
/**
 * Updates the run time usage of a worker node with the run time reported in a task response.
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. A missing run time is accounted as `0`
 * in the aggregate, minimum and maximum, and is not pushed to the history.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!runTimeRequirements.aggregate) {
    return
  }
  const { runTime, tasks } = workerUsage
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTime.aggregate += taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  // The average is computed over the tasks that did not fail. When every executed task
  // has failed the divisor is zero: keep the previous average instead of storing NaN/Infinity.
  const successfulTasks = tasks.executed - tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    runTime.average = runTime.aggregate / successfulTasks
  }
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    runTime.history.push(message.taskPerformance.runTime)
    runTime.median = median(runTime.history)
  }
}
618
/**
 * Updates the wait time usage of a worker node with the time elapsed between the task
 * submission timestamp and now. A task without timestamp has a wait time of `0`.
 * Nothing is updated unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime, tasks } = workerUsage
  waitTime.aggregate += taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  // Same divisor as the other averages. When every executed task has failed it is zero:
  // keep the previous average instead of storing NaN/Infinity.
  const successfulTasks = tasks.executed - tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    waitTime.average = waitTime.aggregate / successfulTasks
  }
  // taskWaitTime is always a number here, so the median only depends on the requirement.
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
657
/**
 * Updates the event loop utilization (ELU) usage of a worker node with the ELU reported
 * in a task response. Nothing is updated unless the worker choice strategy requires the
 * ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: eluRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!eluRequirements.aggregate) {
    return
  }
  const { elu, tasks } = workerUsage
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    elu.idle.aggregate += taskElu.idle
    elu.active.aggregate += taskElu.active
    // The utilization is the mean of the previous value and the newly reported one,
    // or the reported one when there is no previous value.
    elu.utilization =
      elu.utilization != null
        ? (elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
    elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
    elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
    elu.active.minimum = Math.min(
      taskElu.active,
      elu.active.minimum ?? Infinity
    )
    elu.active.maximum = Math.max(
      taskElu.active,
      elu.active.maximum ?? -Infinity
    )
  }
  // The averages are computed over the tasks that did not fail. When every executed task
  // has failed the divisor is zero: keep the previous averages instead of storing NaN/Infinity.
  const successfulTasks = tasks.executed - tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    elu.idle.average = elu.idle.aggregate / successfulTasks
    elu.active.average = elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
718
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and directly chosen if the
 * strategy policy asks to use dynamic workers. In every other case the choice is delegated
 * to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
737
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and none of its
 * worker nodes is idle.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
746
/**
 * Sends a message to the given worker.
 *
 * Used by this class to send the tasks to execute and the statistics requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
757
/**
 * Subscribes a listener callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
770
/**
 * Creates a new worker.
 *
 * The returned worker is not yet part of the pool worker nodes: event handlers are
 * attached and the worker node is added by `createAndSetupWorker()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
777
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * The default implementation registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
788
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user handlers from the pool options are attached, then the pool own `'error'` and
 * `'exit'` handling. On worker error the error is emitted as a pool event, the queued
 * tasks of the worker node are moved to other worker nodes when the tasks queue is
 * enabled, and a replacement worker is created when `restartWorkerOnError` is set.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey === -1) {
      // The worker node is no longer in the pool: there is no queue to redistribute and
      // no worker information to read, indexing the worker nodes would throw.
      return
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
    if (this.opts.restartWorkerOnError === true) {
      if (this.getWorkerInfo(workerNodeKey).dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}

/**
 * Moves the queued tasks of the given worker node, one at a time, to the first other
 * worker node with an empty queue or else to the other worker node with the fewest queued tasks.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other worker node: putting the task back on the same queue would never
      // shrink it and this loop would not terminate. Leave the tasks where they are.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
852
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * The worker node is flagged as dynamic and an extra message listener destroys the worker
 * on a hard kill message, or on any other kill message once the worker node executes no
 * task and, with the tasks queue enabled, has no queued task.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Whether the worker node has nothing left to do, depending on the tasks queue usage.
    const isWorkerNodeDrained = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
        case true:
          return (
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0
          )
        default:
          return false
      }
    }
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeDrained())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
878
/**
 * Builds the listener registered for each worker message. It dispatches worker started
 * messages (worker id and started flag set) and task execution responses (message id set);
 * any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
895
/**
 * Handles a worker started message: stores the started flag in the information of the
 * worker node matching the message worker id.
 *
 * @param message - The worker started message.
 * @throws Error if no worker node has the message worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const workerId = message.workerId as number
  const worker = this.getWorkerById(workerId)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${workerId}'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
910
/**
 * Handles a task execution response: settles the promise registered under the message id,
 * updates the worker usage, runs the next queued task of the worker node if any and lets
 * the worker choice strategy context update itself. Messages with an unknown id are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, taskError)
    }
    // The promise is rejected with the error message, not with the task error object.
    promiseResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
937
/**
 * Emits the `busy` pool event when the pool is busy and the `full` pool event when the
 * pool is dynamic and full, both with the pool information. Does nothing without emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
948
/**
 * Replaces the usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  const targetWorkerNode = workerNode
  targetWorkerNode.usage = workerUsage
}
961
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
970
/**
 * Pushes the given worker in the pool worker nodes, with its initial information,
 * its initial usage and an empty tasks queue.
 *
 * The usage is built once, directly for the worker: its queue size getters resolve the
 * worker node lazily when read, so they can be created before the worker node is pushed.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  return this.workerNodes.push({
    worker,
    info: this.getInitialWorkerInfo(worker),
    usage: this.getInitialWorkerUsage(worker),
    tasksQueue: new Queue<Task<Data>>()
  })
}
990
/**
 * Gets the worker id: the `threadId` of a thread worker, the `id` of a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
1004
1005 // /**
1006 // * Sets the given worker in the pool worker nodes.
1007 // *
1008 // * @param workerNodeKey - The worker node key.
1009 // * @param worker - The worker.
1010 // * @param workerInfo - The worker info.
1011 // * @param workerUsage - The worker usage.
1012 // * @param tasksQueue - The worker task queue.
1013 // */
1014 // private setWorkerNode (
1015 // workerNodeKey: number,
1016 // worker: Worker,
1017 // workerInfo: WorkerInfo,
1018 // workerUsage: WorkerUsage,
1019 // tasksQueue: Queue<Task<Data>>
1020 // ): void {
1021 // this.workerNodes[workerNodeKey] = {
1022 // worker,
1023 // info: workerInfo,
1024 // usage: workerUsage,
1025 // tasksQueue
1026 // }
1027 // }
1028
/**
 * Removes the worker node of the given worker from the pool worker nodes and notifies
 * the worker choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1041
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1046
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}

/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` method.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}

/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}

/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1062
/**
 * Flushes the tasks queue of the given worker node: every queued task is executed on
 * that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (;;) {
    if (this.tasksQueueSize(workerNodeKey) <= 0) {
      break
    }
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1072
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1078
/**
 * Sends to the given worker a statistics message holding whether the run time and the
 * ELU aggregates are required by the worker choice strategy.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
1090
/**
 * Builds the initial usage of a worker node: every counter and measurement starts at `0`
 * with an empty history. The `queued` and `maxQueued` counters are getters reading the
 * tasks queue of the worker node at access time, or `0` when no worker is given.
 *
 * NOTE(review): `minimum` starts at `0` while the usage updates take
 * `Math.min(value, minimum ?? Infinity)`, so a minimum can never rise above `0` - confirm
 * whether the initial minimum and maximum are meant to be unset.
 *
 * @param worker - The worker owning the usage, if already known.
 * @returns The initial worker usage.
 */
private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
  const queuedTasks = (): number =>
    worker != null ? this.tasksQueueSize(this.getWorkerNodeKey(worker)) : 0
  const maxQueuedTasks = (): number =>
    worker != null ? this.tasksMaxQueueSize(this.getWorkerNodeKey(worker)) : 0
  // Each measurement gets its own object and its own history.
  const initialMeasurement = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    maximum: 0,
    minimum: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: initialMeasurement(),
    waitTime: initialMeasurement(),
    elu: {
      idle: initialMeasurement(),
      active: initialMeasurement()
    }
  }
}
1150
/**
 * Builds the initial information of a worker node: the worker id, not dynamic, started.
 *
 * @param worker - The worker.
 * @returns The initial worker information.
 */
private getInitialWorkerInfo (worker: Worker): WorkerInfo {
  const id = this.getWorkerId(worker)
  return { id, dynamic: false, started: true }
}
1154 }