Merge branch 'master' into worker-info
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median,
9 round
10 } from '../utils'
11 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
12 import { CircularArray } from '../circular-array'
13 import { Queue } from '../queue'
14 import {
15 type IPool,
16 PoolEmitter,
17 PoolEvents,
18 type PoolInfo,
19 type PoolOptions,
20 type PoolType,
21 PoolTypes,
22 type TasksQueueOptions,
23 type WorkerType,
24 WorkerTypes
25 } from './pool'
26 import type {
27 IWorker,
28 MessageHandler,
29 Task,
30 WorkerNode,
31 WorkerUsage
32 } from './worker'
33 import {
34 Measurements,
35 WorkerChoiceStrategies,
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
38 } from './selection-strategies/selection-strategies-types'
39 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
40
41 /**
42 * Base class that implements some shared logic for all poolifier pools.
43 *
44 * @typeParam Worker - Type of worker which manages this pool.
45 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
46 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
47 */
48 export abstract class AbstractPool<
49 Worker extends IWorker,
50 Data = unknown,
51 Response = unknown
52 > implements IPool<Worker, Data, Response> {
53 /** @inheritDoc */
54 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
55
56 /** @inheritDoc */
57 public readonly emitter?: PoolEmitter
58
59 /**
60 * The execution response promise map.
61 *
62 * - `key`: The message id of each submitted task.
63 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
64 *
65 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
66 */
67 protected promiseResponseMap: Map<
68 string,
69 PromiseResponseWrapper<Worker, Response>
70 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
71
72 /**
73 * Worker choice strategy context referencing a worker choice algorithm implementation.
74 */
75 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
76 Worker,
77 Data,
78 Response
79 >
80
81 /**
82 * The start timestamp of the pool.
83 */
84 private readonly startTimestamp
85
86 /**
87 * Constructs a new poolifier pool.
88 *
89 * @param numberOfWorkers - Number of workers that this pool should manage.
90 * @param filePath - Path to the worker file.
91 * @param opts - Options for the pool.
92 */
93 public constructor (
94 protected readonly numberOfWorkers: number,
95 protected readonly filePath: string,
96 protected readonly opts: PoolOptions<Worker>
97 ) {
98 if (!this.isMain()) {
99 throw new Error('Cannot start a pool from a worker!')
100 }
101 this.checkNumberOfWorkers(this.numberOfWorkers)
102 this.checkFilePath(this.filePath)
103 this.checkPoolOptions(this.opts)
104
105 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
106 this.executeTask = this.executeTask.bind(this)
107 this.enqueueTask = this.enqueueTask.bind(this)
108 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
109
110 if (this.opts.enableEvents === true) {
111 this.emitter = new PoolEmitter()
112 }
113 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
114 Worker,
115 Data,
116 Response
117 >(
118 this,
119 this.opts.workerChoiceStrategy,
120 this.opts.workerChoiceStrategyOptions
121 )
122
123 this.setupHook()
124
125 while (this.workerNodes.length < this.numberOfWorkers) {
126 this.createAndSetupWorker()
127 }
128
129 this.startTimestamp = performance.now()
130 }
131
132 private checkFilePath (filePath: string): void {
133 if (
134 filePath == null ||
135 (typeof filePath === 'string' && filePath.trim().length === 0)
136 ) {
137 throw new Error('Please specify a file with a worker implementation')
138 }
139 }
140
141 private checkNumberOfWorkers (numberOfWorkers: number): void {
142 if (numberOfWorkers == null) {
143 throw new Error(
144 'Cannot instantiate a pool without specifying the number of workers'
145 )
146 } else if (!Number.isSafeInteger(numberOfWorkers)) {
147 throw new TypeError(
148 'Cannot instantiate a pool with a non safe integer number of workers'
149 )
150 } else if (numberOfWorkers < 0) {
151 throw new RangeError(
152 'Cannot instantiate a pool with a negative number of workers'
153 )
154 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
155 throw new Error('Cannot instantiate a fixed pool with no worker')
156 }
157 }
158
159 private checkPoolOptions (opts: PoolOptions<Worker>): void {
160 if (isPlainObject(opts)) {
161 this.opts.workerChoiceStrategy =
162 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
163 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
164 this.opts.workerChoiceStrategyOptions =
165 opts.workerChoiceStrategyOptions ??
166 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
167 this.checkValidWorkerChoiceStrategyOptions(
168 this.opts.workerChoiceStrategyOptions
169 )
170 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
171 this.opts.enableEvents = opts.enableEvents ?? true
172 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
173 if (this.opts.enableTasksQueue) {
174 this.checkValidTasksQueueOptions(
175 opts.tasksQueueOptions as TasksQueueOptions
176 )
177 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 }
181 } else {
182 throw new TypeError('Invalid pool options: must be a plain object')
183 }
184 }
185
186 private checkValidWorkerChoiceStrategy (
187 workerChoiceStrategy: WorkerChoiceStrategy
188 ): void {
189 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
190 throw new Error(
191 `Invalid worker choice strategy '${workerChoiceStrategy}'`
192 )
193 }
194 }
195
196 private checkValidWorkerChoiceStrategyOptions (
197 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
198 ): void {
199 if (!isPlainObject(workerChoiceStrategyOptions)) {
200 throw new TypeError(
201 'Invalid worker choice strategy options: must be a plain object'
202 )
203 }
204 if (
205 workerChoiceStrategyOptions.weights != null &&
206 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
207 ) {
208 throw new Error(
209 'Invalid worker choice strategy options: must have a weight for each worker node'
210 )
211 }
212 if (
213 workerChoiceStrategyOptions.measurement != null &&
214 !Object.values(Measurements).includes(
215 workerChoiceStrategyOptions.measurement
216 )
217 ) {
218 throw new Error(
219 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
220 )
221 }
222 }
223
224 private checkValidTasksQueueOptions (
225 tasksQueueOptions: TasksQueueOptions
226 ): void {
227 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
228 throw new TypeError('Invalid tasks queue options: must be a plain object')
229 }
230 if (
231 tasksQueueOptions?.concurrency != null &&
232 !Number.isSafeInteger(tasksQueueOptions.concurrency)
233 ) {
234 throw new TypeError(
235 'Invalid worker tasks concurrency: must be an integer'
236 )
237 }
238 if (
239 tasksQueueOptions?.concurrency != null &&
240 tasksQueueOptions.concurrency <= 0
241 ) {
242 throw new Error(
243 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
244 )
245 }
246 }
247
248 private get starting (): boolean {
249 return this.workerNodes.some(workerNode => !workerNode.info.started)
250 }
251
252 private get started (): boolean {
253 return this.workerNodes.some(workerNode => workerNode.info.started)
254 }
255
256 /** @inheritDoc */
257 public get info (): PoolInfo {
258 return {
259 type: this.type,
260 worker: this.worker,
261 minSize: this.minSize,
262 maxSize: this.maxSize,
263 utilization: round(this.utilization),
264 workerNodes: this.workerNodes.length,
265 idleWorkerNodes: this.workerNodes.reduce(
266 (accumulator, workerNode) =>
267 workerNode.usage.tasks.executing === 0
268 ? accumulator + 1
269 : accumulator,
270 0
271 ),
272 busyWorkerNodes: this.workerNodes.reduce(
273 (accumulator, workerNode) =>
274 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
275 0
276 ),
277 executedTasks: this.workerNodes.reduce(
278 (accumulator, workerNode) =>
279 accumulator + workerNode.usage.tasks.executed,
280 0
281 ),
282 executingTasks: this.workerNodes.reduce(
283 (accumulator, workerNode) =>
284 accumulator + workerNode.usage.tasks.executing,
285 0
286 ),
287 queuedTasks: this.workerNodes.reduce(
288 (accumulator, workerNode) =>
289 accumulator + workerNode.usage.tasks.queued,
290 0
291 ),
292 maxQueuedTasks: this.workerNodes.reduce(
293 (accumulator, workerNode) =>
294 accumulator + workerNode.usage.tasks.maxQueued,
295 0
296 ),
297 failedTasks: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
299 accumulator + workerNode.usage.tasks.failed,
300 0
301 )
302 }
303 }
304
305 /**
306 * Gets the pool run time.
307 *
308 * @returns The pool run time in milliseconds.
309 */
310 private get runTime (): number {
311 return performance.now() - this.startTimestamp
312 }
313
314 /**
315 * Gets the approximate pool utilization.
316 *
317 * @returns The pool utilization.
318 */
319 private get utilization (): number {
320 const poolRunTimeCapacity = this.runTime * this.maxSize
321 const totalTasksRunTime = this.workerNodes.reduce(
322 (accumulator, workerNode) =>
323 accumulator + workerNode.usage.runTime.aggregate,
324 0
325 )
326 const totalTasksWaitTime = this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.waitTime.aggregate,
329 0
330 )
331 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
332 }
333
334 /**
335 * Pool type.
336 *
337 * If it is `'dynamic'`, it provides the `max` property.
338 */
339 protected abstract get type (): PoolType
340
341 /**
342 * Gets the worker type.
343 */
344 protected abstract get worker (): WorkerType
345
346 /**
347 * Pool minimum size.
348 */
349 protected abstract get minSize (): number
350
351 /**
352 * Pool maximum size.
353 */
354 protected abstract get maxSize (): number
355
356 /**
357 * Get the worker given its id.
358 *
359 * @param workerId - The worker id.
360 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
361 */
362 private getWorkerById (workerId: number): Worker | undefined {
363 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
364 ?.worker
365 }
366
367 /**
368 * Gets the given worker its worker node key.
369 *
370 * @param worker - The worker.
371 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
372 */
373 private getWorkerNodeKey (worker: Worker): number {
374 return this.workerNodes.findIndex(
375 workerNode => workerNode.worker === worker
376 )
377 }
378
379 /** @inheritDoc */
380 public setWorkerChoiceStrategy (
381 workerChoiceStrategy: WorkerChoiceStrategy,
382 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
383 ): void {
384 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
385 this.opts.workerChoiceStrategy = workerChoiceStrategy
386 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
387 this.opts.workerChoiceStrategy
388 )
389 if (workerChoiceStrategyOptions != null) {
390 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
391 }
392 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
393 this.setWorkerNodeTasksUsage(
394 workerNode,
395 this.getWorkerUsage(workerNodeKey)
396 )
397 this.setWorkerStatistics(workerNode.worker)
398 }
399 }
400
401 /** @inheritDoc */
402 public setWorkerChoiceStrategyOptions (
403 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
404 ): void {
405 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
406 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
407 this.workerChoiceStrategyContext.setOptions(
408 this.opts.workerChoiceStrategyOptions
409 )
410 }
411
412 /** @inheritDoc */
413 public enableTasksQueue (
414 enable: boolean,
415 tasksQueueOptions?: TasksQueueOptions
416 ): void {
417 if (this.opts.enableTasksQueue === true && !enable) {
418 this.flushTasksQueues()
419 }
420 this.opts.enableTasksQueue = enable
421 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
422 }
423
424 /** @inheritDoc */
425 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
426 if (this.opts.enableTasksQueue === true) {
427 this.checkValidTasksQueueOptions(tasksQueueOptions)
428 this.opts.tasksQueueOptions =
429 this.buildTasksQueueOptions(tasksQueueOptions)
430 } else if (this.opts.tasksQueueOptions != null) {
431 delete this.opts.tasksQueueOptions
432 }
433 }
434
435 private buildTasksQueueOptions (
436 tasksQueueOptions: TasksQueueOptions
437 ): TasksQueueOptions {
438 return {
439 concurrency: tasksQueueOptions?.concurrency ?? 1
440 }
441 }
442
443 /**
444 * Whether the pool is full or not.
445 *
446 * The pool filling boolean status.
447 */
448 protected get full (): boolean {
449 return this.workerNodes.length >= this.maxSize
450 }
451
452 /**
453 * Whether the pool is busy or not.
454 *
455 * The pool busyness boolean status.
456 */
457 protected abstract get busy (): boolean
458
459 /**
460 * Whether worker nodes are executing at least one task.
461 *
462 * @returns Worker nodes busyness boolean status.
463 */
464 protected internalBusy (): boolean {
465 return (
466 this.workerNodes.findIndex(workerNode => {
467 return workerNode.usage.tasks.executing === 0
468 }) === -1
469 )
470 }
471
472 /** @inheritDoc */
473 public async execute (data?: Data, name?: string): Promise<Response> {
474 const timestamp = performance.now()
475 const workerNodeKey = this.chooseWorkerNode()
476 const submittedTask: Task<Data> = {
477 name,
478 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
479 data: data ?? ({} as Data),
480 timestamp,
481 id: crypto.randomUUID()
482 }
483 const res = new Promise<Response>((resolve, reject) => {
484 this.promiseResponseMap.set(submittedTask.id as string, {
485 resolve,
486 reject,
487 worker: this.workerNodes[workerNodeKey].worker
488 })
489 })
490 if (
491 this.opts.enableTasksQueue === true &&
492 (this.busy ||
493 this.workerNodes[workerNodeKey].usage.tasks.executing >=
494 ((this.opts.tasksQueueOptions as TasksQueueOptions)
495 .concurrency as number))
496 ) {
497 this.enqueueTask(workerNodeKey, submittedTask)
498 } else {
499 this.executeTask(workerNodeKey, submittedTask)
500 }
501 this.checkAndEmitEvents()
502 // eslint-disable-next-line @typescript-eslint/return-await
503 return res
504 }
505
506 /** @inheritDoc */
507 public async destroy (): Promise<void> {
508 await Promise.all(
509 this.workerNodes.map(async (workerNode, workerNodeKey) => {
510 this.flushTasksQueue(workerNodeKey)
511 // FIXME: wait for tasks to be finished
512 await this.destroyWorker(workerNode.worker)
513 })
514 )
515 }
516
517 /**
518 * Terminates the given worker.
519 *
520 * @param worker - A worker within `workerNodes`.
521 */
522 protected abstract destroyWorker (worker: Worker): void | Promise<void>
523
524 /**
525 * Setup hook to execute code before worker nodes are created in the abstract constructor.
526 * Can be overridden.
527 *
528 * @virtual
529 */
530 protected setupHook (): void {
531 // Intentionally empty
532 }
533
534 /**
535 * Should return whether the worker is the main worker or not.
536 */
537 protected abstract isMain (): boolean
538
539 /**
540 * Hook executed before the worker task execution.
541 * Can be overridden.
542 *
543 * @param workerNodeKey - The worker node key.
544 * @param task - The task to execute.
545 */
546 protected beforeTaskExecutionHook (
547 workerNodeKey: number,
548 task: Task<Data>
549 ): void {
550 const workerUsage = this.workerNodes[workerNodeKey].usage
551 ++workerUsage.tasks.executing
552 this.updateWaitTimeWorkerUsage(workerUsage, task)
553 }
554
555 /**
556 * Hook executed after the worker task execution.
557 * Can be overridden.
558 *
559 * @param worker - The worker.
560 * @param message - The received message.
561 */
562 protected afterTaskExecutionHook (
563 worker: Worker,
564 message: MessageValue<Response>
565 ): void {
566 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
567 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
568 this.updateRunTimeWorkerUsage(workerUsage, message)
569 this.updateEluWorkerUsage(workerUsage, message)
570 }
571
572 private updateTaskStatisticsWorkerUsage (
573 workerUsage: WorkerUsage,
574 message: MessageValue<Response>
575 ): void {
576 const workerTaskStatistics = workerUsage.tasks
577 --workerTaskStatistics.executing
578 ++workerTaskStatistics.executed
579 if (message.taskError != null) {
580 ++workerTaskStatistics.failed
581 }
582 }
583
584 private updateRunTimeWorkerUsage (
585 workerUsage: WorkerUsage,
586 message: MessageValue<Response>
587 ): void {
588 if (
589 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
590 .aggregate
591 ) {
592 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
593 if (
594 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
595 .average &&
596 workerUsage.tasks.executed !== 0
597 ) {
598 workerUsage.runTime.average =
599 workerUsage.runTime.aggregate /
600 (workerUsage.tasks.executed - workerUsage.tasks.failed)
601 }
602 if (
603 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
604 .median &&
605 message.taskPerformance?.runTime != null
606 ) {
607 workerUsage.runTime.history.push(message.taskPerformance.runTime)
608 workerUsage.runTime.median = median(workerUsage.runTime.history)
609 }
610 }
611 }
612
613 private updateWaitTimeWorkerUsage (
614 workerUsage: WorkerUsage,
615 task: Task<Data>
616 ): void {
617 const timestamp = performance.now()
618 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
619 if (
620 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
621 .aggregate
622 ) {
623 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
624 if (
625 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
626 .waitTime.average &&
627 workerUsage.tasks.executed !== 0
628 ) {
629 workerUsage.waitTime.average =
630 workerUsage.waitTime.aggregate /
631 (workerUsage.tasks.executed - workerUsage.tasks.failed)
632 }
633 if (
634 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
635 .waitTime.median &&
636 taskWaitTime != null
637 ) {
638 workerUsage.waitTime.history.push(taskWaitTime)
639 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
640 }
641 }
642 }
643
644 private updateEluWorkerUsage (
645 workerUsage: WorkerUsage,
646 message: MessageValue<Response>
647 ): void {
648 if (
649 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
650 .aggregate
651 ) {
652 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
653 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
654 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
655 workerUsage.elu.utilization =
656 (workerUsage.elu.utilization +
657 message.taskPerformance.elu.utilization) /
658 2
659 } else if (message.taskPerformance?.elu != null) {
660 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
661 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
662 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
663 }
664 if (
665 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
666 .average &&
667 workerUsage.tasks.executed !== 0
668 ) {
669 const executedTasks =
670 workerUsage.tasks.executed - workerUsage.tasks.failed
671 workerUsage.elu.idle.average =
672 workerUsage.elu.idle.aggregate / executedTasks
673 workerUsage.elu.active.average =
674 workerUsage.elu.active.aggregate / executedTasks
675 }
676 if (
677 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
678 .median &&
679 message.taskPerformance?.elu != null
680 ) {
681 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
682 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
683 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
684 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
685 }
686 }
687 }
688
689 /**
690 * Chooses a worker node for the next task.
691 *
692 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
693 *
694 * @returns The worker node key
695 */
696 private chooseWorkerNode (): number {
697 if (this.shallCreateDynamicWorker()) {
698 const worker = this.createAndSetupDynamicWorker()
699 if (
700 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
701 ) {
702 return this.getWorkerNodeKey(worker)
703 }
704 }
705 return this.workerChoiceStrategyContext.execute()
706 }
707
708 /**
709 * Conditions for dynamic worker creation.
710 *
711 * @returns Whether to create a dynamic worker or not.
712 */
713 private shallCreateDynamicWorker (): boolean {
714 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
715 }
716
717 /**
718 * Sends a message to the given worker.
719 *
720 * @param worker - The worker which should receive the message.
721 * @param message - The message.
722 */
723 protected abstract sendToWorker (
724 worker: Worker,
725 message: MessageValue<Data>
726 ): void
727
728 /**
729 * Registers a listener callback on the given worker.
730 *
731 * @param worker - The worker which should register a listener.
732 * @param listener - The message listener callback.
733 */
734 private registerWorkerMessageListener<Message extends Data | Response>(
735 worker: Worker,
736 listener: (message: MessageValue<Message>) => void
737 ): void {
738 worker.on('message', listener as MessageHandler<Worker>)
739 }
740
741 /**
742 * Creates a new worker.
743 *
744 * @returns Newly created worker.
745 */
746 protected abstract createWorker (): Worker
747
748 /**
749 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
750 * Can be overridden.
751 *
752 * @param worker - The newly created worker.
753 */
754 protected afterWorkerSetup (worker: Worker): void {
755 // Listen to worker messages.
756 this.registerWorkerMessageListener(worker, this.workerListener())
757 }
758
759 /**
760 * Creates a new worker and sets it up completely in the pool worker nodes.
761 *
762 * @returns New, completely set up worker.
763 */
764 protected createAndSetupWorker (): Worker {
765 const worker = this.createWorker()
766
767 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
768 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
769 worker.on('error', error => {
770 if (this.emitter != null) {
771 this.emitter.emit(PoolEvents.error, error)
772 }
773 if (this.opts.restartWorkerOnError === true && !this.starting) {
774 this.createAndSetupWorker()
775 }
776 })
777 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
778 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
779 worker.once('exit', () => {
780 this.removeWorkerNode(worker)
781 })
782
783 this.pushWorkerNode(worker)
784
785 this.setWorkerStatistics(worker)
786
787 this.afterWorkerSetup(worker)
788
789 return worker
790 }
791
792 /**
793 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
794 *
795 * @returns New, completely set up dynamic worker.
796 */
797 protected createAndSetupDynamicWorker (): Worker {
798 const worker = this.createAndSetupWorker()
799 this.registerWorkerMessageListener(worker, message => {
800 const workerNodeKey = this.getWorkerNodeKey(worker)
801 if (
802 isKillBehavior(KillBehaviors.HARD, message.kill) ||
803 (message.kill != null &&
804 ((this.opts.enableTasksQueue === false &&
805 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
806 (this.opts.enableTasksQueue === true &&
807 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
808 this.tasksQueueSize(workerNodeKey) === 0)))
809 ) {
810 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
811 void (this.destroyWorker(worker) as Promise<void>)
812 }
813 })
814 return worker
815 }
816
817 /**
818 * This function is the listener registered for each worker message.
819 *
820 * @returns The listener function to execute when a message is received from a worker.
821 */
822 protected workerListener (): (message: MessageValue<Response>) => void {
823 return message => {
824 if (message.workerId != null && message.started != null) {
825 // Worker started message received
826 const worker = this.getWorkerById(message.workerId)
827 if (worker != null) {
828 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
829 message.started
830 } else {
831 throw new Error('Worker started message received from unknown worker')
832 }
833 } else if (message.id != null) {
834 // Task execution response received
835 const promiseResponse = this.promiseResponseMap.get(message.id)
836 if (promiseResponse != null) {
837 if (message.taskError != null) {
838 if (this.emitter != null) {
839 this.emitter.emit(PoolEvents.taskError, message.taskError)
840 }
841 promiseResponse.reject(message.taskError.message)
842 } else {
843 promiseResponse.resolve(message.data as Response)
844 }
845 this.afterTaskExecutionHook(promiseResponse.worker, message)
846 this.promiseResponseMap.delete(message.id)
847 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
848 if (
849 this.opts.enableTasksQueue === true &&
850 this.tasksQueueSize(workerNodeKey) > 0
851 ) {
852 this.executeTask(
853 workerNodeKey,
854 this.dequeueTask(workerNodeKey) as Task<Data>
855 )
856 }
857 this.workerChoiceStrategyContext.update(workerNodeKey)
858 }
859 }
860 }
861 }
862
863 private checkAndEmitEvents (): void {
864 if (this.emitter != null) {
865 if (this.busy) {
866 this.emitter?.emit(PoolEvents.busy, this.info)
867 }
868 if (this.type === PoolTypes.dynamic && this.full) {
869 this.emitter?.emit(PoolEvents.full, this.info)
870 }
871 }
872 }
873
874 /**
875 * Sets the given worker node its tasks usage in the pool.
876 *
877 * @param workerNode - The worker node.
878 * @param workerUsage - The worker usage.
879 */
880 private setWorkerNodeTasksUsage (
881 workerNode: WorkerNode<Worker, Data>,
882 workerUsage: WorkerUsage
883 ): void {
884 workerNode.usage = workerUsage
885 }
886
887 /**
888 * Pushes the given worker in the pool worker nodes.
889 *
890 * @param worker - The worker.
891 * @returns The worker nodes length.
892 */
893 private pushWorkerNode (worker: Worker): number {
894 this.workerNodes.push({
895 worker,
896 info: { id: this.getWorkerId(worker), started: false },
897 usage: this.getWorkerUsage(),
898 tasksQueue: new Queue<Task<Data>>()
899 })
900 const workerNodeKey = this.getWorkerNodeKey(worker)
901 this.setWorkerNodeTasksUsage(
902 this.workerNodes[workerNodeKey],
903 this.getWorkerUsage(workerNodeKey)
904 )
905 return this.workerNodes.length
906 }
907
908 /**
909 * Gets the worker id.
910 *
911 * @param worker - The worker.
912 * @returns The worker id.
913 */
914 private getWorkerId (worker: Worker): number | undefined {
915 if (this.worker === WorkerTypes.thread) {
916 return worker.threadId
917 } else if (this.worker === WorkerTypes.cluster) {
918 return worker.id
919 }
920 }
921
922 // /**
923 // * Sets the given worker in the pool worker nodes.
924 // *
925 // * @param workerNodeKey - The worker node key.
926 // * @param worker - The worker.
927 // * @param workerInfo - The worker info.
928 // * @param workerUsage - The worker usage.
929 // * @param tasksQueue - The worker task queue.
930 // */
931 // private setWorkerNode (
932 // workerNodeKey: number,
933 // worker: Worker,
934 // workerInfo: WorkerInfo,
935 // workerUsage: WorkerUsage,
936 // tasksQueue: Queue<Task<Data>>
937 // ): void {
938 // this.workerNodes[workerNodeKey] = {
939 // worker,
940 // info: workerInfo,
941 // usage: workerUsage,
942 // tasksQueue
943 // }
944 // }
945
946 /**
947 * Removes the given worker from the pool worker nodes.
948 *
949 * @param worker - The worker.
950 */
951 private removeWorkerNode (worker: Worker): void {
952 const workerNodeKey = this.getWorkerNodeKey(worker)
953 if (workerNodeKey !== -1) {
954 this.workerNodes.splice(workerNodeKey, 1)
955 this.workerChoiceStrategyContext.remove(workerNodeKey)
956 }
957 }
958
959 private executeTask (workerNodeKey: number, task: Task<Data>): void {
960 this.beforeTaskExecutionHook(workerNodeKey, task)
961 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
962 }
963
964 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
965 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
966 }
967
968 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
969 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
970 }
971
972 private tasksQueueSize (workerNodeKey: number): number {
973 return this.workerNodes[workerNodeKey].tasksQueue.size
974 }
975
976 private tasksMaxQueueSize (workerNodeKey: number): number {
977 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
978 }
979
980 private flushTasksQueue (workerNodeKey: number): void {
981 if (this.tasksQueueSize(workerNodeKey) > 0) {
982 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
983 this.executeTask(
984 workerNodeKey,
985 this.dequeueTask(workerNodeKey) as Task<Data>
986 )
987 }
988 }
989 this.workerNodes[workerNodeKey].tasksQueue.clear()
990 }
991
992 private flushTasksQueues (): void {
993 for (const [workerNodeKey] of this.workerNodes.entries()) {
994 this.flushTasksQueue(workerNodeKey)
995 }
996 }
997
998 private setWorkerStatistics (worker: Worker): void {
999 this.sendToWorker(worker, {
1000 statistics: {
1001 runTime:
1002 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1003 .runTime.aggregate,
1004 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1005 .elu.aggregate
1006 }
1007 })
1008 }
1009
1010 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
1011 const getTasksQueueSize = (workerNodeKey?: number): number => {
1012 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
1013 }
1014 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1015 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1016 }
1017 return {
1018 tasks: {
1019 executed: 0,
1020 executing: 0,
1021 get queued (): number {
1022 return getTasksQueueSize(workerNodeKey)
1023 },
1024 get maxQueued (): number {
1025 return getTasksMaxQueueSize(workerNodeKey)
1026 },
1027 failed: 0
1028 },
1029 runTime: {
1030 aggregate: 0,
1031 average: 0,
1032 median: 0,
1033 history: new CircularArray()
1034 },
1035 waitTime: {
1036 aggregate: 0,
1037 average: 0,
1038 median: 0,
1039 history: new CircularArray()
1040 },
1041 elu: {
1042 idle: {
1043 aggregate: 0,
1044 average: 0,
1045 median: 0,
1046 history: new CircularArray()
1047 },
1048 active: {
1049 aggregate: 0,
1050 average: 0,
1051 median: 0,
1052 history: new CircularArray()
1053 },
1054 utilization: 0
1055 }
1056 }
1057 }
1058 }