fix: unregister worker callbacks after usage
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { type TransferListItem } from 'node:worker_threads'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 average,
14 isKillBehavior,
15 isPlainObject,
16 max,
17 median,
18 min,
19 round
20 } from '../utils'
21 import { KillBehaviors } from '../worker/worker-options'
22 import type { TaskFunction } from '../worker/task-functions'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import {
34 type IWorker,
35 type IWorkerNode,
36 type WorkerInfo,
37 type WorkerType,
38 type WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50 import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
55 } from './utils'
56
57 /**
58 * Base class that implements some shared logic for all poolifier pools.
59 *
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
63 */
64 export abstract class AbstractPool<
65 Worker extends IWorker,
66 Data = unknown,
67 Response = unknown
68 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task id.
 *
 * Each entry stores the `resolve` and `reject` callbacks of the promise handed back
 * to the caller, together with the key of the worker node the task was assigned to.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Context wrapping the worker choice strategy used to pick a worker node.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Placeholder for the maximum size of a dynamic pool.
 */
protected readonly max?: number

/**
 * Task functions added at runtime, indexed by task function name.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * `true` once the pool has been started.
 */
private started: boolean
/**
 * `true` while the pool is creating its initial worker nodes.
 */
private starting: boolean
/**
 * Timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
118
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and the
 * worker choice strategy context are created, the setup hook is run and, unless
 * `startWorkers` is disabled, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // These methods are handed over as callbacks, so keep `this` attached.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
169
/**
 * Validates the number of workers given at pool construction.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws Error if the number of workers is not specified.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Validates the pool options and stores them, with defaults applied, in `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
216
/**
 * Validates worker choice strategy options. Nullish options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `retries` is not a safe integer.
 * @throws RangeError if `retries` is negative.
 * @throws Error if the number of weights differs from the pool maximum size, or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }

  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>

  // Statistics requirements accessors: evaluated lazily, at each point of use.
  const runTimeRequirements = (): MeasurementStatisticsRequirements =>
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const waitTimeRequirements = (): MeasurementStatisticsRequirements =>
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime

  // Small folds over the worker nodes shared by the fields below.
  const countWorkerNodes = (
    matches: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  const sumOverWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  const mergeHistories = (
    pick: (workerNode: PoolWorkerNode) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (merged, workerNode) => merged.concat(pick(workerNode)),
      []
    )

  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(runTimeRequirements().aggregate &&
      waitTimeRequirements().aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related fields are only present when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements().aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements().average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(runTimeRequirements().median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements().aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements().average && {
          average: round(
            average(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(waitTimeRequirements().median && {
          median: round(
            median(
              mergeHistories(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
419
/**
 * Whether the pool is ready: at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
434
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
455
/**
 * The type of this pool.
 *
 * A `'dynamic'` pool additionally provides the `max` property.
 */
protected abstract get type (): PoolType
462
/**
 * The type of worker managed by this pool.
 */
protected abstract get worker (): WorkerType
467
/**
 * The minimum size of the pool, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
474
475 /**
476 * The pool maximum size.
477 */
478 protected get maxSize (): number {
479 return this.max ?? this.numberOfWorkers
480 }
481
/**
 * Ensures that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
500
/**
 * Looks up the key of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
512
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
524
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node restarts from a clean usage and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
543
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults before being stored and applied.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
557
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Stop tasks stealing and flush the queues before turning the queue off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
571
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }

  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)

  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }

  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
593
/**
 * Builds the effective tasks queue options: the given options layered over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    // Defaults: the queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    ...tasksQueueOptions
  }
}
607
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
613
/**
 * Installs the empty queue task stealing callback on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
620
/**
 * Removes the empty queue callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
626
/**
 * Installs the back pressure tasks stealing callback on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
633
/**
 * Removes the back pressure callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
639
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
648
/**
 * Whether the pool is busy or not.
 *
 * The busyness criterion is defined by the concrete pool.
 */
protected abstract get busy (): boolean
655
/**
 * Whether every ready worker node is executing its tasks quota or not.
 *
 * With the tasks queue enabled the quota is the configured concurrency, otherwise
 * a single executing task makes a worker node busy.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  // Busy means that no ready worker node has capacity left.
  return !this.workerNodes.some(hasCapacity)
}
679
/**
 * Sends a task function operation to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The key of the worker node to send the operation to.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an `Error` on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the operation status from the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The response has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
720
/**
 * Sends a task function operation to every worker node and waits until as many
 * status responses as there are worker nodes have been received.
 *
 * The same listener is registered on every worker node. Once all responses are in,
 * it is deregistered from every worker node, not only from the one that answered
 * last, so that no stale listener is left behind after the operation.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if every worker succeeded, rejected with an `Error` describing the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only task function operation status messages are of interest here.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The operation is settled: remove the listener from all the worker nodes
      // it was registered on.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
775
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // A task function is available as soon as one worker node lists its name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
788
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Remember the function on the pool side once the workers have acknowledged it.
  this.taskFunctions.set(name, fn)
  return opResult
}
811
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded on the pool side can be removed.
  const isKnownOnPoolSide = this.taskFunctions.has(name)
  if (!isKnownOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
827
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names are taken from the first worker node reporting a non empty list.
  const reportingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return reportingWorkerNode?.info.taskFunctionNames ?? []
}
840
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperationMessage)
}
848
/**
 * Deletes the usage kept for the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
854
/**
 * Whether a task can be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node queue is empty and it executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
862
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Arguments and pool state validation: any failure rejects the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so that the worker response can settle it.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })

    const runImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (runImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
916
/** @inheritdoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes do not count towards the configured number of workers.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
932
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
943
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * The listener waiting for the answer is deregistered as soon as a kill `'success'`
 * or `'failure'` message is received, so that it does not stay attached to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on kill success and rejected with an `Error` on kill failure.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      // Messages unrelated to the kill request keep the listener waiting.
      if (message.kill !== 'success' && message.kill !== 'failure') {
        return
      }
      // The worker node key is looked up again from the worker id of the answer,
      // whose presence in the pool has been verified by checkMessageWorkerId().
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(message.workerId),
        killMessageListener
      )
      if (message.kill === 'success') {
        resolve()
      } else {
        reject(
          new Error(
            `Kill message handling failed on worker ${
              message.workerId as number
            }`
          )
        )
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
966
/**
 * Terminates the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
973
/**
 * Hook called by the abstract constructor before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
983
/**
 * Should return whether the current worker is the main worker or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
988
/**
 * Hook run before a task is executed by a worker.
 * Increments the executing tasks counter and records the task wait time, on the
 * worker node usage and, when applicable, on the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1018
/**
 * Hook run after a task has been executed by a worker.
 * Updates the tasks, run time and ELU statistics, on the worker node usage and,
 * when applicable, on the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const applyTaskResult = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    applyTaskResult(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    applyTaskResult(taskFunctionWorkerUsage)
  }
}
1052
/**
 * Whether the task function usage of a worker node shall be updated or not.
 * That is the case when the worker node reports more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null || !Array.isArray(workerInfo.taskFunctionNames)) {
    return false
  }
  return workerInfo.taskFunctionNames.length > 2
}
1067
/**
 * Updates the tasks counters of a worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1085
/**
 * Updates the run time statistics of a worker usage, unless the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1099
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since
 * the task timestamp. A task without timestamp counts for a zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now)
  )
}
1112
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 *
 * Nothing is recorded when the message carries a worker error. The active and
 * idle measurements are always updated (missing values count as `0`). The
 * utilization is only updated when ELU aggregation is required and the message
 * carries ELU data: it becomes the mean of the previous and the new value, or
 * the new value when none was recorded yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1145
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker shall be created, it is created first; its key is
 * returned directly if the strategy policy enables dynamic worker usage.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1164
/**
 * Checks the conditions for creating a dynamic worker: the pool is of dynamic
 * type, is not full and is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1173
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * Abstract: the transport is provided by the concrete pool implementation.
 * Within this class it is used to send the dynamic worker `checkActive`
 * message, the statistics message and the tasks to execute.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1186
/**
 * Creates a new worker.
 *
 * Abstract: implemented by the concrete pool. The returned worker is not yet
 * part of the pool worker nodes; `createAndSetupWorkerNode()` attaches the
 * event handlers and adds it.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1193
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes
 * and runs the after setup hook.
 *
 * The user supplied `online`, `message`, `error` and `exit` handlers from the
 * pool options are attached (a no-op function when absent), together with two
 * internal handlers:
 * - on `error`: the worker node is flagged as not ready, its channel is closed,
 *   the pool `error` event is emitted, a replacement worker node is created
 *   when the pool is started, not starting and `restartWorkerOnError` is
 *   enabled, and the queued tasks are redistributed when the pool is started
 *   with the tasks queue enabled;
 * - on the first `exit`: the worker node is removed from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Listeners are attached in this exact order: user handlers first for each event.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // The replacement worker node is of the same kind as the errored one.
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1237
/**
 * Creates a worker node through `createAndSetupWorkerNode()` and completes its
 * setup as a dynamic worker node.
 *
 * A message listener is registered that destroys the worker node on a hard
 * kill message, or on a soft kill message once the worker node executes no
 * task (and, with the tasks queue enabled, has no queued task). The worker is
 * then sent the `checkActive` message and every registered task function, is
 * flagged as dynamic, and is flagged as ready when the strategy policy asks
 * for it (`dynamicWorkerReady` or `dynamicWorkerUsage`).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honored once the worker node has nothing left to do.
    const hasNoPendingWork = (): boolean =>
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, hence no explicit size check.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1290
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool. Within this class it is used to
 * attach the pool worker listener and the dynamic worker kill message listener.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1303
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1316
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1329
/**
 * Hook run right after a worker node has been created and added to the pool.
 * Can be overridden.
 *
 * It registers the pool worker listener, sends the startup and statistics
 * messages, and, when the tasks queue is enabled, installs the task stealing
 * callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1354
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool. It is called from
 * `afterWorkerNodeSetup()` right after the worker listener registration.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1361
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
1378
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 *
 * For each queued task, the destination is the ready worker node with the
 * fewest queued tasks, the source worker node excluded. When no other worker
 * node is ready, the least loaded other worker node is used instead. The task
 * is executed right away when the destination can execute it, enqueued on it
 * otherwise.
 *
 * The source worker node is never a destination: moving a task back onto the
 * queue being drained would make no progress and could loop forever. When the
 * pool has no other worker node, the redistribution stops and the remaining
 * tasks are left in place.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Finds the least loaded worker node other than the source one, -1 if none.
  const findDestinationWorkerNodeKey = (readyOnly: boolean): number => {
    let chosenWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidateKey === workerNodeKey ||
        (readyOnly && !candidate.info.ready)
      ) {
        continue
      }
      if (
        chosenWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[chosenWorkerNodeKey].usage.tasks.queued
      ) {
        chosenWorkerNodeKey = candidateKey
      }
    }
    return chosenWorkerNodeKey
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = findDestinationWorkerNodeKey(true)
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node: fall back to any other worker node.
      destinationWorkerNodeKey = findDestinationWorkerNodeKey(false)
    }
    if (destinationWorkerNodeKey === -1) {
      // The source worker node is the only one: nowhere to move the tasks.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1399
/**
 * Accounts a stolen task in the usage of the given worker node.
 *
 * The worker node `stolen` counter is incremented when its usage exists. The
 * counter of the matching task function usage is incremented as well when
 * task function usage shall be updated for that worker node and such a usage
 * is available for the task name.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1418
/**
 * Steals one task for the worker whose identifier is given.
 *
 * The source is the ready worker node, other than the destination one, with
 * the most queued tasks (at least one). The task popped from it is executed on
 * the destination worker node when possible, enqueued on it otherwise, and the
 * stolen task statistics of the destination are updated.
 *
 * @param workerId - The identifier of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Most loaded worker nodes first; the pool worker nodes order is left untouched.
  const mostQueuedFirst = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = mostQueuedFirst.find(
    node =>
      node.info.ready &&
      node.info.id !== workerId &&
      node.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1446
/**
 * Offloads queued tasks from the worker whose identifier is given to the
 * other worker nodes.
 *
 * Nothing is done when the configured tasks queue size is lower than or equal
 * to one. Otherwise the worker nodes are visited from the least to the most
 * loaded one and, while the source still has queued tasks, one task is popped
 * from it for each ready worker node, other than the source, whose queue holds
 * fewer tasks than the configured size minus one. The task is executed on that
 * worker node when possible, enqueued on it otherwise, and its stolen task
 * statistics are updated.
 *
 * The destination key is looked up in the pool worker nodes: the position in
 * the sorted copy is not a valid worker node key.
 *
 * @param workerId - The identifier of the worker offloading its tasks.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // Resolve the real key: sorting changed the positions of the worker nodes.
      const destinationWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      if (destinationWorkerNodeKey === -1) {
        // The worker node left the pool in the meantime: keep the task on the source.
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
        this.executeTask(destinationWorkerNodeKey, task)
      } else {
        this.enqueueTask(destinationWorkerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        destinationWorkerNodeKey,
        task.name as string
      )
    }
  }
}
1481
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker identifier, then dispatches on the
 * message content: worker ready response, task execution response or task
 * function names update. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1504
/**
 * Handles a worker ready response.
 *
 * The ready flag and the task function names carried by the message are
 * stored in the worker information, then the pool `ready` event is emitted
 * when the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1520
/**
 * Handles a task execution response.
 *
 * A response whose task identifier has no pending promise is ignored.
 * Otherwise the promise is rejected with the worker error message (after
 * emitting the pool `taskError` event) or resolved with the response data,
 * the after task execution hook runs, the worker choice strategy context is
 * updated and the pending promise is forgotten. Finally, with the tasks queue
 * enabled, the next queued task of the worker node is executed when its
 * number of executing tasks is below the configured concurrency.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.workerError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.workerError)
    promiseResponse.reject(message.workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1548
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1554
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1560
/**
 * Emits the pool `full` event, with the pool information, when the pool is of
 * dynamic type and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1568
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1578
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, or the
 * square of the pool maximum size when none is configured. It is flagged as
 * ready when the pool is starting.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const queueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, queueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1602
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing when the worker has
 * no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1615
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without the tasks queue there is no back pressure to report.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1623
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1632
/**
 * Runs the given task on the worker node stored at the given key.
 *
 * The before task execution hook runs first, then the task is sent to the
 * worker along with its transfer list, and finally the task execution events
 * are checked and emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1644
/**
 * Enqueues the given task on the worker node stored at the given key, then
 * checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1650
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node provides none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1654
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1658
/**
 * Flushes the tasks queue of the worker node stored at the given key: every
 * queued task is dequeued and executed on that worker node, then the tasks
 * queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1668
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1674 }