build(ci): skip tests on node.js 16.x
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { type TransferListItem } from 'node:worker_threads'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 average,
14 isKillBehavior,
15 isPlainObject,
16 max,
17 median,
18 min,
19 round
20 } from '../utils'
21 import { KillBehaviors } from '../worker/worker-options'
22 import type { TaskFunction } from '../worker/task-functions'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50 import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
55 } from './utils'
56
57 /**
58 * Base class that implements some shared logic for all poolifier pools.
59 *
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
63 */
64 export abstract class AbstractPool<
65 Worker extends IWorker,
66 Data = unknown,
67 Response = unknown
68 > implements IPool<Worker, Data, Response> {
69 /** @inheritDoc */
70 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
71
72 /** @inheritDoc */
73 public readonly emitter?: PoolEmitter
74
75 /**
76 * The task execution response promise map:
77 * - `key`: The message id of each submitted task.
78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
79 *
80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
81 */
82 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
83 new Map<string, PromiseResponseWrapper<Response>>()
84
85 /**
86 * Worker choice strategy context referencing a worker choice algorithm implementation.
87 */
88 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
89 Worker,
90 Data,
91 Response
92 >
93
94 /**
95 * Dynamic pool maximum size property placeholder.
96 */
97 protected readonly max?: number
98
99 /**
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
103 */
104 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
105
106 /**
107 * Whether the pool is started or not.
108 */
109 private started: boolean
110 /**
111 * Whether the pool is starting or not.
112 */
113 private starting: boolean
114 /**
115 * The start timestamp of the pool.
116 */
117 private readonly startTimestamp
118
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, fills in the pool options defaults, builds the
 * worker choice strategy context, runs the setup hook and, when
 * `opts.startWorkers` is `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also writes the defaults on this.opts.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods so they keep the pool as `this` wherever they are invoked.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the utilization computation.
  this.startTimestamp = performance.now()
}
169
/**
 * Validates the number of workers given to the constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If it is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is rejected when created with zero workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Validates the pool options and writes the default value of each unset
 * option on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, then defaulted to round robin.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: the given ones override the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
216
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Nothing to validate when no options are given.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is required per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  type TimeMeasurement = 'runTime' | 'waitTime'

  // Look the statistics requirements up once for the whole snapshot
  // (assumes the lookup is side-effect free).
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (predicate(workerNode) ? count + 1 : count),
      0
    )
  // Sums a numeric value read on each worker node.
  const sumWorkerNodes = (
    selector: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (sum, workerNode) => sum + selector(workerNode),
      0
    )
  // Concatenates the measurement history of every worker node.
  const measurementHistory = (measurement: TimeMeasurement): number[] =>
    this.workerNodes.reduce<number[]>(
      (accumulator, workerNode) =>
        accumulator.concat(workerNode.usage[measurement].history),
      []
    )
  // Builds the pool-wide statistics of a time measurement; average and median
  // are only present when the worker choice strategy requires them.
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  const measurementStatistics = (
    measurement: TimeMeasurement,
    requirements: MeasurementStatisticsRequirements
  ) => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(requirements.average && {
      average: round(average(measurementHistory(measurement)))
    }),
    ...(requirements.median && {
      median: round(median(measurementHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.stolen)
    }),
    failedTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: measurementStatistics('runTime', runTimeRequirements)
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: measurementStatistics('waitTime', waitTimeRequirements)
    })
  }
}
419
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of non dynamic worker nodes flagged as
 * ready reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
434
/**
 * The approximate pool utilization.
 *
 * Ratio between the aggregated tasks run and wait times of all worker nodes
 * and the time elapsed since the pool start timestamp multiplied by the pool
 * maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
455
456 /**
457 * The pool type.
458 *
459 * If it is `'dynamic'`, it provides the `max` property.
460 */
461 protected abstract get type (): PoolType
462
463 /**
464 * The worker type.
465 */
466 protected abstract get worker (): WorkerType
467
/**
 * The pool minimum size: the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
474
/**
 * The pool maximum size: `max` when it is set, the number of workers given
 * at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
481
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
500
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
512
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
524
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
543
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
557
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Going from enabled to disabled: remove the stealing callbacks and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
571
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
593
/**
 * Builds the tasks queue options: the given ones merged over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
607
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
613
/**
 * Sets the empty queue callback of every worker node to the task stealing handler.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
620
/**
 * Removes the empty queue callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
626
/**
 * Sets the back pressure callback of every worker node to the tasks stealing handler.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
633
/**
 * Removes the back pressure callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
639
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
648
649 /**
650 * Whether the pool is busy or not.
651 *
652 * The pool busyness boolean status.
653 */
654 protected abstract get busy (): boolean
655
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a worker node is available only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
679
/**
 * Sends a task function operation message to one worker node and waits for
 * its operation status.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` on success and rejecting with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Only the operation status sent by the targeted worker is handled.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The operation is settled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
720
/**
 * Sends a task function operation message to every worker node and waits for
 * all of them to report their operation status.
 *
 * The same listener is registered on every worker node. It is deregistered
 * from a worker node as soon as that worker node reports its status, so no
 * listener is left behind once the operation is settled.
 *
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` when every worker node reported a success and rejecting with an error when at least one reported a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      // A worker node is expected to report its status once per operation:
      // stop listening to it right away instead of only deregistering from
      // the last responder, which leaked the listener on all the others.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationsListener
      )
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // Every worker node reported: settle with the first failure, if any.
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse.workerId as number
            } with error: '${
              errorResponse.workerError?.message as string
            }'`
          )
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
775
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
788
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the worker nodes as its source string.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    operationMessage
  )
  // Only reached when the operation did not reject.
  this.taskFunctions.set(name, fn)
  return opResult
}
811
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    operationMessage
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
827
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node holding a non empty list.
  const workerNodeWithTaskFunctionNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithTaskFunctionNames?.info.taskFunctionNames ?? []
}
840
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
848
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
854
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
862
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Invalid calls reject the returned promise instead of throwing.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node choice.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Store the promise callbacks under the task id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
916
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes do not count toward the number of workers to create.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
932
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodesDestruction = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestruction)
  // The destroy event carries the pool information read before `started` is unset.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
943
/**
 * Sends a kill message to the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolving once the worker answers `'success'` and rejecting with an error when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      // Messages carrying any other `kill` value are ignored.
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
966
967 /**
968 * Terminates the worker node given its worker node key.
969 *
970 * @param workerNodeKey - The worker node key.
971 */
972 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
973
974 /**
975 * Setup hook to execute code before worker nodes are created in the abstract constructor.
976 * Can be overridden.
977 *
978 * @virtual
979 */
980 protected setupHook (): void {
981 /* Intentionally empty */
982 }
983
984 /**
985 * Should return whether the worker is the main worker or not.
986 */
987 protected abstract isMain (): boolean
988
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time
 * statistics of the worker node usage and, when applicable, of the usage
 * tracked for the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
1018
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage
 * and, when applicable, of the usage tracked for the task function named in
 * the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
1052
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when the worker information lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1067
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the ended task.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  // A message carrying a worker error counts as a failed task.
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1085
/**
 * Updates the run time statistics of a worker usage. Nothing is updated when
 * the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the ended task.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1099
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a wait time of zero.
  const taskWaitTime = now - (task.timestamp ?? now)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime
  )
}
1112
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 *
 * Nothing is recorded when the message carries a worker error. Active and idle
 * measurements default to `0` when missing from the task performance. When ELU
 * aggregation is required and the message carries ELU data, the utilization becomes
 * the mean of the previous value and the new one, or the new one if none was stored yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1145
/**
 * Chooses a worker node for the next task.
 *
 * When the conditions for dynamic worker creation are met, a dynamic worker node is
 * created first; it is chosen directly if the strategy policy enables dynamic worker usage.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1164
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1173
/**
 * Sends a message to worker given its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1186
/**
 * Creates a new worker.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1193
/**
 * Creates a new, completely set up worker node.
 *
 * The handlers supplied through the pool options are registered on the worker before
 * the pool internal ones. On worker error the worker node is flagged as not ready, its
 * channel is closed and the error is emitted on the pool emitter; the worker is then
 * optionally replaced and its queued tasks are redistributed when the tasks queue is enabled.
 * On worker exit the worker node is removed from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Only replace the worker once the pool is fully started.
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
1237
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener handling the kill messages
 * is registered: the worker node is destroyed on a hard kill, or on a soft kill once it has
 * no executing task (and no queued task when the tasks queue is enabled). The worker is
 * then sent the `checkActive` message and the task functions registered on the pool,
 * and is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const dynamicWorkerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(dynamicWorkerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is honored only when the worker node has nothing left to do.
    const isIdle = (): boolean => {
      if (tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === true) {
        return this.tasksQueueSize(senderWorkerNodeKey) === 0
      }
      return this.opts.enableTasksQueue === false
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    if (shallDestroy) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(dynamicWorkerNodeKey)
  this.sendToWorker(dynamicWorkerNodeKey, {
    checkActive: true
  })
  // Replay the task functions registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(dynamicWorkerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return dynamicWorkerNodeKey
}
1291
/**
 * Registers a listener callback on the worker given its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1304
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1317
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1330
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and statistics
 * messages and, when the tasks queue is enabled, wires the task stealing callbacks
 * selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1355
/**
 * Sends the startup message to worker given its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1362
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the runtime and ELU task statistics
 * requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1379
/**
 * Redistributes the tasks queued on the given worker node to the other worker nodes.
 *
 * Each queued task is moved to the ready worker node, other than the source one, having
 * the fewest queued tasks. The source worker node is never a destination: handing a task
 * back to the node being drained could re-enqueue it there and loop forever.
 * The redistribution stops when no other ready worker node is available; the remaining
 * tasks are then left in the source worker node queue.
 *
 * @param workerNodeKey - The worker node key whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Elect the least loaded ready worker node, excluding the source one.
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: bail out instead of spinning on the source queue.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1400
/**
 * Increments the stolen tasks counters of a worker node.
 *
 * The worker node usage counter is incremented when the usage exists; the counter of the
 * task function usage is incremented too when the worker node shall update its task
 * function worker usage and such a usage is available for the given task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealerWorkerNode = this.workerNodes[workerNodeKey]
  if (stealerWorkerNode?.usage != null) {
    ++stealerWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    stealerWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1419
/**
 * Steals one task for the worker whose identifier is given.
 *
 * The source is the ready worker node, other than the given worker, with the most queued
 * tasks and at least one of them. The task popped from the source is executed on the
 * destination worker node if it shall execute it, enqueued there otherwise, and the stolen
 * tasks statistics of the destination are updated.
 *
 * @param workerId - The identifier of the worker stealing a task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Work on a copy sorted by descending number of queued tasks.
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort((nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued)
    .find(
      node =>
        node.info.ready &&
        node.info.id !== workerId &&
        node.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1447
/**
 * Moves tasks away from the worker whose identifier is given.
 *
 * Nothing is done when the configured tasks queue size is lower than or equal to one.
 * Otherwise the worker nodes are visited by ascending number of queued tasks: each ready
 * worker node, other than the source one, whose queue holds fewer tasks than the configured
 * size minus one receives one task popped from the source, as long as the source still has
 * queued tasks. The stolen tasks statistics of each receiving worker node are updated.
 *
 * @param workerId - The identifier of the worker whose tasks are moved.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the key
      // from the pool worker nodes so the task lands on the elected worker node.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1482
/**
 * This method is the listener registered for each worker message.
 *
 * The returned listener checks the message worker id, then dispatches the message: worker
 * ready response, task execution response, or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
    }
  }
}
1505
/**
 * Handles the ready response received from a worker.
 *
 * The worker information is updated with the ready flag and the task function names
 * carried by the message, then the pool `ready` event is emitted if the pool is ready.
 *
 * @param message - The message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1521
/**
 * Handles a task execution response received from a worker.
 *
 * The promise registered for the task id is settled: rejected with the worker error message
 * (after emitting the `taskError` event) or resolved with the response data. The after task
 * execution hook and the worker choice strategy update then run, the promise entry is
 * deleted, and one queued task is executed on the same worker node when the tasks queue is
 * enabled, not empty and the configured concurrency is not reached.
 * Messages whose task id matches no registered promise are ignored.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (workerError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    pendingResponse.reject(workerError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
}
1549
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1555
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1561
/**
 * Emits the pool `full` event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1569
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information held by the worker node.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1579
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node is built with the configured tasks queue size, or the square of the
 * pool maximum size when none is configured. It is flagged as ready when added while
 * the pool is starting.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSizeOption =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeOption
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1603
/**
 * Removes the given worker from the pool worker nodes.
 *
 * When the worker is found, its worker node is removed and the worker choice strategy
 * context is told about the removed key. An unknown worker is ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1616
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1624
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker node
 * is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1633
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list to the
 * worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1645
/**
 * Enqueues the given task on the worker node given its key, then checks and emits
 * the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1651
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1655
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1659
/**
 * Flushes the tasks queue of the worker node given its key: every queued task is
 * dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1669
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  // The length is read on each iteration, as the array iterator would do.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1675 }