fix: fix queued tasks rescheduling
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task,
9 Writable
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 isKillBehavior,
16 isPlainObject,
17 median,
18 round,
19 updateMeasurementStatistics
20 } from '../utils'
21 import { KillBehaviors } from '../worker/worker-options'
22 import {
23 type IPool,
24 PoolEmitter,
25 PoolEvents,
26 type PoolInfo,
27 type PoolOptions,
28 type PoolType,
29 PoolTypes,
30 type TasksQueueOptions
31 } from './pool'
32 import type {
33 IWorker,
34 IWorkerNode,
35 WorkerInfo,
36 WorkerType,
37 WorkerUsage
38 } from './worker'
39 import {
40 type MeasurementStatisticsRequirements,
41 Measurements,
42 WorkerChoiceStrategies,
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
45 } from './selection-strategies/selection-strategies-types'
46 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
47 import { version } from './version'
48 import { WorkerNode } from './worker-node'
49
50 /**
51 * Base class that implements some shared logic for all poolifier pools.
52 *
53 * @typeParam Worker - Type of worker which manages this pool.
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
56 */
57 export abstract class AbstractPool<
58 Worker extends IWorker,
59 Data = unknown,
60 Response = unknown
61 > implements IPool<Worker, Data, Response> {
62 /** @inheritDoc */
63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
64
65 /** @inheritDoc */
66 public readonly emitter?: PoolEmitter
67
68 /**
69 * The task execution response promise map.
70 *
71 * - `key`: The message id of each submitted task.
72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
73 *
74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
75 */
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
78
79 /**
80 * Worker choice strategy context referencing a worker choice algorithm implementation.
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
83 Worker,
84 Data,
85 Response
86 >
87
88 /**
89 * Dynamic pool maximum size property placeholder.
90 */
91 protected readonly max?: number
92
93 /**
94 * Whether the pool is starting or not.
95 */
96 private readonly starting: boolean
97 /**
98 * Whether the pool is started or not.
99 */
100 private started: boolean
101 /**
102 * The start timestamp of the pool.
103 */
104 private readonly startTimestamp
105
/**
 * Builds a new poolifier pool and starts it.
 *
 * The arguments are validated first, then the task dispatch methods are bound to
 * the instance, the optional event emitter and the worker choice strategy context
 * are created, the setup hook is run and finally the worker nodes are created.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If not called from the main context or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const calledFromMain = this.isMain()
  if (!calledFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` stable when these methods are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
153
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  const missingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (missingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
166
/**
 * Checks that the number of workers is a safe, non negative integer and,
 * for a fixed pool, not zero.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero in a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
184
/**
 * Checks the size boundaries of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is lower than or equal to the minimum size, or is zero.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
210
/**
 * Validates the pool options and fills in the defaults on `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, default worker choice
 * strategy options, worker restart on error enabled, events enabled and tasks
 * queue disabled. Tasks queue options are only validated and built when the
 * tasks queue is enabled.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
238
/**
 * Checks that the given worker choice strategy is one of the known strategies.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
248
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries are not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights do not cover the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
292
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * When `queueMaxSize` is given, its value is copied into `size` on the passed
 * options object before `size` is validated.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or if concurrency or size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If concurrency or size is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If both `queueMaxSize` and `size` are specified.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, queueMaxSize } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (queueMaxSize != null) {
    if (tasksQueueOptions.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    tasksQueueOptions.size = queueMaxSize
  }
  const size = tasksQueueOptions.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
340
/**
 * Creates worker nodes until the count of non dynamic worker nodes
 * reaches the configured number of workers.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
352
/** @inheritDoc */
public get info (): PoolInfo {
  const workerNodes = this.workerNodes
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Adds up a per worker node numeric value over all the worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    workerNodes.reduce((total, workerNode) => total + valueOf(workerNode), 0)
  // Denominator of the run time and wait time averages.
  const totalExecutedTasks = (): number =>
    sumOverWorkerNodes((workerNode) => workerNode.usage.tasks?.executed ?? 0)
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: workerNodes.length,
    idleWorkerNodes: workerNodes.filter(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: workerNodes.filter(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) / totalExecutedTasks()
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) / totalExecutedTasks()
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
496
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes
 * is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
511
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all the worker
 * nodes, divided by the time elapsed since the pool start timestamp multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
532
533 /**
534 * The pool type.
535 *
536 * If it is `'dynamic'`, it provides the `max` property.
537 */
538 protected abstract get type (): PoolType
539
540 /**
541 * The worker type.
542 */
543 protected abstract get worker (): WorkerType
544
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
551
/**
 * The pool maximum size: the `max` property when it is set,
 * the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
558
/**
 * Checks that the message received from a worker carries the id of a worker
 * known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
577
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
589
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
601
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
620
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
634
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const currentlyEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue is being turned off.
  if (currentlyEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
646
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
}
658
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
664
/**
 * Builds the tasks queue options by layering the given options over the defaults:
 * a size equal to the square of the pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with the defaults filled in.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1
  }
  return {
    ...defaultTasksQueueOptions,
    ...tasksQueueOptions
  }
}
676
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
685
686 /**
687 * Whether the pool is busy or not.
688 *
689 * The pool busyness boolean status.
690 */
691 protected abstract get busy (): boolean
692
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than the
 * tasks queue concurrency (tasks queue enabled) or no task at all (tasks queue
 * disabled). The pool is busy when no ready worker node has spare capacity.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
717
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
730
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not leave the executor: each validation failure must
    // return explicitly, otherwise a worker node would still be chosen and the
    // invalid task dispatched or enqueued.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Timestamp taken at submission, later used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id in the response map.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen worker
    // node already runs its concurrency quota, in which case the task is enqueued.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
792
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Terminate all the worker nodes concurrently.
  const workerNodesTerminations = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesTerminations)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
803
/**
 * Sends a kill message to the worker and waits for its answer.
 *
 * The returned promise resolves when the worker answers with a `'success'` kill
 * status and rejects when it answers with a `'failure'` kill status.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
819
820 /**
821 * Terminates the worker node given its worker node key.
822 *
823 * @param workerNodeKey - The worker node key.
824 */
825 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
826
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
836
837 /**
838 * Should return whether the worker is the main worker or not.
839 */
840 protected abstract isMain (): boolean
841
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * on the worker node usage and, when applicable, on the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
871
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage
 * and, when applicable, on the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const applyTaskResult = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    applyTaskResult(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    applyTaskResult(taskFunctionWorkerUsage)
  }
}
905
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * True only when the worker information lists more than two task functions.
 * NOTE(review): the threshold of two presumably accounts for entries that are
 * always present in the list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null || !Array.isArray(workerInfo.taskFunctions)) {
    return false
  }
  return workerInfo.taskFunctions.length > 2
}
920
/**
 * Updates the tasks counters of a worker usage once a task has completed.
 *
 * The executing counter is decremented when positive, then either the executed
 * or the failed counter is incremented depending on the message task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
938
/**
 * Updates the run time statistics of a worker usage with the task run time
 * reported in the message, zero when none is reported.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
950
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
964
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * The active and idle measurements are always updated, with zero when the
 * message reports no ELU. The utilization is only updated when the ELU
 * aggregate is required and the message reports an ELU: it becomes the mean of
 * the previous and the reported utilization, or the reported one when there is
 * no previous value.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
996
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the creation conditions are met.
 * It is returned directly when the strategy policy asks for dynamic worker
 * usage, otherwise the choice is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerUsage) {
    return dynamicWorkerNodeKey
  }
  return this.workerChoiceStrategyContext.execute()
}
1015
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and all its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1024
1025 /**
1026 * Sends a message to worker given its worker node key.
1027 *
1028 * @param workerNodeKey - The worker node key.
1029 * @param message - The message.
1030 * @param transferList - The optional array of transferable objects.
1031 */
1032 protected abstract sendToWorker (
1033 workerNodeKey: number,
1034 message: MessageValue<Data>,
1035 transferList?: TransferListItem[]
1036 ): void
1037
1038 /**
1039 * Creates a new worker.
1040 *
1041 * @returns Newly created worker.
1042 */
1043 protected abstract createWorker (): Worker
1044
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied online, message, error and exit handlers are registered on
 * the worker, together with the pool own error and exit handling, before the
 * worker is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(
      erroredWorkerNodeKey
    ) as WorkerInfo
    // Flag the worker node as not ready and close its channel.
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the errored worker only once the pool is started and not starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestartWorker) {
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Hand the queued tasks of the errored worker node over for redistribution.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(newWorkerNodeKey)

  return newWorkerNodeKey
}
1088
/**
 * Creates and sets up a worker node, then flags it as dynamic and listens
 * for the kill messages its worker sends.
 *
 * @returns The key of the newly created and fully set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const dynamicWorkerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(dynamicWorkerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honoured once the worker node has nothing left to
    // do: no executing task and, with tasks queuing enabled, an empty queue.
    const softKillAllowed = (): boolean =>
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker
    if (isKillBehavior(KillBehaviors.HARD, message.kill) || softKillAllowed()) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(
    dynamicWorkerNodeKey
  ) as WorkerInfo
  // Ask the worker to start its activity check.
  this.sendToWorker(dynamicWorkerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Some worker choice strategies require the dynamic worker to be usable
  // without waiting for its ready message.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return dynamicWorkerNodeKey
}
1131
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by each concrete pool. Within this class it is used
 * to attach the pool message listener to every worker node and the kill
 * message listener to dynamic worker nodes.
 *
 * @typeParam Message - Type of the payload carried by the received messages.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1144
/**
 * Hook run once a worker node has been newly created: attaches the pool
 * message listener, sends the startup and statistics messages and, when
 * tasks queuing is enabled, wires the back pressure callback.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Route the messages coming from the worker through the pool listener.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Startup message first, statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing on empty queue is currently not wired:
  // this.workerNodes[workerNodeKey].onEmptyQueue =
  //   this.taskStealingOnEmptyQueue.bind(this)
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1165
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: implemented by each concrete pool. It is called from
 * `afterWorkerNodeSetup()`, right after the pool message listener is
 * registered and before the statistics message is sent.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1172
/**
 * Tells the worker which task statistics (run time, ELU) it has to report,
 * according to the requirements of the worker choice strategy in use.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime: runTimeRequired, elu: eluRequired },
    workerId: workerInfo.id as number
  })
}
1190
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes of the pool.
 *
 * For each queued task, the destination is the first other ready worker node
 * with an empty tasks queue or, failing that, the one with the fewest queued
 * tasks. The task is executed right away when the destination is running
 * fewer tasks than the configured tasks queue concurrency, otherwise it is
 * enqueued on the destination.
 * When no other ready worker node exists, the remaining tasks are left
 * queued on the source worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    // Iterate over the pool worker nodes themselves (not over a filtered
    // copy) so that the iteration index is a genuine worker node key.
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (!candidate.info.ready || candidateKey === workerNodeKey) {
        continue
      }
      if (candidate.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = candidateKey
        break
      }
      if (candidate.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = candidate.usage.tasks.queued
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: re-enqueueing on the source worker node
      // would never drain its queue and loop forever.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    // Decide between execution and queuing from the destination load only.
    if (
      destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1228
/**
 * Steals one task for the worker with the given id from the other ready
 * worker node that has the most queued tasks.
 *
 * The stolen task is executed right away when the destination worker node is
 * running fewer tasks than the configured tasks queue concurrency, otherwise
 * it is enqueued on the destination worker node.
 * Nothing happens when the destination worker node cannot be found or when
 * no other ready worker node has queued tasks.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // The key indexes the pool worker nodes, not the filtered and sorted copy
  // built below (which excludes the destination worker node).
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  if (destinationWorkerNode == null) {
    return
  }
  // filter() returns a new array, so sorting does not reorder the pool.
  const sourceWorkerNode = this.workerNodes
    .filter(
      (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
    )
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find((workerNode) => workerNode.usage.tasks.queued > 0)
  if (sourceWorkerNode == null) {
    return
  }
  const task = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (
    destinationWorkerNode.usage.tasks.executing <
    (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(destinationWorkerNodeKey, task)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, task)
  }
}
1262
/**
 * Offloads queued tasks from the worker with the given id to the other ready
 * worker nodes that have no back pressure, least queued first.
 *
 * Each visited destination receives at most one task. The task is executed
 * right away when the destination is running fewer tasks than the configured
 * tasks queue concurrency, otherwise it is enqueued on the destination.
 *
 * @param workerId - The id of the worker whose worker node has back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    return
  }
  // filter() returns a new array, so sorting does not reorder the pool.
  const candidates = this.workerNodes
    .filter(
      (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
    )
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const destinationWorkerNode of candidates) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to offload.
      break
    }
    if (destinationWorkerNode.hasBackPressure()) {
      continue
    }
    // Positions in the filtered and sorted copy are not worker node keys:
    // resolve the real key from the pool worker nodes.
    const destinationWorkerNodeKey = this.workerNodes.indexOf(
      destinationWorkerNode
    )
    const task = {
      ...(sourceWorkerNode.popTask() as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (
      destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1294
/**
 * Builds the listener registered for each worker message. The listener
 * dispatches the message according to the fields it carries.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1319
/**
 * Handles the ready response of a worker: records its readiness and its task
 * functions, then emits the pool ready event once the pool itself is ready.
 *
 * @param message - The ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1333
/**
 * Handles a task execution response: settles the promise of the task, runs
 * the after task execution hook, starts the next queued task of the worker
 * node when allowed and updates the worker choice strategy.
 * Responses whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  // The finished task freed a slot: start the next queued task, if any.
  if (
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1361
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1367
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1373
/**
 * Emits the pool full event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1381
/**
 * Looks up the information of the worker held by the worker node at the
 * given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` when no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1391
/**
 * Wraps the given worker in a worker node and appends it to the pool worker
 * nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without an explicit size, the tasks queue size defaults to the square of
  // the pool maximum size.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1416
/**
 * Drops the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing when the worker is
 * not part of the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1429
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when tasks queuing is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1437
/**
 * Whether the pool has back pressure, i.e. tasks queuing is enabled and
 * every worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1446
/**
 * Runs the before task execution hook, sends the task to the worker of the
 * given worker node, then emits the task execution events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1458
/**
 * Enqueues the task on the given worker node, then emits the task queuing
 * events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The tasks queue size reported by the worker node.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const reportedQueueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return reportedQueueSize
}
1464
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1468
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1472
/**
 * Executes every task queued on the given worker node, then clears its
 * tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1482
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1488 }