Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
75 *
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
77 */
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
80
81 /**
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
85 Worker,
86 Data,
87 Response
88 >
89
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
/**
 * Builds the state shared by every poolifier pool.
 *
 * The arguments are validated first, then the task dispatch methods are bound,
 * the optional event emitter and the worker choice strategy context are created,
 * the setup hook is run and, when `opts.startWorkers` is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` reports that the caller is not the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` intact when these methods are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
165
/**
 * Ensures the worker file path is a non-blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  // `null`/`undefined` are rejected by the `typeof` test as well.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not given.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is below the minimum, equal to zero, or equal to the minimum.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options (defaults first, user values on top).
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const userTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(userTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
251
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
261
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` entries differs from the pool maximum size, or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold at most.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
305
/**
 * Validates tasks queue options. Missing (`null`/`undefined`) options are accepted as-is.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency`/`size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  // Nothing to validate: every check below only applies to provided values.
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
342
/** @inheritDoc */
public get info (): PoolInfo {
  // The statistics requirements are looked up once per snapshot rather than
  // once per reported field.
  const statisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Adds up one numeric figure across all worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  // Concatenates one measurement history across all worker nodes.
  const mergeHistories = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (accumulator, workerNode) => accumulator.concat(pick(workerNode)),
      []
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(statisticsRequirements.runTime.aggregate &&
      statisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(statisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(statisticsRequirements.runTime.average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(statisticsRequirements.runTime.median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(statisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(statisticsRequirements.waitTime.average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(statisticsRequirements.waitTime.median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
498
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of non-dynamic worker nodes flagged as ready
 * reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
513
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated run time plus the aggregated wait time of all worker nodes,
 * divided by the time elapsed since the start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
534
535 /**
536 * The pool type.
537 *
538 * If it is `'dynamic'`, it provides the `max` property.
539 */
540 protected abstract get type (): PoolType
541
542 /**
543 * The worker type.
544 */
545 protected abstract get worker (): WorkerType
546
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
553
/**
 * The pool maximum size: the `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const dynamicMaximum = this.max
  return dynamicMaximum != null ? dynamicMaximum : this.numberOfWorkers
}
560
/**
 * Checks that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
579
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
591
/**
 * Gets the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
603
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node starts over with a clean usage and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
622
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // User supplied values override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
636
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Detach the stealing callbacks, then flush the tasks queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
650
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
  if (builtOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
672
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to `tasksQueueBackPressureSize` on each worker node.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
678
/**
 * Installs the task stealing handler as the `onEmptyQueue` callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
685
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
691
/**
 * Installs the tasks stealing handler as the `onBackPressure` callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
698
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
704
/**
 * Builds the effective tasks queue options by layering the given options over the defaults.
 *
 * Defaults: `size` is the pool maximum size squared, `concurrency` is `1`,
 * `taskStealing` and `tasksStealingOnBackPressure` are `true`.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
718
/**
 * Whether the pool is full or not.
 *
 * The pool is full when it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
727
728 /**
729 * Whether the pool is busy or not.
730 *
731 * The pool busyness boolean status.
732 */
733 protected abstract get busy (): boolean
734
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota is the configured tasks concurrency;
 * otherwise a worker node is at quota as soon as it executes a task.
 * Only worker nodes flagged as ready can count as having spare capacity.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowConcurrency = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowConcurrency
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
758
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` when the worker reports success; it rejects when the worker reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  return await new Promise<boolean>((resolve, reject) => {
    // NOTE(review): the listener is not unregistered once the promise settles.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Only status responses coming from the targeted worker are considered.
      if (response.workerId !== workerId) {
        return
      }
      switch (response.taskFunctionOperationStatus) {
        case true:
          resolve(true)
          break
        case false:
          reject(
            new Error(
              `Task function operation ${
                response.taskFunctionOperation as string
              } failed on worker ${response.workerId}`
            )
          )
          break
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
787
/**
 * Sends a task function operation message to every worker node and waits for all status responses.
 *
 * @param message - The task function operation message, without worker id.
 * @returns A promise resolving to `true` once every addressed worker reported success (immediately when the pool currently has no worker node); it rejects when at least one worker reported failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Snapshot of the number of workers addressed: a later change of the pool size
    // must not alter how many responses this operation waits for.
    const expectedResponses = this.workerNodes.length
    if (expectedResponses === 0) {
      // No worker to notify: settle right away instead of waiting forever for responses.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      // NOTE(review): the listener is not unregistered once the promise settles.
      this.registerWorkerMessageListener(workerNodeKey, response => {
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        if (responsesReceived.length !== expectedResponses) {
          return
        }
        // Report the worker that actually failed, not merely the last one to answer.
        const failedResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                failedResponse.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
824
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(workerNode => {
    const { taskFunctionNames } = workerNode.info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
837
/**
 * @inheritDoc
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `name` is not a non-empty string or `taskFunction` is not a function. As the method is `async`, the error surfaces as a promise rejection.
 */
public async addTaskFunction (
  name: string,
  taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
  // Validate before touching any state so that invalid input never ends up
  // recorded in the task functions map.
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof taskFunction !== 'function') {
    throw new TypeError('taskFunction argument must be a function')
  }
  this.taskFunctions.set(name, taskFunction)
  // The task function is sent to the workers as its source text.
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: taskFunction.toString()
  })
}
850
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Forget the task function on the pool side, then ask every worker to remove it.
  this.taskFunctions.delete(name)
  const removeOperation: Omit<MessageValue<Data>, 'workerId'> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(removeOperation)
}
859
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node exposing a non-empty list of names is the reference.
  const referenceWorkerNode = this.workerNodes.find(workerNode => {
    const { taskFunctionNames } = workerNode.info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 0
  })
  return referenceWorkerNode?.info.taskFunctionNames ?? []
}
872
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // Ask every worker to make the named task function its default one.
  const defaultOperation: Omit<MessageValue<Data>, 'workerId'> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
880
/**
 * Whether a task can be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the node executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
888
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Argument and state checks: each failure rejects the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    // The timestamp is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Settlement callbacks are stored under the task id until the worker answers.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const { enableTasksQueue } = this.opts
    const executeImmediately =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
942
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes do not count towards the configured number of workers.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
958
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
969
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise that resolves when the worker answers `kill: 'success'` and rejects when it answers `kill: 'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Worker ${
                message.workerId as number
              } kill message handling failed`
            )
          )
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
990
991 /**
992 * Terminates the worker node given its worker node key.
993 *
994 * @param workerNodeKey - The worker node key.
995 */
996 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
997
/**
 * Setup hook called by the abstract constructor before the pool is started.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1007
1008 /**
1009 * Should return whether the worker is the main worker or not.
1010 */
1011 protected abstract isMain (): boolean
1012
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and records the task wait time, on the worker node usage
 * and, when applicable, on the usage tracked for the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // The per task function usage is looked up only when it shall be updated.
  const taskFunctionWorkerUsage = this.shallUpdateTaskFunctionWorkerUsage(
    workerNodeKey
  )
    ? workerNode.getTaskFunctionWorkerUsage(task.name as string)
    : undefined
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1042
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics, on the worker node usage
 * and, when applicable, on the usage tracked for the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // The per task function usage is looked up only when it shall be updated.
  const taskFunctionWorkerUsage = this.shallUpdateTaskFunctionWorkerUsage(
    workerNodeKey
  )
    ? workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    : undefined
  if (taskFunctionWorkerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
1076
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The answer is positive when the worker info lists more than two task function names.
 * NOTE(review): the threshold of two presumably accounts for the default entry plus a single
 * task function - confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1091
/**
 * Updates the task counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  // A message carrying a worker error counts as a failed task.
  if (message.workerError != null) {
    ++tasks.failed
    return
  }
  ++tasks.executed
}
1109
/**
 * Updates the run time statistics of the given worker usage.
 * Responses carrying a worker error are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const { runTime: runTimeRequirements } =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // A missing task run time is accounted as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1123
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the delay between the task timestamp and now.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp is accounted with a zero wait time.
  const startTimestamp = task.timestamp ?? now
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - startTimestamp
  )
}
1136
/**
 * Updates the ELU statistics of the given worker usage.
 * Responses carrying a worker error are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active or idle measurements are accounted as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the task one,
  // or the task one when no previous value exists.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1169
/**
 * Selects the worker node that will handle the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy enables `dynamicWorkerUsage`. In every other
 * case the choice is delegated to the worker choice strategy context.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1188
/**
 * Tells whether a dynamic worker shall be created: the pool must be dynamic,
 * not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1197
/**
 * Sends a message to worker given its worker node key.
 *
 * Implemented by the concrete pool. This class relies on it to deliver the
 * tasks (along with their optional transfer list), the `checkActive` request
 * sent to dynamic workers and the statistics message.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1210
/**
 * Creates a new worker.
 *
 * Implemented by the concrete pool. `createAndSetupWorkerNode()` calls it, then
 * attaches the event handlers and adds the worker to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1217
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes
 * and runs the worker node setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers are registered before the pool internal ones.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestartWorker) {
      // The replacement keeps the dynamic nature of the failed worker.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1261
/**
 * Creates a worker node through `createAndSetupWorkerNode()` and turns it into
 * a dynamic one: a kill message listener is registered, the `checkActive`
 * request and the pool task functions are sent to the worker, and the worker
 * information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Kill message received from worker: a hard kill is always honored,
    // a soft kill only when the worker node has no task executing
    // and, with the tasks queue enabled, no task queued.
    let shallDestroyWorkerNode = isKillBehavior(
      KillBehaviors.HARD,
      message.kill
    )
    if (
      !shallDestroyWorkerNode &&
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      workerUsage.tasks.executing === 0
    ) {
      shallDestroyWorkerNode =
        this.opts.enableTasksQueue === true
          ? this.tasksQueueSize(senderWorkerNodeKey) === 0
          : this.opts.enableTasksQueue === false
    }
    if (shallDestroyWorkerNode) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Forward every task function known by the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1314
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Implemented by the concrete pool. This class uses it to attach the listener
 * returned by `workerListener()` to every new worker node and the kill message
 * listener to dynamic worker nodes.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1327
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and the statistics
 * messages and, when the tasks queue is enabled, installs the task stealing
 * callbacks requested by the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1352
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Implemented by the concrete pool. `afterWorkerNodeSetup()` calls it right
 * after registering the worker message listener and before sending the
 * statistics message.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1359
/**
 * Sends to the worker the statistics message: the `aggregate` flags of the run
 * time and ELU task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
1376
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the given one, having
 * the fewest queued tasks (the lowest key wins on ties). The task is executed
 * right away when `shallExecuteTask()` allows it, and queued otherwise.
 * Redistribution stops as soon as no other ready worker node is available,
 * leaving the remaining tasks queued on the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      // Never pick the source worker node nor a not ready one: putting a task
      // back on the source queue would keep this loop running forever.
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node can take over the queued tasks.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1397
/**
 * Increments the stolen tasks counter of a worker node usage and, when
 * `shallUpdateTaskFunctionWorkerUsage()` allows it, of the usage tracked for
 * the given task function.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1416
/**
 * Steals one task for the worker identified by the given id.
 * Installed as the `onEmptyQueue` callback of the worker nodes when the
 * `taskStealing` tasks queue option is enabled.
 *
 * The task is taken, through `popTask()`, from the ready worker node with the
 * most queued tasks, the given worker excluded. Nothing happens when no such
 * worker node has queued tasks.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Most loaded worker nodes first.
  const workerNodesByQueuedDesc = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = workerNodesByQueuedDesc.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1444
/**
 * Offloads tasks of the worker identified by the given id onto less loaded
 * worker nodes.
 * Installed as the `onBackPressure` callback of the worker nodes when the
 * `tasksStealingOnBackPressure` tasks queue option is enabled.
 *
 * Worker nodes are visited from the least to the most loaded. Each ready
 * worker node, the given worker excluded, whose queued tasks count is below
 * the tasks queue size minus one receives one task taken from the source
 * through `popTask()`, as long as the source still has queued tasks.
 * Nothing is done when the tasks queue size is lower than or equal to one.
 *
 * @param workerId - The id of the worker whose tasks are offloaded.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not the worker node key:
      // resolve the real key from the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1479
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches the message:
 * worker ready response, task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
    }
  }
}
1502
/**
 * Handles the ready response of a worker: stores its ready flag and its task
 * function names in the worker information, then emits the pool `ready` event
 * when an emitter exists and the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1518
/**
 * Handles the execution response of a task: settles the promise registered
 * for the task id, runs the after task execution hook, updates the worker
 * choice strategy context and, when the tasks queue is enabled, executes the
 * next queued task of the worker node if its concurrency allows it.
 * Responses without a registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
}
1546
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1552
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1558
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1566
/**
 * Returns the information of the worker registered at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1576
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, or with the
 * square of the pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1600
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1613
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1621
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * every worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1630
/**
 * Runs the given task on a worker node: the before task execution hook is
 * called, the task is sent to the worker with its transfer list, then the task
 * execution events are checked and emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1642
/**
 * Queues the given task on a worker node, then checks and emits the task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1648
/**
 * Takes the next task out of the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1652
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1656
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1666
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1672 }