refactor: improve task error message
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median,
9 round
10 } from '../utils'
11 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
12 import { CircularArray } from '../circular-array'
13 import { Queue } from '../queue'
14 import {
15 type IPool,
16 PoolEmitter,
17 PoolEvents,
18 type PoolInfo,
19 type PoolOptions,
20 type PoolType,
21 PoolTypes,
22 type TasksQueueOptions,
23 type WorkerType,
24 WorkerTypes
25 } from './pool'
26 import type {
27 IWorker,
28 MessageHandler,
29 Task,
30 WorkerNode,
31 WorkerUsage
32 } from './worker'
33 import {
34 Measurements,
35 WorkerChoiceStrategies,
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
38 } from './selection-strategies/selection-strategies-types'
39 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
40
41 /**
42 * Base class that implements some shared logic for all poolifier pools.
43 *
44 * @typeParam Worker - Type of worker which manages this pool.
45 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
46 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
47 */
48 export abstract class AbstractPool<
49 Worker extends IWorker,
50 Data = unknown,
51 Response = unknown
52 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * An entry is added when a task is submitted and looked up again when a worker message carrying the same id is received.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context delegating worker node selection to the configured worker choice strategy.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * `performance.now()` value captured at the end of the pool construction.
 */
private readonly startTimestamp: number
85
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal callbacks, creates the optional
 * event emitter and the worker choice strategy context, runs the setup hook and
 * finally creates worker nodes until `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if not called from the main worker, or if an argument fails validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validation; checkPoolOptions() also writes the option defaults into opts.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Keep `this` stable when these methods are passed around as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
131
/**
 * Ensures a worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
140
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
158
/**
 * Validates the pool options and stores the effective values (given value or
 * default) on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  const enableTasksQueue = opts.enableTasksQueue ?? false
  this.opts.enableTasksQueue = enableTasksQueue
  if (enableTasksQueue) {
    // Queue options are only validated and normalized when queueing is on.
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
185
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  const isKnown = knownStrategies.includes(workerChoiceStrategy)
  if (!isKnown) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
195
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const { weights } = workerChoiceStrategyOptions
  if (weights != null) {
    const weightsCount = Object.keys(weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  const { measurement } = workerChoiceStrategyOptions
  if (measurement != null) {
    const isKnownMeasurement = Object.values(Measurements).includes(measurement)
    if (!isKnownMeasurement) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${measurement}'`
      )
    }
  }
}
223
/**
 * Validates tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate when no concurrency is given.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency '${concurrency}'`
    )
  }
}
247
/** @inheritDoc */
public get info (): PoolInfo {
  // Adds up one number per worker node.
  const sumOverNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    utilization: round(this.utilization),
    workerNodes: this.workerNodes.length,
    // A node is idle when it executes no task, busy otherwise.
    idleWorkerNodes: sumOverNodes(({ usage }) =>
      usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverNodes(({ usage }) =>
      usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverNodes(({ usage }) => usage.tasks.executed),
    executingTasks: sumOverNodes(({ usage }) => usage.tasks.executing),
    queuedTasks: sumOverNodes(({ usage }) => usage.tasks.queued),
    maxQueuedTasks: sumOverNodes(({ usage }) => usage.tasks.maxQueued),
    failedTasks: sumOverNodes(({ usage }) => usage.tasks.failed)
  }
}
296
/**
 * Time elapsed since the pool start timestamp was captured.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
305
/**
 * Approximate pool utilization: the aggregated task run and wait times of all
 * worker nodes divided by the pool run time multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
325
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Within this class the value is compared against `PoolTypes.fixed` when
 * validating the number of workers and against `PoolTypes.dynamic` when
 * deciding on dynamic worker creation and on emitting the `full` event.
 */
protected abstract get type (): PoolType
332
/**
 * Gets the worker type.
 *
 * Reported in the pool info and used to select which worker property
 * (`threadId` for threads, `id` for cluster workers) serves as the worker id.
 */
protected abstract get worker (): WorkerType
337
/**
 * Pool minimum size.
 *
 * Reported as `minSize` in the pool info.
 */
protected abstract get minSize (): number
342
/**
 * Pool maximum size.
 *
 * Reported as `maxSize` in the pool info and used by this class to compute
 * the utilization capacity, to tell whether the pool is full and to check
 * the number of worker choice strategy weights.
 */
protected abstract get maxSize (): number
347
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
358
/**
 * Gets the worker node key (index in the pool worker nodes) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  const isSameWorker = ({ worker: candidate }: WorkerNode<Worker, Data>): boolean =>
    candidate === worker
  return this.workerNodes.findIndex(isSameWorker)
}
370
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node gets a fresh usage object and is sent the statistics
  // requirements again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
392
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validation throws before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
403
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // Queueing is being switched off: flush what is still queued.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
415
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queueing is off: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
426
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly `null`/`undefined`.
 * @returns Options whose `concurrency` is the given one, or `1` when not given.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
434
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
443
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Read by this class when deciding whether a submitted task is queued
 * (tasks queue enabled) and whether the `busy` event is emitted.
 */
protected abstract get busy (): boolean
450
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks (also `true` when there are no worker nodes), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    ({ usage }) => usage.tasks.executing !== 0
  )
}
463
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // The promise is settled later by the worker message listener, which finds
  // the callbacks through the task id.
  const res = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(taskId, { resolve, reject, worker })
  })
  // Queue only when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs as many tasks as the concurrency allows.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
497
/** @inheritDoc */
public async destroy (): Promise<void> {
  const teardowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the node queue before terminating its worker.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(teardowns)
}
508
/**
 * Terminates the given worker.
 *
 * Called for every worker node when the pool is destroyed, and for a
 * dynamic worker once its kill message is accepted.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
515
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context has
 * been created. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
525
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
530
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter of the worker node and records the
 * task wait time.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
546
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates, in this order, the task counters, the run time statistics and the
 * ELU statistics of the worker node owning the given worker.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
563
/**
 * Updates the task counters after a task response: one less executing task,
 * one more executed task and, if the message carries a task error, one more
 * failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const hasFailed = message.taskError != null
  if (hasFailed) {
    tasks.failed += 1
  }
}
575
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * run time aggregate. The average and the median are only refreshed when the
 * strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, whose `taskPerformance.runTime` may be missing.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The divisor is the number of successfully executed tasks. Guarding on it
  // (rather than on `executed` alone) avoids a NaN/Infinity average when all
  // executed tasks so far have failed; the previous average is kept instead.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
604
/**
 * Updates the wait time statistics of a worker usage for a task about to be
 * executed. The wait time is the time elapsed since the task timestamp, or
 * `0` when the task has no timestamp.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * wait time aggregate. The average and the median are only refreshed when the
 * strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The divisor is the number of successfully executed tasks. Guarding on it
  // (rather than on `executed` alone) avoids a NaN/Infinity average when all
  // executed tasks so far have failed; the previous average is kept instead.
  // NOTE(review): the aggregate already includes the wait time of the task
  // being started while the divisor only counts finished tasks — confirm
  // this approximation is intended.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
635
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * ELU aggregate. The averages and the medians are only refreshed when the
 * strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, whose `taskPerformance.elu` may be missing.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const eluUsage = workerUsage.elu
  if (eluUsage == null) {
    // No ELU statistics container to write to. The former fallback branch
    // dereferenced the missing container and could only throw.
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    eluUsage.idle.aggregate += taskElu.idle
    eluUsage.active.aggregate += taskElu.active
    // Each new sample is averaged with the previously stored utilization.
    // NOTE(review): the stored value starts at 0, so the first sample ends
    // up halved — confirm this is intended.
    eluUsage.utilization = (eluUsage.utilization + taskElu.utilization) / 2
  }
  // The divisor is the number of successfully executed tasks. Guarding on it
  // (rather than on `executed` alone) avoids NaN/Infinity averages when all
  // executed tasks so far have failed; the previous averages are kept instead.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    eluUsage.idle.average = eluUsage.idle.aggregate / successfulTasks
    eluUsage.active.average = eluUsage.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
680
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first; its node is
 * then returned directly if the strategy policy asks to use dynamic workers.
 * In every other case the choice is delegated to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
699
/**
 * Conditions for dynamic worker creation: the pool is dynamic, it is not full
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
708
/**
 * Sends a message to the given worker.
 *
 * Used by this class to send tasks to execute and the task statistics
 * requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
719
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
732
/**
 * Creates a new worker.
 *
 * Called each time a worker is created and set up in the pool; event
 * listeners are attached to the returned worker afterwards.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
739
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * The default implementation subscribes the pool message listener to the worker.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
750
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user supplied handlers (or no-ops) are attached first, followed by the
 * pool's own error and exit handling. The worker is then pushed to the worker
 * nodes, sent the statistics requirements and passed to `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // The restart option is read when the error occurs, not at setup time.
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
783
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when it
 * sends a hard kill message, or any kill message while its node is idle
 * (no executing task and, with the tasks queue enabled, an empty queue).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    const isIdle = (): boolean => {
      const tasksQueueEnabled = this.opts.enableTasksQueue
      if (tasksQueueEnabled === false) {
        return hasNoExecutingTask()
      }
      if (tasksQueueEnabled === true) {
        return hasNoExecutingTask() && this.tasksQueueSize(workerNodeKey) === 0
      }
      return false
    }
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isIdle())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
808
/**
 * This function is the listener registered for each worker message.
 *
 * The returned listener handles two kinds of messages:
 * - worker started messages (`workerId` and `started` set), which update the
 *   `started` flag of the matching worker node info;
 * - task execution responses (`id` set), which settle the pending promise of
 *   the task, update the worker usage and run the next queued task, if any.
 *
 * A task error rejects the promise with a string message, not an `Error`.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const { workerId, started, id, taskError } = message
    if (workerId != null && started != null) {
      // Worker started message received
      const worker = this.getWorkerById(workerId)
      if (worker == null) {
        throw new Error(
          `Worker started message received from unknown worker '${workerId}'`
        )
      }
      this.workerNodes[this.getWorkerNodeKey(worker)].info.started = started
      return
    }
    if (id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(id)
    if (promiseResponse == null) {
      return
    }
    if (taskError != null) {
      this.emitter?.emit(PoolEvents.taskError, taskError)
      promiseResponse.reject(
        `${taskError.message} on worker '${taskError.workerId}'`
      )
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
858
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full, each with the current pool info. Does nothing when
 * the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isFullDynamicPool = this.type === PoolTypes.dynamic && this.full
  if (isFullDynamicPool) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
869
/**
 * Sets the tasks usage of the given worker node in the pool.
 *
 * The previous usage object of the node is replaced, not merged.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
882
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * The node is first pushed with a usage that is not bound to any node key,
 * then its usage is replaced by one bound to its actual key so that the
 * queue counters read the node's own tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const boundUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], boundUsage)
  return this.workerNodes.length
}
903
/**
 * Gets the worker id: `threadId` for a thread worker, `id` for a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
917
918 // /**
919 // * Sets the given worker in the pool worker nodes.
920 // *
921 // * @param workerNodeKey - The worker node key.
922 // * @param worker - The worker.
923 // * @param workerInfo - The worker info.
924 // * @param workerUsage - The worker usage.
925 // * @param tasksQueue - The worker task queue.
926 // */
927 // private setWorkerNode (
928 // workerNodeKey: number,
929 // worker: Worker,
930 // workerInfo: WorkerInfo,
931 // workerUsage: WorkerUsage,
932 // tasksQueue: Queue<Task<Data>>
933 // ): void {
934 // this.workerNodes[workerNodeKey] = {
935 // worker,
936 // info: workerInfo,
937 // usage: workerUsage,
938 // tasksQueue
939 // }
940 // }
941
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
954
/**
 * Runs the before-execution hook, then sends the task to the worker of the
 * given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
959
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
963
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
967
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
971
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
975
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. Comparing an incrementing counter with a
  // size that shrinks on each dequeue would stop after about half of the
  // tasks, and the remaining ones would be dropped by clear() without their
  // promises ever being settled.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
987
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
993
/**
 * Sends the given worker a `statistics` message carrying the run time and ELU
 * aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker to send the message to.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1005
/**
 * Builds a fresh, zeroed worker usage.
 *
 * The `queued` and `maxQueued` task counters are getters evaluated on access:
 * they read the tasks queue of the given worker node, or return `0` when no
 * worker node key is given.
 *
 * @param workerNodeKey - The worker node key the queue counters are bound to, if any.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const queuedTasks = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const maxQueuedTasks = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
1054 }