fix: reassign queued tasks on worker error
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33 } from './worker'
34 import {
35 Measurements,
36 WorkerChoiceStrategies,
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
39 } from './selection-strategies/selection-strategies-types'
40 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
41
42 /**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49 export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53 > implements IPool<Worker, Data, Response> {
54 /** @inheritDoc */
55 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
56
57 /** @inheritDoc */
58 public readonly emitter?: PoolEmitter
59
60 /**
61 * The execution response promise map.
62 *
63 * - `key`: The message id of each submitted task.
64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
65 *
66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
67 */
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
72
73 /**
74 * Worker choice strategy context referencing a worker choice algorithm implementation.
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
77 Worker,
78 Data,
79 Response
80 >
81
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
87 /**
88 * Constructs a new poolifier pool.
89 *
90 * @param numberOfWorkers - Number of workers that this pool should manage.
91 * @param filePath - Path to the worker file.
92 * @param opts - Options for the pool.
93 */
94 public constructor (
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
98 ) {
99 if (!this.isMain()) {
100 throw new Error('Cannot start a pool from a worker!')
101 }
102 this.checkNumberOfWorkers(this.numberOfWorkers)
103 this.checkFilePath(this.filePath)
104 this.checkPoolOptions(this.opts)
105
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
110
111 if (this.opts.enableEvents === true) {
112 this.emitter = new PoolEmitter()
113 }
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
123
124 this.setupHook()
125
126 while (this.workerNodes.length < this.numberOfWorkers) {
127 this.createAndSetupWorker()
128 }
129
130 this.startTimestamp = performance.now()
131 }
132
133 private checkFilePath (filePath: string): void {
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
148 throw new TypeError(
149 'Cannot instantiate a pool with a non safe integer number of workers'
150 )
151 } else if (numberOfWorkers < 0) {
152 throw new RangeError(
153 'Cannot instantiate a pool with a negative number of workers'
154 )
155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
156 throw new Error('Cannot instantiate a fixed pool with no worker')
157 }
158 }
159
160 private checkPoolOptions (opts: PoolOptions<Worker>): void {
161 if (isPlainObject(opts)) {
162 this.opts.workerChoiceStrategy =
163 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
164 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
165 this.opts.workerChoiceStrategyOptions =
166 opts.workerChoiceStrategyOptions ??
167 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
168 this.checkValidWorkerChoiceStrategyOptions(
169 this.opts.workerChoiceStrategyOptions
170 )
171 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
172 this.opts.enableEvents = opts.enableEvents ?? true
173 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
174 if (this.opts.enableTasksQueue) {
175 this.checkValidTasksQueueOptions(
176 opts.tasksQueueOptions as TasksQueueOptions
177 )
178 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
179 opts.tasksQueueOptions as TasksQueueOptions
180 )
181 }
182 } else {
183 throw new TypeError('Invalid pool options: must be a plain object')
184 }
185 }
186
187 private checkValidWorkerChoiceStrategy (
188 workerChoiceStrategy: WorkerChoiceStrategy
189 ): void {
190 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
191 throw new Error(
192 `Invalid worker choice strategy '${workerChoiceStrategy}'`
193 )
194 }
195 }
196
197 private checkValidWorkerChoiceStrategyOptions (
198 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
199 ): void {
200 if (!isPlainObject(workerChoiceStrategyOptions)) {
201 throw new TypeError(
202 'Invalid worker choice strategy options: must be a plain object'
203 )
204 }
205 if (
206 workerChoiceStrategyOptions.weights != null &&
207 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
208 ) {
209 throw new Error(
210 'Invalid worker choice strategy options: must have a weight for each worker node'
211 )
212 }
213 if (
214 workerChoiceStrategyOptions.measurement != null &&
215 !Object.values(Measurements).includes(
216 workerChoiceStrategyOptions.measurement
217 )
218 ) {
219 throw new Error(
220 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
221 )
222 }
223 }
224
225 private checkValidTasksQueueOptions (
226 tasksQueueOptions: TasksQueueOptions
227 ): void {
228 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
229 throw new TypeError('Invalid tasks queue options: must be a plain object')
230 }
231 if (
232 tasksQueueOptions?.concurrency != null &&
233 !Number.isSafeInteger(tasksQueueOptions.concurrency)
234 ) {
235 throw new TypeError(
236 'Invalid worker tasks concurrency: must be an integer'
237 )
238 }
239 if (
240 tasksQueueOptions?.concurrency != null &&
241 tasksQueueOptions.concurrency <= 0
242 ) {
243 throw new Error(
244 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
245 )
246 }
247 }
248
249 /** @inheritDoc */
250 public get info (): PoolInfo {
251 return {
252 type: this.type,
253 worker: this.worker,
254 minSize: this.minSize,
255 maxSize: this.maxSize,
256 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
257 .runTime.aggregate &&
258 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
259 .waitTime.aggregate && { utilization: round(this.utilization) }),
260 workerNodes: this.workerNodes.length,
261 idleWorkerNodes: this.workerNodes.reduce(
262 (accumulator, workerNode) =>
263 workerNode.usage.tasks.executing === 0
264 ? accumulator + 1
265 : accumulator,
266 0
267 ),
268 busyWorkerNodes: this.workerNodes.reduce(
269 (accumulator, workerNode) =>
270 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
271 0
272 ),
273 executedTasks: this.workerNodes.reduce(
274 (accumulator, workerNode) =>
275 accumulator + workerNode.usage.tasks.executed,
276 0
277 ),
278 executingTasks: this.workerNodes.reduce(
279 (accumulator, workerNode) =>
280 accumulator + workerNode.usage.tasks.executing,
281 0
282 ),
283 queuedTasks: this.workerNodes.reduce(
284 (accumulator, workerNode) =>
285 accumulator + workerNode.usage.tasks.queued,
286 0
287 ),
288 maxQueuedTasks: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
290 accumulator + workerNode.usage.tasks.maxQueued,
291 0
292 ),
293 failedTasks: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
295 accumulator + workerNode.usage.tasks.failed,
296 0
297 )
298 }
299 }
300
301 /**
302 * Gets the pool run time.
303 *
304 * @returns The pool run time in milliseconds.
305 */
306 private get runTime (): number {
307 return performance.now() - this.startTimestamp
308 }
309
310 /**
311 * Gets the approximate pool utilization.
312 *
313 * @returns The pool utilization.
314 */
315 private get utilization (): number {
316 const poolRunTimeCapacity = this.runTime * this.maxSize
317 const totalTasksRunTime = this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.runTime.aggregate,
320 0
321 )
322 const totalTasksWaitTime = this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 accumulator + workerNode.usage.waitTime.aggregate,
325 0
326 )
327 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
328 }
329
330 /**
331 * Pool type.
332 *
333 * If it is `'dynamic'`, it provides the `max` property.
334 */
335 protected abstract get type (): PoolType
336
337 /**
338 * Gets the worker type.
339 */
340 protected abstract get worker (): WorkerType
341
342 /**
343 * Pool minimum size.
344 */
345 protected abstract get minSize (): number
346
347 /**
348 * Pool maximum size.
349 */
350 protected abstract get maxSize (): number
351
352 /**
353 * Get the worker given its id.
354 *
355 * @param workerId - The worker id.
356 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
357 */
358 private getWorkerById (workerId: number): Worker | undefined {
359 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
360 ?.worker
361 }
362
363 /**
364 * Gets the given worker its worker node key.
365 *
366 * @param worker - The worker.
367 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
368 */
369 private getWorkerNodeKey (worker: Worker): number {
370 return this.workerNodes.findIndex(
371 workerNode => workerNode.worker === worker
372 )
373 }
374
375 /** @inheritDoc */
376 public setWorkerChoiceStrategy (
377 workerChoiceStrategy: WorkerChoiceStrategy,
378 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
379 ): void {
380 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
381 this.opts.workerChoiceStrategy = workerChoiceStrategy
382 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
383 this.opts.workerChoiceStrategy
384 )
385 if (workerChoiceStrategyOptions != null) {
386 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
387 }
388 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
389 this.setWorkerNodeTasksUsage(
390 workerNode,
391 this.getWorkerUsage(workerNodeKey)
392 )
393 this.setWorkerStatistics(workerNode.worker)
394 }
395 }
396
397 /** @inheritDoc */
398 public setWorkerChoiceStrategyOptions (
399 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
400 ): void {
401 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
402 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
403 this.workerChoiceStrategyContext.setOptions(
404 this.opts.workerChoiceStrategyOptions
405 )
406 }
407
408 /** @inheritDoc */
409 public enableTasksQueue (
410 enable: boolean,
411 tasksQueueOptions?: TasksQueueOptions
412 ): void {
413 if (this.opts.enableTasksQueue === true && !enable) {
414 this.flushTasksQueues()
415 }
416 this.opts.enableTasksQueue = enable
417 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
418 }
419
420 /** @inheritDoc */
421 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
422 if (this.opts.enableTasksQueue === true) {
423 this.checkValidTasksQueueOptions(tasksQueueOptions)
424 this.opts.tasksQueueOptions =
425 this.buildTasksQueueOptions(tasksQueueOptions)
426 } else if (this.opts.tasksQueueOptions != null) {
427 delete this.opts.tasksQueueOptions
428 }
429 }
430
431 private buildTasksQueueOptions (
432 tasksQueueOptions: TasksQueueOptions
433 ): TasksQueueOptions {
434 return {
435 concurrency: tasksQueueOptions?.concurrency ?? 1
436 }
437 }
438
439 /**
440 * Whether the pool is full or not.
441 *
442 * The pool filling boolean status.
443 */
444 protected get full (): boolean {
445 return this.workerNodes.length >= this.maxSize
446 }
447
448 /**
449 * Whether the pool is busy or not.
450 *
451 * The pool busyness boolean status.
452 */
453 protected abstract get busy (): boolean
454
455 /**
456 * Whether worker nodes are executing at least one task.
457 *
458 * @returns Worker nodes busyness boolean status.
459 */
460 protected internalBusy (): boolean {
461 return (
462 this.workerNodes.findIndex(workerNode => {
463 return workerNode.usage.tasks.executing === 0
464 }) === -1
465 )
466 }
467
468 /** @inheritDoc */
469 public async execute (data?: Data, name?: string): Promise<Response> {
470 const timestamp = performance.now()
471 const workerNodeKey = this.chooseWorkerNode()
472 const submittedTask: Task<Data> = {
473 name,
474 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
475 data: data ?? ({} as Data),
476 timestamp,
477 id: crypto.randomUUID()
478 }
479 const res = new Promise<Response>((resolve, reject) => {
480 this.promiseResponseMap.set(submittedTask.id as string, {
481 resolve,
482 reject,
483 worker: this.workerNodes[workerNodeKey].worker
484 })
485 })
486 if (
487 this.opts.enableTasksQueue === true &&
488 (this.busy ||
489 this.workerNodes[workerNodeKey].usage.tasks.executing >=
490 ((this.opts.tasksQueueOptions as TasksQueueOptions)
491 .concurrency as number))
492 ) {
493 this.enqueueTask(workerNodeKey, submittedTask)
494 } else {
495 this.executeTask(workerNodeKey, submittedTask)
496 }
497 this.checkAndEmitEvents()
498 // eslint-disable-next-line @typescript-eslint/return-await
499 return res
500 }
501
502 /** @inheritDoc */
503 public async destroy (): Promise<void> {
504 await Promise.all(
505 this.workerNodes.map(async (workerNode, workerNodeKey) => {
506 this.flushTasksQueue(workerNodeKey)
507 // FIXME: wait for tasks to be finished
508 const workerExitPromise = new Promise<void>(resolve => {
509 workerNode.worker.on('exit', () => {
510 resolve()
511 })
512 })
513 await this.destroyWorker(workerNode.worker)
514 await workerExitPromise
515 })
516 )
517 }
518
519 /**
520 * Terminates the given worker.
521 *
522 * @param worker - A worker within `workerNodes`.
523 */
524 protected abstract destroyWorker (worker: Worker): void | Promise<void>
525
526 /**
527 * Setup hook to execute code before worker nodes are created in the abstract constructor.
528 * Can be overridden.
529 *
530 * @virtual
531 */
532 protected setupHook (): void {
533 // Intentionally empty
534 }
535
536 /**
537 * Should return whether the worker is the main worker or not.
538 */
539 protected abstract isMain (): boolean
540
541 /**
542 * Hook executed before the worker task execution.
543 * Can be overridden.
544 *
545 * @param workerNodeKey - The worker node key.
546 * @param task - The task to execute.
547 */
548 protected beforeTaskExecutionHook (
549 workerNodeKey: number,
550 task: Task<Data>
551 ): void {
552 const workerUsage = this.workerNodes[workerNodeKey].usage
553 ++workerUsage.tasks.executing
554 this.updateWaitTimeWorkerUsage(workerUsage, task)
555 }
556
557 /**
558 * Hook executed after the worker task execution.
559 * Can be overridden.
560 *
561 * @param worker - The worker.
562 * @param message - The received message.
563 */
564 protected afterTaskExecutionHook (
565 worker: Worker,
566 message: MessageValue<Response>
567 ): void {
568 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
569 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
570 this.updateRunTimeWorkerUsage(workerUsage, message)
571 this.updateEluWorkerUsage(workerUsage, message)
572 }
573
574 private updateTaskStatisticsWorkerUsage (
575 workerUsage: WorkerUsage,
576 message: MessageValue<Response>
577 ): void {
578 const workerTaskStatistics = workerUsage.tasks
579 --workerTaskStatistics.executing
580 ++workerTaskStatistics.executed
581 if (message.taskError != null) {
582 ++workerTaskStatistics.failed
583 }
584 }
585
586 private updateRunTimeWorkerUsage (
587 workerUsage: WorkerUsage,
588 message: MessageValue<Response>
589 ): void {
590 if (
591 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
592 .aggregate
593 ) {
594 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
595 if (
596 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
597 .average &&
598 workerUsage.tasks.executed !== 0
599 ) {
600 workerUsage.runTime.average =
601 workerUsage.runTime.aggregate /
602 (workerUsage.tasks.executed - workerUsage.tasks.failed)
603 }
604 if (
605 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
606 .median &&
607 message.taskPerformance?.runTime != null
608 ) {
609 workerUsage.runTime.history.push(message.taskPerformance.runTime)
610 workerUsage.runTime.median = median(workerUsage.runTime.history)
611 }
612 }
613 }
614
615 private updateWaitTimeWorkerUsage (
616 workerUsage: WorkerUsage,
617 task: Task<Data>
618 ): void {
619 const timestamp = performance.now()
620 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
621 if (
622 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
623 .aggregate
624 ) {
625 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
626 if (
627 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
628 .waitTime.average &&
629 workerUsage.tasks.executed !== 0
630 ) {
631 workerUsage.waitTime.average =
632 workerUsage.waitTime.aggregate /
633 (workerUsage.tasks.executed - workerUsage.tasks.failed)
634 }
635 if (
636 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
637 .waitTime.median &&
638 taskWaitTime != null
639 ) {
640 workerUsage.waitTime.history.push(taskWaitTime)
641 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
642 }
643 }
644 }
645
646 private updateEluWorkerUsage (
647 workerUsage: WorkerUsage,
648 message: MessageValue<Response>
649 ): void {
650 if (
651 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
652 .aggregate
653 ) {
654 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
655 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
656 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
657 workerUsage.elu.utilization =
658 (workerUsage.elu.utilization +
659 message.taskPerformance.elu.utilization) /
660 2
661 } else if (message.taskPerformance?.elu != null) {
662 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
663 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
664 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
665 }
666 if (
667 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
668 .average &&
669 workerUsage.tasks.executed !== 0
670 ) {
671 const executedTasks =
672 workerUsage.tasks.executed - workerUsage.tasks.failed
673 workerUsage.elu.idle.average =
674 workerUsage.elu.idle.aggregate / executedTasks
675 workerUsage.elu.active.average =
676 workerUsage.elu.active.aggregate / executedTasks
677 }
678 if (
679 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
680 .median &&
681 message.taskPerformance?.elu != null
682 ) {
683 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
684 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
685 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
686 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
687 }
688 }
689 }
690
691 /**
692 * Chooses a worker node for the next task.
693 *
694 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
695 *
696 * @returns The worker node key
697 */
698 private chooseWorkerNode (): number {
699 if (this.shallCreateDynamicWorker()) {
700 const worker = this.createAndSetupDynamicWorker()
701 if (
702 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
703 ) {
704 return this.getWorkerNodeKey(worker)
705 }
706 }
707 return this.workerChoiceStrategyContext.execute()
708 }
709
710 /**
711 * Conditions for dynamic worker creation.
712 *
713 * @returns Whether to create a dynamic worker or not.
714 */
715 private shallCreateDynamicWorker (): boolean {
716 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
717 }
718
719 /**
720 * Sends a message to the given worker.
721 *
722 * @param worker - The worker which should receive the message.
723 * @param message - The message.
724 */
725 protected abstract sendToWorker (
726 worker: Worker,
727 message: MessageValue<Data>
728 ): void
729
730 /**
731 * Registers a listener callback on the given worker.
732 *
733 * @param worker - The worker which should register a listener.
734 * @param listener - The message listener callback.
735 */
736 private registerWorkerMessageListener<Message extends Data | Response>(
737 worker: Worker,
738 listener: (message: MessageValue<Message>) => void
739 ): void {
740 worker.on('message', listener as MessageHandler<Worker>)
741 }
742
743 /**
744 * Creates a new worker.
745 *
746 * @returns Newly created worker.
747 */
748 protected abstract createWorker (): Worker
749
750 /**
751 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
752 * Can be overridden.
753 *
754 * @param worker - The newly created worker.
755 */
756 protected afterWorkerSetup (worker: Worker): void {
757 // Listen to worker messages.
758 this.registerWorkerMessageListener(worker, this.workerListener())
759 }
760
761 /**
762 * Creates a new worker and sets it up completely in the pool worker nodes.
763 *
764 * @returns New, completely set up worker.
765 */
766 protected createAndSetupWorker (): Worker {
767 const worker = this.createWorker()
768
769 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
770 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
771 worker.on('error', error => {
772 if (this.emitter != null) {
773 this.emitter.emit(PoolEvents.error, error)
774 }
775 if (this.opts.enableTasksQueue === true) {
776 const workerNodeKey = this.getWorkerNodeKey(worker)
777 while (this.tasksQueueSize(workerNodeKey) > 0) {
778 let targetWorkerNodeKey: number = workerNodeKey
779 let minQueuedTasks = Infinity
780 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
781 if (
782 workerNodeId !== workerNodeKey &&
783 workerNode.usage.tasks.queued === 0
784 ) {
785 targetWorkerNodeKey = workerNodeId
786 break
787 }
788 if (
789 workerNodeId !== workerNodeKey &&
790 workerNode.usage.tasks.queued < minQueuedTasks
791 ) {
792 minQueuedTasks = workerNode.usage.tasks.queued
793 targetWorkerNodeKey = workerNodeId
794 }
795 }
796 this.enqueueTask(
797 targetWorkerNodeKey,
798 this.dequeueTask(workerNodeKey) as Task<Data>
799 )
800 }
801 }
802 if (this.opts.restartWorkerOnError === true) {
803 this.createAndSetupWorker()
804 }
805 })
806 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
807 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
808 worker.once('exit', () => {
809 this.removeWorkerNode(worker)
810 })
811
812 this.pushWorkerNode(worker)
813
814 this.setWorkerStatistics(worker)
815
816 this.afterWorkerSetup(worker)
817
818 return worker
819 }
820
821 /**
822 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
823 *
824 * @returns New, completely set up dynamic worker.
825 */
826 protected createAndSetupDynamicWorker (): Worker {
827 const worker = this.createAndSetupWorker()
828 this.registerWorkerMessageListener(worker, message => {
829 const workerNodeKey = this.getWorkerNodeKey(worker)
830 if (
831 isKillBehavior(KillBehaviors.HARD, message.kill) ||
832 (message.kill != null &&
833 ((this.opts.enableTasksQueue === false &&
834 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
835 (this.opts.enableTasksQueue === true &&
836 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
837 this.tasksQueueSize(workerNodeKey) === 0)))
838 ) {
839 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
840 void (this.destroyWorker(worker) as Promise<void>)
841 }
842 })
843 return worker
844 }
845
846 /**
847 * This function is the listener registered for each worker message.
848 *
849 * @returns The listener function to execute when a message is received from a worker.
850 */
851 protected workerListener (): (message: MessageValue<Response>) => void {
852 return message => {
853 if (message.workerId != null && message.started != null) {
854 // Worker started message received
855 this.handleWorkerStartedMessage(message)
856 } else if (message.id != null) {
857 // Task execution response received
858 this.handleTaskExecutionResponse(message)
859 }
860 }
861 }
862
863 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
864 // Worker started message received
865 const worker = this.getWorkerById(message.workerId as number)
866 if (worker != null) {
867 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
868 message.started as boolean
869 } else {
870 throw new Error(
871 `Worker started message received from unknown worker '${
872 message.workerId as number
873 }'`
874 )
875 }
876 }
877
878 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
879 const promiseResponse = this.promiseResponseMap.get(message.id as string)
880 if (promiseResponse != null) {
881 if (message.taskError != null) {
882 if (this.emitter != null) {
883 this.emitter.emit(PoolEvents.taskError, message.taskError)
884 }
885 promiseResponse.reject(message.taskError.message)
886 } else {
887 promiseResponse.resolve(message.data as Response)
888 }
889 this.afterTaskExecutionHook(promiseResponse.worker, message)
890 this.promiseResponseMap.delete(message.id as string)
891 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
892 if (
893 this.opts.enableTasksQueue === true &&
894 this.tasksQueueSize(workerNodeKey) > 0
895 ) {
896 this.executeTask(
897 workerNodeKey,
898 this.dequeueTask(workerNodeKey) as Task<Data>
899 )
900 }
901 this.workerChoiceStrategyContext.update(workerNodeKey)
902 }
903 }
904
905 private checkAndEmitEvents (): void {
906 if (this.emitter != null) {
907 if (this.busy) {
908 this.emitter?.emit(PoolEvents.busy, this.info)
909 }
910 if (this.type === PoolTypes.dynamic && this.full) {
911 this.emitter?.emit(PoolEvents.full, this.info)
912 }
913 }
914 }
915
916 /**
917 * Sets the given worker node its tasks usage in the pool.
918 *
919 * @param workerNode - The worker node.
920 * @param workerUsage - The worker usage.
921 */
922 private setWorkerNodeTasksUsage (
923 workerNode: WorkerNode<Worker, Data>,
924 workerUsage: WorkerUsage
925 ): void {
926 workerNode.usage = workerUsage
927 }
928
929 /**
930 * Pushes the given worker in the pool worker nodes.
931 *
932 * @param worker - The worker.
933 * @returns The worker nodes length.
934 */
935 private pushWorkerNode (worker: Worker): number {
936 this.workerNodes.push({
937 worker,
938 info: { id: this.getWorkerId(worker), started: true },
939 usage: this.getWorkerUsage(),
940 tasksQueue: new Queue<Task<Data>>()
941 })
942 const workerNodeKey = this.getWorkerNodeKey(worker)
943 this.setWorkerNodeTasksUsage(
944 this.workerNodes[workerNodeKey],
945 this.getWorkerUsage(workerNodeKey)
946 )
947 return this.workerNodes.length
948 }
949
950 /**
951 * Gets the worker id.
952 *
953 * @param worker - The worker.
954 * @returns The worker id.
955 */
956 private getWorkerId (worker: Worker): number | undefined {
957 if (this.worker === WorkerTypes.thread) {
958 return worker.threadId
959 } else if (this.worker === WorkerTypes.cluster) {
960 return worker.id
961 }
962 }
963
964 // /**
965 // * Sets the given worker in the pool worker nodes.
966 // *
967 // * @param workerNodeKey - The worker node key.
968 // * @param worker - The worker.
969 // * @param workerInfo - The worker info.
970 // * @param workerUsage - The worker usage.
971 // * @param tasksQueue - The worker task queue.
972 // */
973 // private setWorkerNode (
974 // workerNodeKey: number,
975 // worker: Worker,
976 // workerInfo: WorkerInfo,
977 // workerUsage: WorkerUsage,
978 // tasksQueue: Queue<Task<Data>>
979 // ): void {
980 // this.workerNodes[workerNodeKey] = {
981 // worker,
982 // info: workerInfo,
983 // usage: workerUsage,
984 // tasksQueue
985 // }
986 // }
987
988 /**
989 * Removes the given worker from the pool worker nodes.
990 *
991 * @param worker - The worker.
992 */
993 private removeWorkerNode (worker: Worker): void {
994 const workerNodeKey = this.getWorkerNodeKey(worker)
995 if (workerNodeKey !== -1) {
996 this.workerNodes.splice(workerNodeKey, 1)
997 this.workerChoiceStrategyContext.remove(workerNodeKey)
998 }
999 }
1000
1001 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1002 this.beforeTaskExecutionHook(workerNodeKey, task)
1003 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1004 }
1005
1006 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1007 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
1008 }
1009
1010 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1011 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
1012 }
1013
1014 private tasksQueueSize (workerNodeKey: number): number {
1015 return this.workerNodes[workerNodeKey].tasksQueue.size
1016 }
1017
1018 private tasksMaxQueueSize (workerNodeKey: number): number {
1019 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1020 }
1021
1022 private flushTasksQueue (workerNodeKey: number): void {
1023 while (this.tasksQueueSize(workerNodeKey) > 0) {
1024 this.executeTask(
1025 workerNodeKey,
1026 this.dequeueTask(workerNodeKey) as Task<Data>
1027 )
1028 }
1029 this.workerNodes[workerNodeKey].tasksQueue.clear()
1030 }
1031
1032 private flushTasksQueues (): void {
1033 for (const [workerNodeKey] of this.workerNodes.entries()) {
1034 this.flushTasksQueue(workerNodeKey)
1035 }
1036 }
1037
1038 private setWorkerStatistics (worker: Worker): void {
1039 this.sendToWorker(worker, {
1040 statistics: {
1041 runTime:
1042 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1043 .runTime.aggregate,
1044 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1045 .elu.aggregate
1046 }
1047 })
1048 }
1049
1050 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
1051 const getTasksQueueSize = (workerNodeKey?: number): number => {
1052 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
1053 }
1054 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1055 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1056 }
1057 return {
1058 tasks: {
1059 executed: 0,
1060 executing: 0,
1061 get queued (): number {
1062 return getTasksQueueSize(workerNodeKey)
1063 },
1064 get maxQueued (): number {
1065 return getTasksMaxQueueSize(workerNodeKey)
1066 },
1067 failed: 0
1068 },
1069 runTime: {
1070 aggregate: 0,
1071 average: 0,
1072 median: 0,
1073 history: new CircularArray()
1074 },
1075 waitTime: {
1076 aggregate: 0,
1077 average: 0,
1078 median: 0,
1079 history: new CircularArray()
1080 },
1081 elu: {
1082 idle: {
1083 aggregate: 0,
1084 average: 0,
1085 median: 0,
1086 history: new CircularArray()
1087 },
1088 active: {
1089 aggregate: 0,
1090 average: 0,
1091 median: 0,
1092 history: new CircularArray()
1093 },
1094 utilization: 0
1095 }
1096 }
1097 }
1098 }