refactor: cleanup workerId usage in messaging
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
52b71763 72 * The task execution response promise map.
be0676b3 73 *
2740a743 74 * - `key`: The message id of each submitted task.
a3445496 75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 76 *
a3445496 77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 78 */
501aea93
JB
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 81
a35560ba 82 /**
51fe3d3c 83 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
86 Worker,
87 Data,
88 Response
a35560ba
S
89 >
90
8735b4e5
JB
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
72ae84a2
JB
96 /**
97 * The task functions added at runtime map:
98 * - `key`: The task function name.
99 * - `value`: The task function itself.
100 */
101 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
102
075e51d1 103 /**
adc9cc64 104 * Whether the pool is starting or not.
075e51d1
JB
105 */
106 private readonly starting: boolean
15b176e0
JB
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
afe0d5bf
JB
111 /**
112 * The start timestamp of the pool.
113 */
114 private readonly startTimestamp
115
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker choice
 * strategy context, then creates the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Arguments validation; the pool options check also fills in the defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods so they keep the pool as receiver when passed around.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // The starting flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
165
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
8d3782fa
JB
/**
 * Checks the number of workers given at pool instantiation.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error If the number of workers is not specified.
 * @throws TypeError If the number of workers is not a safe integer.
 * @throws RangeError If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused with zero worker.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is missing or is not a safe integer.
 * @throws RangeError If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Checks the pool options and stores the defaulted values on the pool options.
 *
 * @param opts - The pool options.
 * @throws TypeError If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options: user values override the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only checked and built when the queue is enabled.
  const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(givenTasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    givenTasksQueueOptions
  )
}
250
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
260
0d80593b
JB
/**
 * Checks the worker choice strategy options: retries, weights and measurement.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError If the options are not a plain object or retries is not a safe integer.
 * @throws RangeError If retries is negative.
 * @throws Error If the weights count differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold at most.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
304
/**
 * Checks the tasks queue options: concurrency and size.
 * Missing (`null` or `undefined`) options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError If the options are not a plain object, or concurrency or size is not a safe integer.
 * @throws RangeError If concurrency or size is negative or zero.
 * @throws Error If the deprecated `queueMaxSize` option is used.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (tasksQueueOptions?.queueMaxSize != null) {
    throw new Error(
      'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
    )
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
346
e761c033
JB
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches the configured number of workers.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
358
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once: the optional statistics sections below are gated by these requirements.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Sums a per worker node numeric value.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the measurement history of every worker node.
  const collectHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both run time and wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(average(collectHistory('runTime')))
        }),
        ...(runTimeRequirements.median && {
          median: round(median(collectHistory('runTime')))
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(average(collectHistory('waitTime')))
        }),
        ...(waitTimeRequirements.median && {
          median: round(median(collectHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 513
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once at least `minSize`
 * non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
528
/**
 * The approximate pool utilization: the tasks run time plus wait time aggregates
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // Missing statistics count for nothing.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
549
/**
 * The type of the pool, to be provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
556
/**
 * The type of worker used by the pool, to be provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
561
/**
 * The pool minimum size, i.e. the number of workers given at instantiation.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
568
/**
 * The pool maximum size: the `max` property when defined,
 * the number of workers given at instantiation otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
a35560ba 575
6b813701
JB
/**
 * Ensures that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
594
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
606
aa9eede8
JB
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
618
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage, then send each worker a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
637
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are merged over the defaults before being propagated.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
651
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when switching from enabled to disabled.
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
663
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
}
675
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
681
a20f0ba5
JB
/**
 * Builds the tasks queue options by merging the given options over the defaults:
 * a size equal to the square of the pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The tasks queue options with the defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
693
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 702
c319c66b
JB
/**
 * Whether the pool is busy or not, to be provided by the concrete pool implementation.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 709
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a ready worker node is available only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
734
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  return await new Promise<boolean>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Only responses stamped with the targeted worker id are considered.
      if (response.workerId !== workerId) {
        return
      }
      const status = response.taskFunctionOperationStatus
      if (status === true) {
        resolve(true)
      } else if (status === false) {
        reject(
          new Error(
            `Task function operation ${
              response.taskFunctionOperation as string
            } failed on worker ${response.workerId}`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
763
/**
 * Sends a task function operation message to every worker node and waits
 * until as many status responses as worker nodes have been received.
 *
 * @param message - The task function operation message, without worker id.
 * @returns A promise resolved with `true` when every response reports success,
 * rejected with an error naming the first failing worker otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without worker nodes no response can ever arrive: settle right away
    // instead of returning a promise that never resolves.
    // NOTE(review): vacuous success assumed to be the expected outcome for an empty pool - confirm.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(workerNodeKey, response => {
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        // Wait until as many responses as worker nodes have been collected.
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        // Report the worker that actually failed, not the one whose
        // response happened to arrive last.
        const failedResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                failedResponse.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
800
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
813
/**
 * @inheritDoc
 *
 * The arguments are validated before anything is stored or sent, so that an
 * invalid task function never ends up in the runtime task functions map.
 * The returned promise is rejected with a `TypeError` if `name` is not a
 * non blank string or `taskFunction` is not a function.
 */
public async addTaskFunction (
  name: string,
  taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof taskFunction !== 'function') {
    throw new TypeError('taskFunction argument must be a function')
  }
  this.taskFunctions.set(name, taskFunction)
  // The task function is sent to the workers as its source text.
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: taskFunction.toString()
  })
}
826
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Forget the task function locally, then ask every worker to remove it.
  this.taskFunctions.delete(name)
  const removeOperation = {
    taskFunctionOperation: 'remove' as const,
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(removeOperation)
}
835
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node reporting a non empty names list is used.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
848
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation = {
    taskFunctionOperation: 'default' as const,
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
856
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue must be empty and it must execute fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
864
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    // Arguments validation: any failure rejects the returned promise.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks reachable through the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 918
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently and wait for completion.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
929
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved on a `'success'` answer and rejected on a `'failure'` answer.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, message => {
      if (message.kill === 'success') {
        resolve()
        return
      }
      if (message.kill === 'failure') {
        const failedWorkerId = message.workerId as number
        reject(
          new Error(`Worker ${failedWorkerId} kill message handling failed`)
        )
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
950
/**
 * Terminates the worker node identified by the given worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 * @returns A promise that callers such as `destroy()` await.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 957
/**
 * Setup hook run before the worker nodes are created in the abstract constructor.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Deliberately left without any statement.
}
c97c7edb 967
/**
 * Tells whether the worker is the main worker or not.
 * Must be implemented by the concrete pool.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
972
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage and then, when task function usage is
 * tracked for this worker node, on the usage of the task function being run.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerNode.usage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
1002
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task, run time and ELU statistics, first on the worker node
 * usage and then, when task function usage is tracked for this worker node,
 * on the usage of the task function named in `message.taskPerformance`.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    this.updateTaskStatisticsWorkerUsage(workerNode.usage, message)
    this.updateRunTimeWorkerUsage(workerNode.usage, message)
    this.updateEluWorkerUsage(workerNode.usage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
1036
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // `getWorkerInfo()` dereferences the worker node, so a key that no longer
  // maps to a worker node must be rejected here instead of throwing.
  if (this.workerNodes[workerNodeKey] == null) {
    return false
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  return (
    workerInfo != null &&
    Array.isArray(workerInfo.taskFunctionNames) &&
    workerInfo.taskFunctionNames.length > 2
  )
}
1051
/**
 * Updates the tasks counters of the given worker usage once a task response
 * has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1069
/**
 * Updates the run time statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      // A missing run time measurement is accounted as 0.
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1083
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the time elapsed since the task timestamp; it is 0 when
 * the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    waitTime
  )
}
1096
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are accounted as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is averaged with the previous value when there is one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1129
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy enables dynamic worker usage.
 * Otherwise the choice is delegated to the worker choice strategy.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1148
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1157
/**
 * Sends a message to the worker of the worker node identified by the given key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1170
/**
 * Creates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1177
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers from the pool options are attached to the new
 * worker together with the pool own `error` and `exit` handling, then the
 * worker is added to the pool worker nodes and set up.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestartWorker) {
      // The replacement worker node is of the same kind as the failed one.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Pool exit handling: the worker node is removed from the pool.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1221
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener destroying the worker
 * node on kill messages is registered, the `checkActive` message is sent and
 * the task functions registered on the pool are sent to the worker.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderWorkerUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Decides whether the kill message received from the worker shall be honored.
    const shallDestroyWorkerNode = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      // Soft kill: only with no executing task and, with the tasks queue
      // enabled, no queued task either.
      if (senderWorkerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(senderWorkerNodeKey) === 0
      )
    }
    if (shallDestroyWorkerNode()) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  // Iterating an empty collection is a no-op, so no size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  // The strategy policy may require the dynamic worker to be flagged as ready at once.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1275
/**
 * Registers a message listener callback on the worker of the worker node
 * identified by the given key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1288
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to the worker messages before sending anything to the worker.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Startup message first, statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing callbacks are only wired with the tasks queue enabled.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1309
/**
 * Sends the startup message to the worker of the worker node identified by
 * the given key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1316
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    }
  })
}
a2ed5053
JB
1333
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * For each queued task the destination is the ready worker node with the
 * fewest queued tasks, the first other worker node being the fallback. The
 * source worker node is never a destination: picking it would put the task
 * back on the queue being drained and could keep the loop running forever.
 * When the pool has no other worker node the remaining tasks are left in place.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        (candidate.info.ready &&
          candidate.usage.tasks.queued <
            this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node to hand the tasks over to.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1354
/**
 * Increments the stolen tasks counter of the given worker node and, when task
 * function usage is tracked for this worker node, of the given task function.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
1373
/**
 * Steals one task for the worker whose tasks queue is empty.
 *
 * The task is taken from the ready worker node, other than the given worker,
 * that has the most queued tasks. Nothing happens when no such worker node
 * has a queued task.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sorted copy, most queued tasks first; the pool worker nodes are left untouched.
  const busiestFirstWorkerNodes = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = busiestFirstWorkerNodes.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1401
/**
 * Hands tasks of a worker under back pressure over to the other worker nodes.
 *
 * Does nothing when the configured tasks queue size is not greater than 1.
 * Otherwise the worker nodes are visited by ascending number of queued tasks
 * and, while the source worker node still has queued tasks, each ready worker
 * node other than the source whose queue holds fewer than `size - 1` tasks
 * receives one task popped from the source.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy, fewest queued tasks first; the pool worker nodes are left untouched.
  const leastLoadedFirstWorkerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of leastLoadedFirstWorkerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // A position in the sorted copy is not a worker node key: the key must
      // be resolved against the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1436
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message:
 * worker ready response, task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1459
/**
 * Handles the ready response received from a worker: the worker information
 * is updated and the pool `ready` event is emitted once the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready: false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1475
/**
 * Handles a task execution response received from a worker.
 *
 * The promise registered for the task is settled, the worker usage and the
 * worker choice strategy are updated, and the next queued task of the worker
 * node is executed when the tasks queue is enabled and the worker node is
 * below its configured concurrency. Responses with no registered promise are
 * ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
7c0ba920 1503
/**
 * Emits the pool `busy` event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1509
/**
 * Emits the pool `backPressure` event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1515
/**
 * Emits the pool `full` event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1523
/**
 * Gets the information of the worker held by the worker node identified by
 * the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1533
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without a configured tasks queue size, the square of the pool maximum size is used.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  if (this.starting) {
    // Worker nodes created at pool startup are flagged as ready at once.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1557
/**
 * Removes the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1570
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1578
/**
 * Whether the pool has back pressure or not: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1587
/**
 * Executes the given task on the worker of the worker node identified by the
 * given key: runs the before execution hook, sends the task to the worker
 * with its transfer list, then emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1599
/**
 * Enqueues the given task on the worker node identified by the given key,
 * then emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1605
/**
 * Dequeues a task from the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1609
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1613
/**
 * Executes every task queued on the worker node identified by the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1623
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1629}