Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 exponentialDelay,
16 isKillBehavior,
17 isPlainObject,
18 max,
19 median,
20 min,
21 round,
22 sleep
23 } from '../utils'
24 import { KillBehaviors } from '../worker/worker-options'
25 import type { TaskFunction } from '../worker/task-functions'
26 import {
27 type IPool,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerNodeEventDetail,
40 WorkerType,
41 WorkerUsage
42 } from './worker'
43 import {
44 type MeasurementStatisticsRequirements,
45 Measurements,
46 WorkerChoiceStrategies,
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
49 } from './selection-strategies/selection-strategies-types'
50 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
51 import { version } from './version'
52 import { WorkerNode } from './worker-node'
53 import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
56 checkValidWorkerChoiceStrategy,
57 updateMeasurementStatistics
58 } from './utils'
59
60 /**
61 * Base class that implements some shared logic for all poolifier pools.
62 *
63 * @typeParam Worker - Type of worker which manages this pool.
64 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
65 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
66 */
67 export abstract class AbstractPool<
68 Worker extends IWorker,
69 Data = unknown,
70 Response = unknown
71 > implements IPool<Worker, Data, Response> {
72 /** @inheritDoc */
73 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
74
75 /** @inheritDoc */
76 public emitter?: EventEmitterAsyncResource
77
78 /**
79 * Dynamic pool maximum size property placeholder.
80 */
81 protected readonly max?: number
82
83 /**
84 * The task execution response promise map:
85 * - `key`: The message id of each submitted task.
86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 *
88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 */
90 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
91 new Map<string, PromiseResponseWrapper<Response>>()
92
93 /**
94 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 */
96 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
97 Worker,
98 Data,
99 Response
100 >
101
102 /**
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
106 */
107 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
108
109 /**
110 * Whether the pool is started or not.
111 */
112 private started: boolean
113 /**
114 * Whether the pool is starting or not.
115 */
116 private starting: boolean
117 /**
118 * Whether the pool is destroying or not.
119 */
120 private destroying: boolean
121 /**
122 * Whether the pool ready event has been emitted or not.
123 */
124 private readyEventEmitted: boolean
125 /**
126 * The start timestamp of the pool.
127 */
128 private readonly startTimestamp
129
/**
 * Validates the constructor arguments, wires up the worker choice strategy
 * context and, when `opts.startWorkers` is `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also fills in the options defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // These methods are rebound so that they keep `this` when passed around.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
182
/**
 * Validates the number of workers given to the constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If it is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are refused an empty set of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
200
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // The strategy is validated as given, then defaulted.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
229
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not hold one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // The number of weights is compared with the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
276
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
282
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  // The task statistics requirements decide which optional measurements are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Number of worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    this.workerNodes.filter(workerNode => matches(workerNode)).length
  // Sum of a per worker node numeric value.
  const sumOverWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenation of a per worker node measurement history.
  const mergeHistories = (
    pick: (workerNode: PoolWorkerNode) => ConcatArray<number>
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (merged, workerNode) => merged.concat(pick(workerNode)),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is reported only when both aggregates are required.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related figures are reported only when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
438
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
453
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  // Two separate accumulators keep the same summation order as separate folds.
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
474
/**
 * The pool type, provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
481
/**
 * The worker type, provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
486
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
493
/**
 * The pool maximum size: the `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max: dynamicMaxSize, numberOfWorkers } = this
  return dynamicMaxSize ?? numberOfWorkers
}
500
/**
 * Ensures that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
516
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
528
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
540
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node usage is reset and a statistics message is sent to its worker.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
559
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
573
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Tasks stealing listeners are removed and the tasks queues flushed before disabling.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
587
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: any stored tasks queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
609
/**
 * Builds the tasks queue options by overriding the defaults with the given options.
 *
 * @param tasksQueueOptions - The given tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
623
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
629
/**
 * Adds the `'idleWorkerNode'` event listener on every worker node.
 */
private setTaskStealing (): void {
  const idleWorkerNodeListener = this.handleIdleWorkerNodeEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('idleWorkerNode', idleWorkerNodeListener)
  }
}
638
/**
 * Removes the `'idleWorkerNode'` event listener from every worker node.
 */
private unsetTaskStealing (): void {
  const idleWorkerNodeListener = this.handleIdleWorkerNodeEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('idleWorkerNode', idleWorkerNodeListener)
  }
}
647
/**
 * Adds the `'backPressure'` event listener on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  const backPressureListener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener('backPressure', backPressureListener)
  }
}
656
/**
 * Removes the `'backPressure'` event listener from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  const backPressureListener = this.handleBackPressureEvent as EventListener
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener('backPressure', backPressureListener)
  }
}
665
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
674
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
681
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it
 * executes fewer tasks than the configured concurrency; otherwise only while it
 * executes no task at all.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowConcurrency = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowConcurrency
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
705
/**
 * Sends a task function operation to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` on success and rejecting on a failed operation.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Messages without an operation status or from another worker are ignored.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The listener is removed once the response has been handled.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
746
/**
 * Sends a task function operation to every worker node and waits until each
 * of them has answered with an operation status.
 *
 * The same listener is registered on every worker node. Once one response per
 * worker node has been collected, the promise is settled and the listener is
 * removed from all worker nodes, not only from the one that answered last.
 *
 * @param message - The task function operation message to send to the workers.
 * @returns A promise resolving to `true` when every worker reported success and rejecting with the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Messages that are not operation status responses are ignored.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      // Keep waiting until every worker node has answered.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                response.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      }
      // The listener was registered on every worker node: remove it from all
      // of them so that it is not leaked on the nodes that answered earlier.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
801
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
814
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // From here on the name is known to be a string.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationSucceeded = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return operationSucceeded
}
837
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationSucceeded = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationSucceeded
}
853
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node holding a non empty names list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
866
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperationMessage)
}
874
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
880
/**
 * Whether a task shall be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
888
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state checks.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments checks.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The promise callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
946
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Worker nodes are created until enough non dynamic ones exist.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
971
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
995
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolving on a `'success'` kill response and rejecting on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1019
/**
 * Terminates the worker node having the given worker node key.
 * Provided by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1026
/**
 * Hook called by the abstract constructor before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1036
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
1041
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counters and records the task wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // The per task function usage is updated only when required and available.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = task.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
}
1071
/**
 * Hook executed after the worker task execution.
 * Updates the tasks statistics, run time and ELU usages from the received message.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three usage updates to one worker usage.
  const updateWorkerUsage = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    updateWorkerUsage(workerNode.usage)
  }
  // The per task function usage is updated only when required and available.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = message.taskPerformance?.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  updateWorkerUsage(
    workerNode.getTaskFunctionWorkerUsage(taskName) as WorkerUsage
  )
}
1105
/**
 * Tells whether per task function usage must be maintained for a worker node.
 * This is the case when the worker information exposes an array of more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1120
/**
 * Updates the task counters of a worker usage after a task response:
 * one less executing task (never below zero) and one more executed or
 * failed task depending on whether the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1138
/**
 * Feeds the task run time into the run time measurement statistics of a
 * worker usage. Nothing is recorded for a task that ended with a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeStatistics = workerUsage.runTime
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is recorded as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      runTimeStatistics,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1152
/**
 * Feeds the task wait time into the wait time measurement statistics of a
 * worker usage. The wait time is the delay between the task timestamp and now.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp is considered as having not waited at all.
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1165
/**
 * Feeds the task event loop utilization (ELU) into the ELU statistics of a
 * worker usage. Nothing is recorded for a task that ended with a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the new one,
  // or the new one alone when there is no previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1198
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker node is created first when the conditions are met; its key
 * is returned directly if the strategy policy asks for dynamic worker usage.
 * Otherwise the choice is delegated to the worker choice strategy.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1217
/**
 * Checks the conditions for dynamic worker creation: the pool is dynamic,
 * not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1226
/**
 * Posts a message to the worker identified by its worker node key.
 * Implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1239
/**
 * Instantiates a new worker.
 * Implemented by concrete pools.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1246
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker
 * nodes and runs the post setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool error handling: retire the worker node, optionally replace it,
  // then move its queued tasks to other worker nodes.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(failedWorkerNodeKey)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(newWorkerNodeKey)

  return newWorkerNodeKey
}
1291
/**
 * Creates a worker node like {@link createAndSetupWorkerNode} and turns it
 * into a dynamic one: a listener destroys it on kill message, the worker is
 * asked to check its activity, the pool task functions are sent to it and
 * its information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const killedWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[killedWorkerNodeKey].usage
    // Kill message received from worker: a hard kill is always honored, a
    // soft kill only once the worker node has nothing executing (and nothing
    // queued when the tasks queue is enabled).
    const shallKillWorker =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(killedWorkerNodeKey) === 0)))
    if (shallKillWorker) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(killedWorkerNodeKey)
      this.destroyWorkerNode(killedWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, hence no size guard.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  // Some strategy policies need the dynamic worker flagged ready right away.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1347
/**
 * Attaches a message listener callback to the worker identified by its
 * worker node key. Implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1360
/**
 * Attaches a one-shot message listener callback to the worker identified by
 * its worker node key. Implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1373
/**
 * Detaches a message listener callback from the worker identified by its
 * worker node key. Implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1386
/**
 * Hook run right after a worker node has been created: registers the pool
 * message listener, sends the startup and statistics messages and, when the
 * tasks queue is enabled, subscribes to the task stealing events.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerMessageListener.bind(this)
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing by an idle worker node.
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
  // Task stealing from a worker node under back pressure.
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
1418
/**
 * Posts the startup message to the worker identified by its worker node key.
 * Implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1425
/**
 * Posts the statistics message to the worker identified by its worker node
 * key. The message carries the run time and ELU `aggregate` flags taken from
 * the task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    }
  })
}
1442
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node, other than
 * the source one, with the fewest queued tasks. The task is executed right
 * away on the destination when allowed, enqueued on it otherwise.
 * The redistribution stops when no such destination exists, so that tasks are
 * never handed back to the source worker node nor to a not ready one.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue shall be emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      // Neither the source worker node nor a not ready one can take over.
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping on the source.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1463
/**
 * Increments the stolen tasks counter of a worker node and, when tracked,
 * of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (thiefWorkerNode?.usage != null) {
    ++thiefWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1482
/**
 * Increments the sequentially stolen tasks counter of a worker node and,
 * when tracked, of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (thiefWorkerNode?.usage != null) {
    ++thiefWorkerNode.usage.tasks.sequentiallyStolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.sequentiallyStolen
}
1501
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node and,
 * when tracked, of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the previously stolen task.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (thiefWorkerNode?.usage != null) {
    thiefWorkerNode.usage.tasks.sequentiallyStolen = 0
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
1520
/**
 * Listener of the 'idleWorkerNode' worker node event.
 *
 * Steals a task for the idle worker node, then re-runs itself after a delay
 * computed from the sequentially stolen tasks counter, passing along the task
 * just stolen. The cycle ends, and the counter is reset, once a task was
 * previously stolen and the worker node has tasks executing or queued.
 *
 * @param event - The worker node event, its detail must carry the worker node key.
 * @param previousStolenTask - The task stolen by the previous run, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleIdleWorkerNodeEvent = (
  event: CustomEvent<WorkerNodeEventDetail>,
  previousStolenTask?: Task<Data>
): void => {
  const idleWorkerNodeKey = event.detail.workerNodeKey
  if (idleWorkerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const tasksUsage = this.workerNodes[idleWorkerNodeKey].usage.tasks
  if (
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(idleWorkerNodeKey) > 0)
  ) {
    // The worker node is no longer idle: end the stealing cycle.
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
      idleWorkerNodeKey,
      previousStolenTask.name as string
    )
    return
  }
  const stolenTask = this.workerNodeStealTask(idleWorkerNodeKey)
  // The delay is computed after the steal, hence from the updated counter.
  const retryDelay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(retryDelay)
    .then(() => {
      this.handleIdleWorkerNodeEvent(event, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1552
/**
 * Steals one task for the given worker node from the ready worker node,
 * other than itself, having the most queued tasks.
 *
 * The stolen task is executed right away on the given worker node when
 * allowed, enqueued on it otherwise, and the stolen tasks statistics of the
 * given worker node are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Positions in the sorted copy are not worker node keys: the stealing
  // worker node is excluded by identity, not by index.
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      candidateWorkerNode !== destinationWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  this.updateTaskStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  return task
}
1586
/**
 * Listener of the 'backPressure' worker node event.
 *
 * Offloads tasks queued on the worker node under back pressure to the other
 * ready worker nodes, visited from the least to the most loaded, as long as
 * their own queue stays below the configured tasks queue size minus one.
 * Each visited worker node takes at most one task, executed right away when
 * allowed, enqueued otherwise, and has its stolen tasks statistics updated.
 * Nothing is done when the configured tasks queue size is 1 or less.
 *
 * @param event - The worker node event, its detail carries the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Positions in the sorted copy are not worker node keys: resolve the
      // real key from the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1624
/**
 * Message listener registered on each worker. Dispatches the message
 * according to its content: worker ready response, task execution response
 * or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { ready, taskId, taskFunctionNames, workerId } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
  }
}
1643
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names in the worker information, then emits the pool ready event
 * the first time the pool is ready.
 *
 * @param message - The worker ready response.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(
      `Worker ${message.workerId as number} failed to initialize`
    )
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const readyWorkerInfo = this.getWorkerInfo(workerNodeKey)
  readyWorkerInfo.ready = message.ready as boolean
  readyWorkerInfo.taskFunctionNames = message.taskFunctionNames
  // The pool ready event is emitted only once.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1659
/**
 * Handles a task execution response: settles the matching pending promise,
 * updates the worker usage and the worker choice strategy, then, when the
 * tasks queue is enabled, runs the next queued task or signals that the
 * worker node is idle.
 * A response whose task id has no pending promise is ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = pendingResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Run the next queued task if the concurrency limit allows it.
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing < (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  // Nothing executing, nothing queued and no stealing cycle in progress:
  // the worker node is idle.
  const isIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    const idleEvent = new CustomEvent<WorkerNodeEventDetail>(
      'idleWorkerNode',
      {
        detail: { workerId: workerId as number, workerNodeKey }
      }
    )
    this.workerNodes[workerNodeKey].dispatchEvent(idleEvent)
  }
}
1700
/**
 * Emits the pool busy event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1706
/**
 * Emits the pool back pressure event, with the pool information, if the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1712
/**
 * Emits the pool full event, with the pool information, if the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1720
/**
 * Looks up the information of the worker held by a worker node.
 * No worker node at the given key yields `undefined` at runtime.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1730
/**
 * Wraps the given worker into a worker node and appends it to the pool
 * worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without a configured tasks queue size, the squared pool maximum size is used.
  const tasksQueueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1754
/**
 * Drops the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1767
/**
 * Sets the ready flag of a worker node information to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1771
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1779
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1788
/**
 * Runs the given task on the worker identified by its worker node key:
 * pre-execution hook first, then the task is sent to the worker along with
 * its transfer list, and finally the task execution events are checked.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Usage statistics must be updated before the task leaves the pool.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1800
/**
 * Queues the given task on a worker node, then checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1806
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1810
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1814
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1824
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1830 }