Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33 } from './worker'
34 import {
35 Measurements,
36 WorkerChoiceStrategies,
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
39 } from './selection-strategies/selection-strategies-types'
40 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
41 import { version } from './version'
42
43 /**
44 * Base class that implements some shared logic for all poolifier pools.
45 *
46 * @typeParam Worker - Type of worker which manages this pool.
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
49 */
50 export abstract class AbstractPool<
51 Worker extends IWorker,
52 Data = unknown,
53 Response = unknown
54 > implements IPool<Worker, Data, Response> {
55 /** @inheritDoc */
56 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
57
58 /** @inheritDoc */
59 public readonly emitter?: PoolEmitter
60
61 /**
62 * The execution response promise map.
63 *
64 * - `key`: The message id of each submitted task.
65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
66 *
67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
68 */
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
73
74 /**
75 * Worker choice strategy context referencing a worker choice algorithm implementation.
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78 Worker,
79 Data,
80 Response
81 >
82
83 /**
84 * The start timestamp of the pool.
85 */
86 private readonly startTimestamp
87
/**
 * Builds a pool: validates the arguments, wires the optional event emitter and
 * the worker choice strategy context, then spawns worker nodes until
 * `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate every argument before anything is allocated.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed around as plain functions.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node is created.
  this.setupHook()

  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  // Reference point for the utilization computation.
  this.startTimestamp = performance.now()
}
133
/**
 * Rejects a missing or blank worker file path.
 *
 * @param filePath - Path to the worker file.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
142
/**
 * Validates the requested number of workers.
 *
 * The value must be a non-negative safe integer; a fixed pool additionally
 * needs at least one worker.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
160
/**
 * Validates the pool options and writes the effective values, with defaults
 * applied, into `this.opts`.
 *
 * @param opts - Options for the pool.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const effectiveOpts = this.opts

  effectiveOpts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(effectiveOpts.workerChoiceStrategy)

  effectiveOpts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    effectiveOpts.workerChoiceStrategyOptions
  )

  effectiveOpts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  effectiveOpts.enableEvents = opts.enableEvents ?? true
  effectiveOpts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter when the queue is enabled.
  if (effectiveOpts.enableTasksQueue) {
    const requestedQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedQueueOptions)
    effectiveOpts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
187
/**
 * Rejects a strategy that is not one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
197
/**
 * Validates worker choice strategy options.
 *
 * The options must be a plain object; `weights`, when given, must have one
 * entry per `maxSize` worker node; `measurement`, when given, must be one of
 * the `Measurements` values.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
225
/**
 * Validates tasks queue options.
 *
 * Nullish options are accepted. Otherwise they must be a plain object whose
 * `concurrency`, when set, is a strictly positive safe integer.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
249
/** @inheritDoc */
public get info (): PoolInfo {
  // Number of worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: WorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(matches).length
  // Sum of one task counter over all worker nodes.
  const sumTasks = (pick: (tasks: WorkerUsage['tasks']) => number): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode.usage.tasks),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate
      ? { utilization: round(this.utilization) }
      : {}),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumTasks(tasks => tasks.executed),
    executingTasks: sumTasks(tasks => tasks.executing),
    queuedTasks: sumTasks(tasks => tasks.queued),
    maxQueuedTasks: sumTasks(tasks => tasks.maxQueued),
    failedTasks: sumTasks(tasks => tasks.failed)
  }
}
302
/**
 * Approximate pool utilization: aggregated task run time plus wait time over
 * all worker nodes, divided by the time elapsed since the pool start
 * multiplied by `maxSize`.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
323
324 /**
325 * Pool type, supplied by the concrete pool; this class compares it against `PoolTypes.fixed` and `PoolTypes.dynamic`.
326 *
327 * If it is `'dynamic'`, it provides the `max` property.
328 */
329 protected abstract get type (): PoolType
330
331 /**
332 * Gets the worker type; this class compares it against `WorkerTypes.thread` and `WorkerTypes.cluster` to resolve a worker id.
333 */
334 protected abstract get worker (): WorkerType
335
336 /**
337 * Pool minimum size, reported as `minSize` in the pool info.
338 */
339 protected abstract get minSize (): number
340
341 /**
342 * Pool maximum size: the worker node count at which the pool is considered full, also used to scale the utilization.
343 */
344 protected abstract get maxSize (): number
345
/**
 * Looks up a worker by the id recorded in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
356
/**
 * Finds the position of the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
368
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Give every worker node a fresh usage record and resend the statistics settings.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
390
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool state.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
401
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Turning the queue off: run what is still queued before the flag changes.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
413
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
424
/**
 * Builds the effective tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The requested tasks queue options, possibly nullish.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
432
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
441
442 /**
443 * Whether the pool is busy or not.
444 *
445 * While `true`, submitted tasks are queued if the tasks queue is enabled, and the `busy` pool event is emitted.
446 */
447 protected abstract get busy (): boolean
448
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
461
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before choosing a worker node: the wait time is measured from here.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Registered under the task id so the worker response can settle it later.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    const worker = this.workerNodes[workerNodeKey].worker
    this.promiseResponseMap.set(taskId, { resolve, reject, worker })
  })
  // Queue only when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs as many tasks as the concurrency allows.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
495
/** @inheritDoc */
public async destroy (): Promise<void> {
  const shutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Run whatever is still queued on this node before terminating it.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      const exited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await exited
    }
  )
  await Promise.all(shutdowns)
}
512
513 /**
514 * Terminates the given worker.
515 * May complete asynchronously: callers in this class either await the result or deliberately discard it.
516 * @param worker - A worker within `workerNodes`.
517 */
518 protected abstract destroyWorker (worker: Worker): void | Promise<void>
519
/**
 * Extension point called by the abstract constructor after the arguments are
 * validated and before any worker node is created.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
529
530 /**
531 * Should return whether the worker is the main worker or not; the constructor refuses to start a pool when it returns `false`.
532 */
533 protected abstract isMain (): boolean
534
/**
 * Hook executed before the worker task execution: counts the task as
 * executing on its worker node and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
550
/**
 * Hook executed after the worker task execution: updates the task counters,
 * then the run time and ELU statistics of the worker's node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // Counters first: the statistics below read the updated executed/failed counts.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
567
/**
 * Moves one task from executing to executed, and counts it as failed when the
 * message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const hasFailed = message.taskError != null
  if (hasFailed) {
    tasks.failed += 1
  }
}
579
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * run time aggregate. The average and median are only maintained when the
 * strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, possibly carrying `taskPerformance.runTime`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. When every
  // executed task failed this count is 0: keep the previous average instead
  // of dividing by zero and storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
608
/**
 * Updates the wait time statistics of a worker usage for a task about to run.
 *
 * The wait time is the time elapsed since the task timestamp, or 0 when the
 * task carries no timestamp. Nothing is recorded unless the current worker
 * choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  // Always a number: a missing task timestamp yields a wait time of 0.
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over the tasks that did not fail. When that count
  // is 0, keep the previous average instead of dividing by zero and storing
  // NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
639
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * ELU aggregate. The idle and active averages and medians are only maintained
 * when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, possibly carrying `taskPerformance.elu`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Running mean of the previous utilization and the new sample.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over the tasks that did not fail. When every
  // executed task failed this count is 0: keep the previous averages instead
  // of dividing by zero and storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
684
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker gets created and the strategy policy asks to use it,
 * that worker's node is returned; otherwise the worker choice strategy decides.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
703
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
712
713 /**
714 * Sends a message to the given worker.
715 *
716 * @param worker - The worker which should receive the message.
717 * @param message - The message: within this class, either a task to execute or a statistics configuration.
718 */
719 protected abstract sendToWorker (
720 worker: Worker,
721 message: MessageValue<Data>
722 ): void
723
/**
 * Subscribes a callback to the `message` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
736
737 /**
738 * Creates a new worker; `createAndSetupWorker` then attaches the event handlers and registers it in the pool worker nodes.
739 *
740 * @returns Newly created worker.
741 */
742 protected abstract createWorker (): Worker
743
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * The default implementation subscribes the pool's worker listener to the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
}
754
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user-provided handlers from the pool options are attached first. The
 * pool's own `error` handler then emits the pool `error` event, redistributes
 * the tasks queued on the failing worker node when the tasks queue is
 * enabled, and creates a replacement worker when `restartWorkerOnError` is
 * enabled. The worker node is removed from the pool on `exit`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.getWorkerNodeKey(worker))
    }
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}

/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the first other node with an empty queue, or else to the
 * other node with the fewest queued tasks.
 *
 * @param workerNodeKey - The key of the worker node to drain, `-1` if the worker is no longer in the pool.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // The worker may already have been removed from the pool worker nodes.
  if (workerNodeKey === -1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    // No other worker node can take the tasks. Re-enqueueing on the same node
    // would never shrink its queue and would spin this loop forever.
    if (targetWorkerNodeKey === -1) {
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
814
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, the worker gets a message listener that
 * destroys it when it sends a hard kill message, or any kill message while it
 * has no executing task and, if the tasks queue is enabled, no queued task.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const shallDestroy = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shallDestroy()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
839
/**
 * Builds the listener registered for each worker message. It dispatches
 * worker started messages and task execution responses to their handlers and
 * ignores anything else.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    const isStartedMessage =
      message.workerId != null && message.started != null
    if (isStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
856
/**
 * Records the started status announced by a worker on its worker node info.
 *
 * @param message - The worker started message, carrying `workerId` and `started`.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
871
/**
 * Settles the promise bound to a task execution response, updates the worker
 * usage, starts the next queued task on that worker node if any, and lets the
 * worker choice strategy update itself.
 *
 * Responses whose id is not in the promise response map are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, message.taskError)
    }
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(message.id as string)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
898
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
909
/**
 * Replaces the usage record of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  Object.assign(workerNode, { usage: workerUsage })
}
922
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The node is pushed with a key-less usage record first. Once its key is
 * known, the usage is replaced by one bound to that key so that its queue
 * counters read the node's tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
943
/**
 * Gets the worker id: `threadId` for thread workers, `id` for cluster workers.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
957
958 // /**
959 // * Sets the given worker in the pool worker nodes.
960 // *
961 // * @param workerNodeKey - The worker node key.
962 // * @param worker - The worker.
963 // * @param workerInfo - The worker info.
964 // * @param workerUsage - The worker usage.
965 // * @param tasksQueue - The worker task queue.
966 // */
967 // private setWorkerNode (
968 // workerNodeKey: number,
969 // worker: Worker,
970 // workerInfo: WorkerInfo,
971 // workerUsage: WorkerUsage,
972 // tasksQueue: Queue<Task<Data>>
973 // ): void {
974 // this.workerNodes[workerNodeKey] = {
975 // worker,
976 // info: workerInfo,
977 // usage: workerUsage,
978 // tasksQueue
979 // }
980 // }
981
/**
 * Removes the given worker from the pool worker nodes and tells the worker
 * choice strategy context about the removal. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
994
/**
 * Runs the before-execution hook, then sends the task to the node's worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
999
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue's `enqueue`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1003
/**
 * Takes the next task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` as returned by the queue's `dequeue`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1007
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
1011
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1015
/**
 * Executes every task queued on the given worker node, then clears its queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let pendingTasks = this.tasksQueueSize(workerNodeKey);
    pendingTasks > 0;
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1025
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1031
/**
 * Sends the worker a message telling which statistics (run time, ELU) the
 * current worker choice strategy requires.
 *
 * @param worker - The worker to configure.
 */
private setWorkerStatistics (worker: Worker): void {
  const context = this.workerChoiceStrategyContext
  const statistics = {
    runTime: context.getTaskStatisticsRequirements().runTime.aggregate,
    elu: context.getTaskStatisticsRequirements().elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
1043
/**
 * Builds a zeroed worker usage record.
 *
 * The `queued` and `maxQueued` task counters are getters: they read the tasks
 * queue of the worker node at `workerNodeKey` on every access, or report 0
 * when no key is given.
 *
 * @param workerNodeKey - The worker node key the queue counters are bound to, if any.
 * @returns A fresh worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const readQueueSize = (nodeKey?: number): number => {
    if (nodeKey != null) {
      return this.tasksQueueSize(nodeKey)
    }
    return 0
  }
  const readQueueMaxSize = (nodeKey?: number): number => {
    if (nodeKey != null) {
      return this.tasksMaxQueueSize(nodeKey)
    }
    return 0
  }
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readQueueSize(workerNodeKey)
      },
      get maxQueued (): number {
        return readQueueMaxSize(workerNodeKey)
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
1092 }