perf: update worker choice strategies internal if needed
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { TransferListItem } from 'node:worker_threads'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { AsyncResource } from 'node:async_hooks'
6 import type {
7 MessageValue,
8 PromiseResponseWrapper,
9 Task
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 exponentialDelay,
17 isKillBehavior,
18 isPlainObject,
19 max,
20 median,
21 min,
22 round,
23 sleep
24 } from '../utils'
25 import { KillBehaviors } from '../worker/worker-options'
26 import type { TaskFunction } from '../worker/task-functions'
27 import {
28 type IPool,
29 PoolEvents,
30 type PoolInfo,
31 type PoolOptions,
32 type PoolType,
33 PoolTypes,
34 type TasksQueueOptions
35 } from './pool'
36 import type {
37 IWorker,
38 IWorkerNode,
39 TaskStatistics,
40 WorkerInfo,
41 WorkerNodeEventDetail,
42 WorkerType,
43 WorkerUsage
44 } from './worker'
45 import {
46 Measurements,
47 WorkerChoiceStrategies,
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
50 } from './selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
52 import { version } from './version'
53 import { WorkerNode } from './worker-node'
54 import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
57 checkValidWorkerChoiceStrategy,
58 updateEluWorkerUsage,
59 updateRunTimeWorkerUsage,
60 updateTaskStatisticsWorkerUsage,
61 updateWaitTimeWorkerUsage,
62 waitWorkerNodeEvents
63 } from './utils'
64
65 /**
66 * Base class that implements some shared logic for all poolifier pools.
67 *
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 */
72 export abstract class AbstractPool<
73 Worker extends IWorker,
74 Data = unknown,
75 Response = unknown
76 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Placeholder for the maximum size of a dynamic pool.
 */
protected readonly max?: number

/**
 * Map of the pending task execution responses:
 * - `key`: The message id of a submitted task.
 * - `value`: The wrapper holding the worker and the resolve/reject callbacks of the task execution promise.
 *
 * A message received from a worker is matched to its entry through the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Context referencing the worker choice strategy implementation in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Map of the task functions added at runtime:
 * - `key`: The task function name.
 * - `value`: The task function.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * `true` once the pool is started.
 */
private started: boolean
/**
 * `true` while the pool is starting.
 */
private starting: boolean
/**
 * `true` while the pool is being destroyed.
 */
private destroying: boolean
/**
 * `true` once the pool ready event has been emitted.
 */
private readyEventEmitted: boolean
/**
 * Timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
134
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the optional event emitter and the
 * worker choice strategy context are created, and finally the workers are
 * started when the `startWorkers` option is `true`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const runsInMain = this.isMain()
  if (!runsInMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills the defaults in this.opts.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
187
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are required to have at least one worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
205
/**
 * Validates the pool options and stores them, completed with their default
 * values, in `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, then defaulted to round robin.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: user values override the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const userTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    checkValidTasksQueueOptions(userTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      userTasksQueueOptions
    )
  }
}
234
/**
 * Validates the worker choice strategy options. `null` or `undefined` options
 * are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Nothing to validate without options.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold at most.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
281
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
287
/**
 * @inheritDoc
 *
 * @remarks
 * The optional fields depend on the pool configuration:
 * - `utilization` is reported only if both run time and wait time aggregates are required by the worker choice strategy.
 * - `queuedTasks`, `maxQueuedTasks`, `backPressure` and `stolenTasks` are reported only if the tasks queue is enabled.
 * - `runTime` and `waitTime` are reported only if their aggregate is required, with `average` and `median` added on demand.
 */
public get info (): PoolInfo {
  // Resolve the task statistics requirements once instead of once per optional field.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Measurement histories of all worker nodes, flattened in linear time and
  // only when an average or a median is actually requested.
  const runTimeHistory = (): number[] =>
    this.workerNodes.flatMap(workerNode => workerNode.usage.runTime.history)
  const waitTimeHistory = (): number[] =>
    this.workerNodes.flatMap(workerNode => workerNode.usage.waitTime.history)
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing === 0
          ? accumulator + 1
          : accumulator,
      0
    ),
    busyWorkerNodes: this.workerNodes.reduce(
      (accumulator, _workerNode, workerNodeKey) =>
        this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
      0
    ),
    executedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executing,
      0
    ),
    // Tasks queue related fields, in the same order as before.
    ...(tasksQueueEnabled && {
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.queued,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
        0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.stolen,
        0
      )
    }),
    failedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.failed,
      0
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(average(runTimeHistory()))
        }),
        ...(runTimeRequirements.median && {
          median: round(median(runTimeHistory()))
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(average(waitTimeHistory()))
        }),
        ...(waitTimeRequirements.median && {
          median: round(median(waitTimeHistory()))
        })
      }
    })
  }
}
443
/**
 * The pool readiness boolean status: `true` when the number of ready,
 * non-dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
458
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time of all worker nodes divided by the time elapsed since the pool start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479
/**
 * The type of the pool, to be provided by the concrete pool implementation.
 *
 * A pool of type `'dynamic'` provides the `max` property.
 */
protected abstract get type (): PoolType
486
/**
 * The type of worker used by the pool, to be provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
491
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
498
/**
 * The pool maximum size: `max` when it is defined, the number of workers
 * given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max != null ? max : numberOfWorkers
}
505
/**
 * Ensures that the message received from a worker carries the id of a worker
 * known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
521
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
533
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
545
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Record the strategy in the pool options and switch the context to it.
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send it a statistics message.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
564
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
578
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Detach the tasks stealing handlers and flush the queues before switching the queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
592
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Handlers are always detached first so that they are never attached twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
616
/**
 * Builds the tasks queue options by completing the given ones with the
 * default values.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The tasks queue options with the defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    // Defaults: the queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    // User values override the defaults.
    ...tasksQueueOptions
  }
}
630
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to apply.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
636
/**
 * Attaches the idle worker node event handler to every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
645
/**
 * Detaches the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
654
/**
 * Attaches the back pressure event handler to every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
663
/**
 * Detaches the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
672
/**
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
681
/**
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
688
/**
 * Whether every ready worker node is executing its tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue `concurrency`,
 * otherwise a worker node is at quota as soon as it executes one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
712
/**
 * Whether the worker node is executing its tasks quota or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  return (
    executingTasks >= (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
722
/**
 * Sends a task function operation to one worker and waits for its status.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore the messages that are not the operation status of the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The status has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
763
/**
 * Sends a task function operation to every worker of the pool and waits for
 * all their statuses.
 *
 * The listener is removed from every worker node once all the statuses have
 * been received (it was previously removed from the last responding worker
 * node only, leaking it on all the others). A pool without any worker node
 * resolves immediately instead of never settling.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if the operation succeeded on every worker, rejected with the first failure otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Number of responses to wait for, fixed at the time the operation is sent.
    const expectedResponses = this.workerNodes.length
    if (expectedResponses === 0) {
      // No worker to ask: nothing can fail and no response will ever come.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length < expectedResponses) {
        return
      }
      // All statuses received: remove the listener from every worker node.
      // NOTE(review): assumes deregistering a listener that is not registered
      // on a worker node is a no-op - confirm against the concrete pools.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse != null) {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse.workerId as number
            } with error: '${
              errorResponse.workerError?.message as string
            }'`
          )
        )
      } else if (
        responsesReceived.every(
          responseReceived =>
            responseReceived.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
818
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
831
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source code.
  const addOperation: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(addOperation)
  // Only reached when the operation did not reject.
  this.taskFunctions.set(name, fn)
  return opResult
}
854
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only the task functions added through the pool can be removed.
  const handledByPool = this.taskFunctions.has(name)
  if (!handledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperation: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperation
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
870
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names are taken from the first worker node exposing a non-empty list.
  const workerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNode?.info.taskFunctionNames ?? []
}
883
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
891
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
897
/**
 * Whether a task shall be executed right away on the worker node instead of
 * being enqueued: its tasks queue is empty and it executes fewer tasks than
 * the tasks queue `concurrency`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
905
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failed check rejects the promise.
    const invalidCallError = ((): Error | undefined => {
      if (!this.started) {
        return new Error('Cannot execute a task on not started pool')
      }
      if (this.destroying) {
        return new Error('Cannot execute a task on destroying pool')
      }
      if (name != null) {
        if (typeof name !== 'string') {
          return new TypeError('name argument must be a string')
        }
        if (name.trim().length === 0) {
          return new TypeError('name argument must not be an empty string')
        }
      }
      if (transferList != null && !Array.isArray(transferList)) {
        return new TypeError('transferList argument must be an array')
      }
      return undefined
    })()
    if (invalidCallError != null) {
      reject(invalidCallError)
      return
    }
    // The timestamp is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so that the worker response can settle it.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
969
/**
 * @inheritDoc
 *
 * @remarks
 * Worker nodes are created until the number of non-dynamic worker nodes
 * reaches the number of workers given at construction. If a worker node
 * creation throws, the `starting` flag is reset so that the pool is not left
 * stuck in the starting state and `start()` can be called again.
 */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.numberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  } finally {
    // Always leave the starting state, even when a worker node creation failed.
    this.starting = false
  }
  // Only reached when every worker node has been created.
  this.started = true
}
994
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // Notify the listeners before tearing the emitter down.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1019
/**
 * Sends the kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `'success'` and rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      switch (response.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                response.workerId as number
              }`
            )
          )
          break
        default:
          // Not a kill answer: nothing to do.
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1043
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready, its tasks queue is flushed and the
 * flushed tasks are awaited before the kill message is sent and the worker
 * node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Wait for one 'taskFinished' event per flushed task.
  await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
1057
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1067
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
1072
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, on the
 * worker usage and, when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const taskName = task.name as string
  // Worker usage.
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  // Task function worker usage.
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName) !=
      null
  ) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(taskName) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
1110
/**
 * Hook executed after the worker task execution.
 * Updates the task statistics, run time and ELU, on the worker usage and,
 * when applicable, on the task function worker usage. The worker choice
 * strategy context is updated only if at least one usage was updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three usage updates carried by the message to one worker usage.
  const updateUsage = (usage: WorkerUsage): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
  }
  let needWorkerChoiceStrategyUpdate = false
  // Worker usage.
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    updateUsage(this.workerNodes[workerNodeKey].usage)
    needWorkerChoiceStrategyUpdate = true
  }
  // Task function worker usage.
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) != null
  ) {
    updateUsage(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        message.taskPerformance?.name as string
      ) as WorkerUsage
    )
    needWorkerChoiceStrategyUpdate = true
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
1166
/**
 * Tells whether the per task function usage of a worker node shall be updated.
 *
 * That is the case when the worker information exists and lists more than two
 * task function names.
 * NOTE(review): the threshold of two presumably accounts for reserved/default
 * entries in the names list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return false
  }
  const { taskFunctionNames } = info
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1181
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first; its key is
 * returned directly if the strategy policy has `dynamicWorkerUsage` set,
 * otherwise the choice is delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1200
/**
 * Evaluates the conditions for dynamic worker creation: the pool is of
 * dynamic type, it is not full and it is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1209
/**
 * Sends a message to worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1222
/**
 * Creates a worker node, wires its event handlers, adds it to the pool and
 * runs the post-setup hook.
 *
 * User supplied handlers from the pool options are registered before the
 * pool's own 'error' and 'exit' handlers.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()

  // Pool internal reaction to a worker 'error' event.
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the failing worker node by one of the same kind.
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      // Hand the tasks still queued on the failing worker node to other nodes.
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    workerNode.terminate().catch(terminationError => {
      this.emitter?.emit(PoolEvents.error, terminationError)
    })
  }
  // Pool internal reaction to the first worker 'exit' event.
  const onWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
  }

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onWorkerExit)

  const createdWorkerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1275
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, this registers a message listener
 * that destroys the worker node when it reports a kill behavior, asks the
 * worker to check its activity, replays the pool registered task functions to
 * the worker and flags the worker information as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // Resolve the key again from the worker id at message time instead of
    // reusing the key captured at creation time.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker:
    // - HARD kill behavior: destroy unconditionally;
    // - SOFT kill behavior: destroy only if no task is executing and, when the
    //   tasks queue is enabled, the worker node tasks queue is empty.
    // NOTE(review): the SOFT branch tests `enableTasksQueue` against `false`
    // and `true` explicitly, so an undefined option never matches - presumably
    // the option is defaulted elsewhere; confirm.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      // Destruction is asynchronous: failures are reported as pool error events.
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // Ask the worker to check its activity.
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send an 'add' operation for each task function registered on the pool.
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        // The function is sent as its source text.
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  workerInfo.dynamic = true
  // The strategy policy may require a dynamic worker node to be flagged as
  // ready right after its creation.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1331
/**
 * Registers a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1344
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1357
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1370
/**
 * Hook run after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics
 * messages, and, when the tasks queue is enabled, subscribes to the worker
 * node events driving task stealing.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Messaging: listen first, then send the startup and statistics messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing only applies when tasks are queued.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const stealingOptions = this.opts.tasksQueueOptions
  if (stealingOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (stealingOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1402
/**
 * Sends the startup message to worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1409
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const statistics = {
    runTime: strategyContext.getTaskStatisticsRequirements().runTime.aggregate,
    elu: strategyContext.getTaskStatisticsRequirements().elu.aggregate
  }
  this.sendToWorker(workerNodeKey, { statistics })
}
1426
/**
 * Hands a task to a worker node: the task is executed right away when
 * `shallExecuteTask()` allows it, otherwise it is enqueued on the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1434
/**
 * Moves every task still queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node with the
 * fewest queued tasks. The source worker node is never a destination:
 * handing a task back to the node being drained could either execute it on a
 * failing worker or re-enqueue it there and keep the loop from terminating.
 *
 * Does nothing when the key does not designate a worker node of the pool
 * (e.g. `-1` returned by `indexOf()` once the node has been removed) or when
 * the pool has a single worker node.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (this.workerNodes[workerNodeKey] == null) {
    return
  }
  if (this.workerNodes.length <= 1) {
    return
  }
  // Seed the search with the first worker node that is not the source one.
  // At least two worker nodes exist here, so key 1 is valid when the source is key 0.
  const seedWorkerNodeKey = workerNodeKey === 0 ? 1 : 0
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (
        minWorkerNodeKey,
        candidateWorkerNode,
        candidateWorkerNodeKey,
        workerNodes
      ) => {
        return candidateWorkerNodeKey !== workerNodeKey &&
          candidateWorkerNode.info.ready &&
          candidateWorkerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateWorkerNodeKey
          : minWorkerNodeKey
      },
      seedWorkerNodeKey
    )
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1456
/**
 * Increments the stolen tasks counter of a worker node and, when the per task
 * function usage is tracked, the one of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (thiefWorkerNode?.usage != null) {
    ++thiefWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const perTaskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++perTaskFunctionUsage.tasks.stolen
}
1475
/**
 * Increments the sequentially stolen tasks counter of a worker node, if the
 * worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  ++nodeUsage.tasks.sequentiallyStolen
}
1484
/**
 * Increments the sequentially stolen tasks counter of a task function on a
 * worker node, when the per task function usage is tracked and available.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const perTaskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++perTaskFunctionUsage.tasks.sequentiallyStolen
}
1500
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node, if
 * the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage == null) {
    return
  }
  nodeUsage.tasks.sequentiallyStolen = 0
}
1509
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on
 * a worker node, when the per task function usage is tracked and available.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const perTaskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  perTaskFunctionUsage.tasks.sequentiallyStolen = 0
}
1525
/**
 * Handler of the 'idleWorkerNode' worker node event: makes the idle worker
 * node steal a task from another worker node, then schedules itself again
 * after a delay.
 *
 * @param eventDetail - The event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation of the handler, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  // Nothing to steal from with a single worker node.
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Stop condition: a task was stolen by the previous invocation and the
  // worker node is no longer idle (it executes tasks or has queued tasks).
  // The sequentially stolen counters are reset and the handler is not
  // scheduled again.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  // May be undefined when no task could be stolen.
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // NOTE(review): the cast assumes a usage exists for the stolen task name;
    // an undefined usage would make the property reads below throw - confirm.
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTask.name as string)
      ?.tasks as TaskStatistics
    // The per task function counter is incremented when it is zero, or when it
    // is positive and the previous stolen task has the same name; otherwise it
    // is reset.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTask.name as string
      )
    }
  }
  // Schedule the next stealing attempt; the delay is derived from the worker
  // node sequentially stolen tasks counter. Errors raised by the scheduled
  // invocation are deliberately ignored.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1589
/**
 * Makes a worker node steal one task from the ready worker node holding the
 * most queued tasks.
 *
 * The candidates are searched in a copy of the worker nodes sorted by
 * decreasing number of queued tasks. Positions in that sorted copy are not
 * worker node keys, so the stealing worker node is excluded by comparing
 * worker node references, not indexes.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidateWorkerNode =>
        candidateWorkerNode.info.ready &&
        candidateWorkerNode !== destinationWorkerNode &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  if (task == null) {
    // Nothing could actually be popped: do not record a steal.
    return undefined
  }
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  return task
}
1616
/**
 * Handler of the 'backPressure' worker node event: lets the other ready
 * worker nodes each take one task from the worker node under back pressure,
 * starting with the ones having the fewest queued tasks.
 *
 * A worker node only receives a task while its own queue holds fewer than
 * `tasksQueueOptions.size - 1` tasks. The candidates are walked in a sorted
 * copy of the worker nodes; positions in that copy are not worker node keys,
 * so the real key of each candidate is looked up in the pool worker nodes
 * before the task is handed over.
 *
 * @param eventDetail - The event detail carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node under back pressure is no longer part of the pool.
    return
  }
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to give away.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // Translate the candidate back to its key in the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1653
/**
 * Message listener registered on each worker.
 * Dispatches the message according to its content: worker ready response,
 * task execution response, or task function names update.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(senderWorkerNodeKey).taskFunctionNames =
      taskFunctionNames
  }
}
1675
/**
 * Handles a worker ready response: records the ready state and the task
 * function names on the worker information, then emits the pool ready event
 * the first time the pool is found ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = ready as boolean
  senderWorkerInfo.taskFunctionNames = taskFunctionNames
  // The ready event is emitted only once.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1691
/**
 * Handles a task execution response: settles the promise of the task, updates
 * the statistics, then, when the tasks queue is enabled, runs the next queued
 * task of the worker node or signals that the worker node is idle.
 *
 * Messages whose task id has no pending promise are ignored.
 * The queue handling is skipped when the worker node is no longer part of the
 * pool, consistently with the optional 'taskFinished' emission.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  // Settle the task promise, inside the async scope when an async resource exists.
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else {
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(resolve, this.emitter, data)
    } else {
      resolve(data as Response)
    }
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  if (this.opts.enableTasksQueue === true && workerNode != null) {
    const workerNodeTasksUsage = workerNode.usage.tasks
    // Run the next queued task if the concurrency limit allows it.
    if (
      this.tasksQueueSize(workerNodeKey) > 0 &&
      workerNodeTasksUsage.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    // Signal an idle worker node, unless a stealing sequence is already ongoing.
    if (
      workerNodeTasksUsage.executing === 0 &&
      this.tasksQueueSize(workerNodeKey) === 0 &&
      workerNodeTasksUsage.sequentiallyStolen === 0
    ) {
      workerNode.emit('idleWorkerNode', {
        workerId: workerId as number,
        workerNodeKey
      })
    }
  }
}
1741
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1747
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1753
/**
 * Emits the pool full event, with the pool information, when the pool is of
 * dynamic type and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1761
/**
 * Gets the worker information given its worker node key.
 *
 * The lookup uses optional chaining: at runtime the result is `undefined`
 * when no worker node exists for the key, despite the declared return type.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1771
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size defaults to the square of the pool
 * maximum size when `tasksQueueOptions.size` is not set.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const { env, workerOptions, tasksQueueOptions } = this.opts
  const tasksQueueBackPressureSize =
    tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    { env, workerOptions, tasksQueueBackPressureSize }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1794
/**
 * Appends the given worker node to the pool worker nodes and returns the key
 * under which it can be found.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1810
/**
 * Removes the worker node from the pool worker nodes and notifies the worker
 * choice strategy context of the removed key.
 * Does nothing if the worker node is not part of the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1823
/**
 * Flags the worker node as not ready given its worker node key.
 * Does nothing when no worker information exists for the key, e.g. when the
 * worker node has already been removed from the pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // getWorkerInfo() yields undefined at runtime for an unknown key.
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
1827
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when tasks are queued.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1835
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1844
/**
 * Executes the given task on the worker given its worker node key: runs the
 * before execution hook, sends the task with its transfer list to the worker,
 * then emits the task execution related pool events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1856
/**
 * Enqueues the given task on the worker node given its key, then emits the
 * task queuing related pool events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1862
/**
 * Dequeues a task from the tasks queue of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1866
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const inspectedWorkerNode = this.workerNodes[workerNodeKey]
  return inspectedWorkerNode.tasksQueueSize()
}
1870
/**
 * Flushes the tasks queue of the worker node given its key: every queued task
 * is dequeued and executed, then the worker node tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of tasks that were flushed.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1883
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1889 }