Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 * When it is not set, the pool maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 * Set to `true` at the end of `start()` and back to `false` at the end of `destroy()`.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 * Only `true` while `start()` is creating the worker nodes.
 */
private starting: boolean
/**
 * The start timestamp of the pool.
 * Captured with `performance.now()` at the end of the constructor and used as the time origin of the pool utilization.
 */
private readonly startTimestamp
114
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, applies the pool options defaults, builds the worker choice
 * strategy context and, when `opts.startWorkers` is `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Argument validation; checkPoolOptions() also writes the defaults into this.opts,
  // so every this.opts read below sees defaulted values.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this instance so they can be passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node is created.
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Captured after the optional start(); used as the time origin of the utilization.
  this.startTimestamp = performance.now()
}
165
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // A non string value (null and undefined included) or a blank string is rejected.
  const isMissing =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
/**
 * Validates the number of workers of the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The pool minimum size.
 * @param max - The pool maximum size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, inferior to or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
253
/**
 * Validates a worker choice strategy. A `null` or `undefined` strategy is accepted.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
266
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per worker node or if `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const retries = workerChoiceStrategyOptions.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  const weights = workerChoiceStrategyOptions.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const measurement = workerChoiceStrategyOptions.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
313
/**
 * Validates the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
350
/** @inheritDoc */
public get info (): PoolInfo {
  // The optional fields are added with `...(condition && { field })`: spreading a
  // falsy value adds no property, so a field is only present when its condition holds.
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both the run time and the wait time aggregates are required.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    // A worker node is counted idle when it executes no task, busy otherwise.
    idleWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing === 0
          ? accumulator + 1
          : accumulator,
      0
    ),
    busyWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
      0
    ),
    executedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executing,
      0
    ),
    // Tasks queue related fields, only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.queued,
        0
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
        0
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      backPressure: this.hasBackPressure()
    }),
    ...(this.opts.enableTasksQueue === true && {
      stolenTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.stolen,
        0
      )
    }),
    failedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.failed,
      0
    ),
    // Run time statistics, only reported when the run time aggregate is required.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        // A worker node without a minimum contributes Infinity, which cannot win the min().
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        // A worker node without a maximum contributes -Infinity, which cannot win the max().
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Average and median are computed over the concatenated run time history of all the worker nodes.
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(
            average(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.runTime.history),
                []
              )
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.runTime.history),
                []
              )
            )
          )
        })
      }
    }),
    // Wait time statistics, built the same way as the run time statistics.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(
            average(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.waitTime.history),
                []
              )
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.waitTime.history),
                []
              )
            )
          )
        })
      }
    })
  }
}
506
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
521
/**
 * The approximate pool utilization.
 *
 * It is the sum of the worker nodes run time and wait time aggregates divided by the
 * pool time capacity (time elapsed since the start timestamp multiplied by the pool maximum size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
542
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * Must be implemented by the concrete pools.
 */
protected abstract get type (): PoolType
549
/**
 * The worker type.
 * Must be implemented by the concrete pools.
 */
protected abstract get worker (): WorkerType
554
/**
 * The pool minimum size.
 * It is the number of workers given to the constructor.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
561
/**
 * The pool maximum size.
 * It is the `max` property when it is set, the number of workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  return this.max ?? this.numberOfWorkers
}
568
/**
 * Checks that the message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const workerId = message.workerId
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
587
/**
 * Finds the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
599
/**
 * Finds the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
611
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send each worker a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
630
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
644
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Switching the queue off: remove the tasks stealing callbacks and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the previously set options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const effectiveOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = effectiveOptions
  this.setTasksQueueSize(effectiveOptions.size as number)
  if (effectiveOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (effectiveOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
680
/**
 * Builds the tasks queue options by overriding the defaults with the given options.
 *
 * The defaults are: a queue size equal to the square of the pool maximum size,
 * a concurrency of one and both tasks stealing modes enabled.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with the defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
694
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
700
/**
 * Sets the `onEmptyQueue` callback of every worker node to the pool task stealing handler.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
707
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
713
/**
 * Sets the `onBackPressure` callback of every worker node to the pool tasks stealing on back pressure handler.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
720
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
726
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
735
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 * Must be implemented by the concrete pools.
 */
protected abstract get busy (): boolean
742
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes
 * fewer tasks than the configured concurrency; otherwise while it executes no task.
 *
 * @returns Worker nodes busyness boolean status: `true` when no ready worker node has spare capacity.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeUnderQuota = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeUnderQuota
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
766
/**
 * Sends a task function operation message to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports a successful operation and rejected when it reports a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    this.registerWorkerMessageListener(workerNodeKey, message => {
      // Only the answers of the targeted worker are relevant.
      if (message.workerId !== workerId) {
        return
      }
      if (message.taskFunctionOperationStatus === true) {
        resolve(true)
      } else if (message.taskFunctionOperationStatus === false) {
        reject(
          new Error(
            `Task function operation ${
              message.taskFunctionOperation as string
            } failed on worker ${message.workerId}`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
795
/**
 * Sends a task function operation message to every worker node and waits for all of them to answer.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once every worker node has reported a successful operation,
 * or immediately when the pool has no worker node. It is rejected once every worker node has answered
 * and at least one of them reported a failed operation; the error names the first failing worker.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without worker node nobody would ever answer and the promise would never settle.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(workerNodeKey, response => {
        // Messages that are not task function operation answers are ignored.
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        // Wait until every worker node has answered.
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        // Report the worker that actually failed, not the one that answered last.
        const failedResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                failedResponse.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
832
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function exists as soon as one worker node lists its name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
845
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const addOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    addOperationMessage
  )
  // Only recorded on the pool side once the operation did not reject.
  this.taskFunctions.set(name, fn)
  return opResult
}
868
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperationMessage
  )
  // Pool side cleanup, only reached when the operation did not reject.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
884
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node that reports a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
897
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // Sends the 'default' task function operation for the given name to every worker node.
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
}
905
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
911
/**
 * Whether a task shall be executed immediately on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return executingTasks < concurrency
}
919
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Invalid calls reject the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so the task response can settle the promise later.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue } = this.opts
    const executeImmediately =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
973
/** @inheritdoc */
public start (): void {
  this.starting = true
  // Create worker nodes until the non dynamic ones reach the configured number of workers.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
989
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy every worker node concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
1000
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `kill: 'success'` and rejected when it answers `kill: 'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Worker ${
                message.workerId as number
              } kill message handling failed`
            )
          )
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1021
/**
 * Terminates the worker node given its worker node key.
 * Must be implemented by the concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
1028
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1038
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
1043
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time, on the worker node
 * usage and, when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  const taskName = task.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1073
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage and,
 * when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const taskName = message.taskPerformance?.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
1107
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * Returns `false` instead of throwing when no worker node exists at the given
 * key, so callers that tolerate a missing worker node keep working.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // Read the info with optional chaining rather than through getWorkerInfo():
  // the latter dereferences the worker node unconditionally, which made the
  // null check below unreachable and threw on a missing worker node.
  const workerInfo = this.workerNodes[workerNodeKey]?.info
  return (
    workerInfo != null &&
    Array.isArray(workerInfo.taskFunctionNames) &&
    // NOTE(review): the threshold of 2 presumably discounts default task
    // function name entries - confirm against the worker implementation.
    workerInfo.taskFunctionNames.length > 2
  )
}
1122
/**
 * Updates the task counters of the given worker usage after a task execution.
 *
 * Decrements the executing counter when it is positive, then increments the
 * failed counter if the message carries a worker error, the executed counter
 * otherwise.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1140
/**
 * Updates the run time statistics of the given worker usage.
 * Does nothing when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    // A missing run time measurement is accounted as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1154
/**
 * Updates the wait time statistics of the given worker usage.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp is accounted with a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceQueued = task.timestamp != null ? now - task.timestamp : 0
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    elapsedSinceQueued
  )
}
1167
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Does nothing when the message carries a worker error.
 *
 * The active and idle measurements are always updated (a missing measurement
 * is accounted as 0). The utilization is only updated when ELU aggregation is
 * required and the message carries an ELU measurement: it becomes the mean of
 * the previous utilization and the new one, or the new one if none was set.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1200
/**
 * Picks the worker node that will run the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy has `dynamicWorkerUsage` set.
 * In every other case the choice is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1219
/**
 * Tells whether a dynamic worker shall be created: the pool must be dynamic,
 * not full, and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1228
/**
 * Sends a message to worker given its worker node key.
 * Abstract: the transport is provided by the concrete pool implementation.
 *
 * Used in this class to send tasks, the statistics requirements and the
 * dynamic worker activity check request.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1241
/**
 * Creates a new worker.
 * Abstract: the worker kind is provided by the concrete pool implementation.
 *
 * The returned worker is not yet registered in the pool worker nodes; that is
 * done by `createAndSetupWorkerNode()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1248
/**
 * Creates a worker, wires its event handlers, registers it as a worker node
 * and runs the post-setup hook.
 *
 * User supplied handlers from the pool options are registered before the
 * pool own handlers. On worker error the worker node is flagged not ready,
 * its channel is closed and the pool `error` event is emitted; the worker is
 * then replaced if the pool is started, not starting and
 * `restartWorkerOnError` is enabled, and its queued tasks are redistributed
 * if the pool is started with the tasks queue enabled. On worker exit the
 * worker node is removed from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker && erroredWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1292
/**
 * Creates a worker node and sets it up as a dynamic one.
 *
 * A message listener is registered that destroys the worker node when the
 * worker sends a hard kill message, or a soft kill message while it has no
 * executing task (and, if the tasks queue is enabled, no queued task).
 * The worker is then asked to check its activity, receives every task
 * function registered on the pool, and is flagged dynamic. It is flagged
 * ready right away if the strategy policy sets `dynamicWorkerReady` or
 * `dynamicWorkerUsage`.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Kill message received from worker
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (
      !shallDestroy &&
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      senderUsage.tasks.executing === 0
    ) {
      // A soft kill additionally requires an empty queue when queueing is on.
      shallDestroy =
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0)
    }
    if (shallDestroy) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, { checkActive: true })
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  dynamicWorkerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1345
/**
 * Registers a listener callback on the worker given its worker node key.
 * Abstract: the registration is provided by the concrete pool implementation.
 *
 * Used in this class to attach the pool message listener to every worker and
 * the kill message listener to dynamic workers.
 *
 * @typeParam Message - Type of the message payload: task data or execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1358
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, installs the
 * task stealing callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1383
/**
 * Sends the startup message to worker given its worker node key.
 * Abstract: the message is built and sent by the concrete pool implementation.
 *
 * Called from `afterWorkerNodeSetup()` right after the pool message listener
 * has been registered on the worker.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1390
/**
 * Tells the worker which task statistics the current worker choice strategy
 * requires to be aggregated: run time and event loop utilization (ELU).
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    }
  })
}
1407
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, that
 * currently has the fewest queued tasks; it is executed right away if that
 * node can take it, enqueued there otherwise.
 *
 * The source worker node is never picked as destination: doing so would
 * re-enqueue the task on the queue being drained and never terminate.
 * Redistribution stops when no other ready worker node exists; the remaining
 * tasks are then left on the source worker node queue.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means "no eligible destination found yet".
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, candidateWorkerNodeKey, workerNodes) => {
        if (
          candidateWorkerNodeKey === workerNodeKey ||
          !workerNode.info.ready
        ) {
          return minWorkerNodeKey
        }
        return minWorkerNodeKey === -1 ||
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateWorkerNodeKey
          : minWorkerNodeKey
      },
      -1
    )
    if (destinationWorkerNodeKey === -1) {
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1428
/**
 * Increments the stolen tasks counter of a worker node and, when task
 * function usage shall be updated for that node, the stolen tasks counter of
 * the given task function usage.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const thiefWorkerNode = this.workerNodes[workerNodeKey]
  if (thiefWorkerNode?.usage != null) {
    ++thiefWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (thiefWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1447
/**
 * Callback run when the queue of a worker becomes empty: steals one task for
 * it from another worker node.
 *
 * The source is the ready worker node, other than the given worker, with the
 * most queued tasks, provided it has at least one. The stolen task is
 * executed on the given worker if it can take it, enqueued on it otherwise,
 * and the stolen tasks statistics of the given worker are updated.
 *
 * @param workerId - The id of the worker whose queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy so the pool worker nodes order, hence their keys, is kept.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1475
/**
 * Callback run when a worker is under back pressure: hands tasks from its
 * queue over to the other worker nodes.
 *
 * Worker nodes are visited by ascending number of queued tasks. Each ready
 * worker node, other than the source, whose queue holds fewer tasks than the
 * configured tasks queue size minus one receives one task, as long as the
 * source still has queued tasks. A received task is executed right away if
 * the node can take it, enqueued otherwise, and the stolen tasks statistics
 * of the receiving node are updated.
 *
 * Does nothing when the configured tasks queue size is at most 1 or when the
 * source worker node cannot be resolved from the worker id.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    return
  }
  // Pair each worker node with its key in the pool BEFORE sorting: positions
  // in the sorted copy are not valid keys into this.workerNodes.
  const destinationCandidates = this.workerNodes
    .map((workerNode, workerNodeKey) => ({ workerNode, workerNodeKey }))
    .sort(
      (candidateA, candidateB) =>
        candidateA.workerNode.usage.tasks.queued -
        candidateB.workerNode.usage.tasks.queued
    )
  for (const { workerNode, workerNodeKey } of destinationCandidates) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1510
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches on the
 * message content: a worker ready response (`ready` and `taskFunctionNames`
 * set), a task execution response (`taskId` set), or a task function names
 * update (`taskFunctionNames` set only), which is stored in the worker info.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
    }
  }
}
1533
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names in the worker info, then emits the pool `ready` event if
 * the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1549
/**
 * Handles a task execution response.
 *
 * Responses whose task id has no pending promise are ignored. Otherwise the
 * pending promise is rejected with the worker error message (after emitting
 * the pool `taskError` event) or resolved with the response data, the worker
 * usage and the worker choice strategy are updated, and the promise is
 * forgotten. Finally, if the tasks queue is enabled, the worker node has
 * queued tasks and runs fewer tasks than the configured concurrency, its
 * next queued task is executed.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerError, data } = message
  const taskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  if (workerError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    pendingResponse.reject(workerError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallRunNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunNextQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1577
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1583
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1589
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1597
/**
 * Gets the worker information given its worker node key.
 *
 * The worker node is dereferenced unconditionally: the key must designate an
 * existing worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1607
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * The second worker node constructor argument is the configured tasks queue
 * size, defaulting to the square of the pool maximum size. A worker node
 * added while the pool is starting is flagged ready immediately.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1631
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing when the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1644
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1652
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1661
/**
 * Runs the given task on a worker: runs the pre-execution hook, sends the
 * task with its transfer list to the worker, then emits the task execution
 * related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1673
/**
 * Enqueues the given task on a worker node, then emits the task queuing
 * related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1679
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1683
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1687
/**
 * Flushes the tasks queue of a worker node: executes every queued task on
 * that worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1697
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1703 }