fix: properly handle response for add/remove/set task function operaions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
52b71763 72 * The task execution response promise map.
be0676b3 73 *
2740a743 74 * - `key`: The message id of each submitted task.
a3445496 75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 76 *
a3445496 77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 78 */
501aea93
JB
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 81
a35560ba 82 /**
51fe3d3c 83 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
86 Worker,
87 Data,
88 Response
a35560ba
S
89 >
90
8735b4e5
JB
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
075e51d1 96 /**
adc9cc64 97 * Whether the pool is starting or not.
075e51d1
JB
98 */
99 private readonly starting: boolean
15b176e0
JB
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
afe0d5bf
JB
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
729c563d
S
109 /**
110 * Constructs a new poolifier pool.
111 *
38e795c1 112 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 113 * @param filePath - Path to the worker file.
38e795c1 114 * @param opts - Options for the pool.
729c563d 115 */
c97c7edb 116 public constructor (
b4213b7f
JB
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
c97c7edb 120 ) {
78cea37e 121 if (!this.isMain()) {
04f45163 122 throw new Error(
8c6d4acf 123 'Cannot start a pool from a worker with the same type as the pool'
04f45163 124 )
c97c7edb 125 }
8d3782fa 126 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 127 this.checkFilePath(this.filePath)
7c0ba920 128 this.checkPoolOptions(this.opts)
1086026a 129
7254e419
JB
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 133
6bd72cd0 134 if (this.opts.enableEvents === true) {
7c0ba920
JB
135 this.emitter = new PoolEmitter()
136 }
d59df138
JB
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
da309861
JB
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
b6b32453
JB
146
147 this.setupHook()
148
075e51d1 149 this.starting = true
e761c033 150 this.startPool()
075e51d1 151 this.starting = false
15b176e0 152 this.started = true
afe0d5bf
JB
153
154 this.startTimestamp = performance.now()
c97c7edb
S
155 }
156
a35560ba 157 private checkFilePath (filePath: string): void {
ffcbbad8
JB
158 if (
159 filePath == null ||
3d6dd312 160 typeof filePath !== 'string' ||
ffcbbad8
JB
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
c510fea7
APA
163 throw new Error('Please specify a file with a worker implementation')
164 }
3d6dd312
JB
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
c510fea7
APA
168 }
169
8d3782fa
JB
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
78cea37e 175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 176 throw new TypeError(
0d80593b 177 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
178 )
179 } else if (numberOfWorkers < 0) {
473c717a 180 throw new RangeError(
8d3782fa
JB
181 'Cannot instantiate a pool with a negative number of workers'
182 )
6b27d407 183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 189 if (this.type === PoolTypes.dynamic) {
a5ed75b7 190 if (max == null) {
e695d66f 191 throw new TypeError(
a5ed75b7
JB
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
079de991
JB
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
b97d82d8 202 } else if (max === 0) {
079de991 203 throw new RangeError(
d640b48b 204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
8d3782fa
JB
211 }
212 }
213
7c0ba920 214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
49be33fe
JB
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
1f68cede 226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 239 }
aee46736
JB
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 246 throw new Error(
aee46736 247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
248 )
249 }
7c0ba920
JB
250 }
251
0d80593b
JB
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
8990357d 260 if (
8c0b113f
JB
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
263 ) {
264 throw new TypeError(
8c0b113f 265 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
266 )
267 }
268 if (
8c0b113f
JB
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
8990357d
JB
271 ) {
272 throw new RangeError(
8c0b113f 273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
274 )
275 }
49be33fe
JB
276 if (
277 workerChoiceStrategyOptions.weights != null &&
6b27d407 278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
f0d7f803
JB
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
0d80593b
JB
294 }
295
a20f0ba5 296 private checkValidTasksQueueOptions (
76d91ea0 297 tasksQueueOptions: TasksQueueOptions
a20f0ba5 298 ): void {
0d80593b
JB
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
f0d7f803 302 if (
b7d085c4
JB
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
305 ) {
306 throw new TypeError(
20c6f652 307 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
308 )
309 }
310 if (
b7d085c4
JB
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
f0d7f803 313 ) {
e695d66f 314 throw new RangeError(
b7d085c4 315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
316 )
317 }
b7d085c4 318 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 319 throw new Error(
68dbcdc0 320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
321 )
322 }
323 if (
b7d085c4
JB
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 326 ) {
ff3f866a 327 throw new TypeError(
68dbcdc0 328 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
329 )
330 }
b7d085c4 331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 332 throw new RangeError(
b7d085c4 333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
334 )
335 }
336 }
337
e761c033
JB
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
aa9eede8 346 this.createAndSetupWorkerNode()
e761c033
JB
347 }
348 }
349
08f3f44c 350 /** @inheritDoc */
6b27d407
JB
351 public get info (): PoolInfo {
352 return {
23ccf9d7 353 version,
6b27d407 354 type: this.type,
184855e6 355 worker: this.worker,
2431bdb4
JB
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
358 minSize: this.minSize,
359 maxSize: this.maxSize,
c05f0d50
JB
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
1305e9a8
JB
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
f59e1027 367 workerNode.usage.tasks.executing === 0
a4e07f72
JB
368 ? accumulator + 1
369 : accumulator,
6b27d407
JB
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
f59e1027 374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
375 0
376 ),
a4e07f72 377 executedTasks: this.workerNodes.reduce(
6b27d407 378 (accumulator, workerNode) =>
f59e1027 379 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
f59e1027 384 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
385 0
386 ),
daf86646
JB
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
a1763c54
JB
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
68cbdc84
JB
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
a4e07f72
JB
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
f59e1027 413 accumulator + workerNode.usage.tasks.failed,
a4e07f72 414 0
1dcf8b7b
JB
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
98e72cda 419 minimum: round(
90d6701c 420 min(
98e72cda 421 ...this.workerNodes.map(
041dc05b 422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 423 )
1dcf8b7b
JB
424 )
425 ),
98e72cda 426 maximum: round(
90d6701c 427 max(
98e72cda 428 ...this.workerNodes.map(
041dc05b 429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 430 )
1dcf8b7b 431 )
98e72cda 432 ),
3baa0837
JB
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
98e72cda 442 )
dc021bcc 443 )
3baa0837 444 }),
98e72cda
JB
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
3baa0837
JB
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
98e72cda
JB
453 )
454 )
455 )
456 })
1dcf8b7b
JB
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
98e72cda 462 minimum: round(
90d6701c 463 min(
98e72cda 464 ...this.workerNodes.map(
041dc05b 465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 466 )
1dcf8b7b
JB
467 )
468 ),
98e72cda 469 maximum: round(
90d6701c 470 max(
98e72cda 471 ...this.workerNodes.map(
041dc05b 472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 473 )
1dcf8b7b 474 )
98e72cda 475 ),
3baa0837
JB
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
98e72cda 485 )
dc021bcc 486 )
3baa0837 487 }),
98e72cda
JB
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
3baa0837
JB
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
98e72cda
JB
496 )
497 )
498 )
499 })
1dcf8b7b
JB
500 }
501 })
6b27d407
JB
502 }
503 }
08f3f44c 504
aa9eede8
JB
505 /**
506 * The pool readiness boolean status.
507 */
2431bdb4
JB
508 private get ready (): boolean {
509 return (
b97d82d8
JB
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
2431bdb4
JB
517 )
518 }
519
afe0d5bf 520 /**
aa9eede8 521 * The approximate pool utilization.
afe0d5bf
JB
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
8e5ca040 526 const poolTimeCapacity =
fe7d90db 527 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
71514351 530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
71514351 535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
536 0
537 )
8e5ca040 538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
539 }
540
8881ae32 541 /**
aa9eede8 542 * The pool type.
8881ae32
JB
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
184855e6 548 /**
aa9eede8 549 * The worker type.
184855e6
JB
550 */
551 protected abstract get worker (): WorkerType
552
c2ade475 553 /**
aa9eede8 554 * The pool minimum size.
c2ade475 555 */
8735b4e5
JB
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
ff733df7
JB
559
560 /**
aa9eede8 561 * The pool maximum size.
ff733df7 562 */
8735b4e5
JB
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
a35560ba 566
6b813701
JB
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
21f710aa 573 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
21f710aa 577 message.workerId != null &&
aad6fb64 578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
ffcbbad8 586 /**
f06e48d8 587 * Gets the given worker its worker node key.
ffcbbad8
JB
588 *
589 * @param worker - The worker.
f59e1027 590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 591 */
aad6fb64 592 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 593 return this.workerNodes.findIndex(
041dc05b 594 workerNode => workerNode.worker === worker
f06e48d8 595 )
bf9549ae
JB
596 }
597
aa9eede8
JB
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
aad6fb64 602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 603 */
aad6fb64
JB
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
041dc05b 606 workerNode => workerNode.info.id === workerId
aad6fb64 607 )
aa9eede8
JB
608 }
609
afc003b2 610 /** @inheritDoc */
a35560ba 611 public setWorkerChoiceStrategy (
59219cbb
JB
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 614 ): void {
aee46736 615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 616 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
aa9eede8 623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 624 workerNode.resetUsage()
9edb9717 625 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 626 }
a20f0ba5
JB
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
0d80593b 633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
a20f0ba5
JB
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
a35560ba
S
640 )
641 }
642
a20f0ba5 643 /** @inheritDoc */
8f52842f
JB
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
a20f0ba5 648 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 649 this.flushTasksQueues()
a20f0ba5
JB
650 }
651 this.opts.enableTasksQueue = enable
8f52842f 652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
653 }
654
655 /** @inheritDoc */
8f52842f 656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 657 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 662 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
5b49e864 667 private setTasksQueueSize (size: number): void {
20c6f652 668 for (const workerNode of this.workerNodes) {
ff3f866a 669 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
670 }
671 }
672
a20f0ba5
JB
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
20c6f652 677 ...{
ff3f866a 678 size: Math.pow(this.maxSize, 2),
20c6f652
JB
679 concurrency: 1
680 },
681 ...tasksQueueOptions
a20f0ba5
JB
682 }
683 }
684
c319c66b
JB
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
dea903a8
JB
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
c2ade475 693
c319c66b
JB
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
7c0ba920 700
6c6afb84 701 /**
3d76750a 702 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
c2ade475 706 protected internalBusy (): boolean {
3d76750a
JB
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
041dc05b 710 workerNode =>
3d76750a
JB
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
041dc05b 719 workerNode =>
3d76750a
JB
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
cb70b19d
JB
724 }
725
e81c38f2
JB
726 private async sendTaskFunctionOperationToWorker (
727 message: Omit<MessageValue<Data>, 'workerId'>
728 ): Promise<boolean> {
729 return await new Promise<boolean>((resolve, reject) => {
730 const responsesReceived = new Array<MessageValue<Data | Response>>()
731 for (const [workerNodeKey] of this.workerNodes.entries()) {
732 this.registerWorkerMessageListener(workerNodeKey, message => {
733 if (message.taskFunctionOperationStatus != null) {
734 responsesReceived.push(message)
735 if (
736 responsesReceived.length === this.workerNodes.length &&
737 responsesReceived.every(
738 message => message.taskFunctionOperationStatus === true
739 )
740 ) {
741 resolve(true)
742 } else if (
743 responsesReceived.length === this.workerNodes.length &&
744 responsesReceived.some(
745 message => message.taskFunctionOperationStatus === false
746 )
747 ) {
748 reject(
749 new Error(
750 `Task function operation ${
751 message.taskFunctionOperation as string
752 } failed on worker ${message.workerId}`
753 )
754 )
755 }
756 }
757 })
758 this.sendToWorker(workerNodeKey, {
759 ...message,
760 workerId: this.getWorkerInfo(workerNodeKey).id as number
761 })
762 }
763 })
6703b9f4
JB
764 }
765
766 /** @inheritDoc */
767 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
768 for (const workerNode of this.workerNodes) {
769 if (
770 Array.isArray(workerNode.info.taskFunctionNames) &&
771 workerNode.info.taskFunctionNames.includes(name)
772 ) {
773 return true
774 }
775 }
776 return false
6703b9f4
JB
777 }
778
779 /** @inheritDoc */
e81c38f2
JB
780 public async addTaskFunction (
781 name: string,
782 taskFunction: TaskFunction
783 ): Promise<boolean> {
784 return await this.sendTaskFunctionOperationToWorker({
6703b9f4
JB
785 taskFunctionOperation: 'add',
786 taskFunctionName: name,
787 taskFunction: taskFunction.toString()
788 })
6703b9f4
JB
789 }
790
791 /** @inheritDoc */
e81c38f2
JB
792 public async removeTaskFunction (name: string): Promise<boolean> {
793 return await this.sendTaskFunctionOperationToWorker({
6703b9f4
JB
794 taskFunctionOperation: 'remove',
795 taskFunctionName: name
796 })
6703b9f4
JB
797 }
798
90d7d101 799 /** @inheritDoc */
6703b9f4 800 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
801 for (const workerNode of this.workerNodes) {
802 if (
6703b9f4
JB
803 Array.isArray(workerNode.info.taskFunctionNames) &&
804 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 805 ) {
6703b9f4 806 return workerNode.info.taskFunctionNames
f2dbbf95 807 }
90d7d101 808 }
f2dbbf95 809 return []
90d7d101
JB
810 }
811
6703b9f4 812 /** @inheritDoc */
e81c38f2
JB
813 public async setDefaultTaskFunction (name: string): Promise<boolean> {
814 return await this.sendTaskFunctionOperationToWorker({
6703b9f4
JB
815 taskFunctionOperation: 'default',
816 taskFunctionName: name
817 })
6703b9f4
JB
818 }
819
375f7504
JB
820 private shallExecuteTask (workerNodeKey: number): boolean {
821 return (
822 this.tasksQueueSize(workerNodeKey) === 0 &&
823 this.workerNodes[workerNodeKey].usage.tasks.executing <
824 (this.opts.tasksQueueOptions?.concurrency as number)
825 )
826 }
827
afc003b2 828 /** @inheritDoc */
7d91a8cd
JB
829 public async execute (
830 data?: Data,
831 name?: string,
832 transferList?: TransferListItem[]
833 ): Promise<Response> {
52b71763 834 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
835 if (!this.started) {
836 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 837 return
15b176e0 838 }
7d91a8cd
JB
839 if (name != null && typeof name !== 'string') {
840 reject(new TypeError('name argument must be a string'))
9d2d0da1 841 return
7d91a8cd 842 }
90d7d101
JB
843 if (
844 name != null &&
845 typeof name === 'string' &&
846 name.trim().length === 0
847 ) {
f58b60b9 848 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 849 return
90d7d101 850 }
b558f6b5
JB
851 if (transferList != null && !Array.isArray(transferList)) {
852 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 853 return
b558f6b5
JB
854 }
855 const timestamp = performance.now()
856 const workerNodeKey = this.chooseWorkerNode()
501aea93 857 const task: Task<Data> = {
52b71763
JB
858 name: name ?? DEFAULT_TASK_NAME,
859 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
860 data: data ?? ({} as Data),
7d91a8cd 861 transferList,
52b71763 862 timestamp,
1a28f967 863 workerId: this.getWorkerInfo(workerNodeKey).id as number,
7629bdf1 864 taskId: randomUUID()
52b71763 865 }
7629bdf1 866 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
867 resolve,
868 reject,
501aea93 869 workerNodeKey
2e81254d 870 })
52b71763 871 if (
4e377863
JB
872 this.opts.enableTasksQueue === false ||
873 (this.opts.enableTasksQueue === true &&
375f7504 874 this.shallExecuteTask(workerNodeKey))
52b71763 875 ) {
501aea93 876 this.executeTask(workerNodeKey, task)
4e377863
JB
877 } else {
878 this.enqueueTask(workerNodeKey, task)
52b71763 879 }
2e81254d 880 })
280c2a77 881 }
c97c7edb 882
afc003b2 883 /** @inheritDoc */
c97c7edb 884 public async destroy (): Promise<void> {
1fbcaa7c 885 await Promise.all(
81c02522 886 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 887 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
888 })
889 )
33e6bb4c 890 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 891 this.started = false
c97c7edb
S
892 }
893
1e3214b6
JB
894 protected async sendKillMessageToWorker (
895 workerNodeKey: number,
896 workerId: number
897 ): Promise<void> {
9edb9717 898 await new Promise<void>((resolve, reject) => {
041dc05b 899 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
900 if (message.kill === 'success') {
901 resolve()
902 } else if (message.kill === 'failure') {
e1af34e6 903 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
904 }
905 })
9edb9717 906 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 907 })
1e3214b6
JB
908 }
909
4a6952ff 910 /**
aa9eede8 911 * Terminates the worker node given its worker node key.
4a6952ff 912 *
aa9eede8 913 * @param workerNodeKey - The worker node key.
4a6952ff 914 */
81c02522 915 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 916
729c563d 917 /**
6677a3d3
JB
918 * Setup hook to execute code before worker nodes are created in the abstract constructor.
919 * Can be overridden.
afc003b2
JB
920 *
921 * @virtual
729c563d 922 */
280c2a77 923 protected setupHook (): void {
965df41c 924 /* Intentionally empty */
280c2a77 925 }
c97c7edb 926
729c563d 927 /**
280c2a77
S
928 * Should return whether the worker is the main worker or not.
929 */
930 protected abstract isMain (): boolean
931
932 /**
2e81254d 933 * Hook executed before the worker task execution.
bf9549ae 934 * Can be overridden.
729c563d 935 *
f06e48d8 936 * @param workerNodeKey - The worker node key.
1c6fe997 937 * @param task - The task to execute.
729c563d 938 */
1c6fe997
JB
939 protected beforeTaskExecutionHook (
940 workerNodeKey: number,
941 task: Task<Data>
942 ): void {
94407def
JB
943 if (this.workerNodes[workerNodeKey]?.usage != null) {
944 const workerUsage = this.workerNodes[workerNodeKey].usage
945 ++workerUsage.tasks.executing
946 this.updateWaitTimeWorkerUsage(workerUsage, task)
947 }
948 if (
949 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
950 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
951 task.name as string
952 ) != null
953 ) {
db0e38ee 954 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 955 workerNodeKey
db0e38ee 956 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
957 ++taskFunctionWorkerUsage.tasks.executing
958 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 959 }
c97c7edb
S
960 }
961
c01733f1 962 /**
2e81254d 963 * Hook executed after the worker task execution.
bf9549ae 964 * Can be overridden.
c01733f1 965 *
501aea93 966 * @param workerNodeKey - The worker node key.
38e795c1 967 * @param message - The received message.
c01733f1 968 */
2e81254d 969 protected afterTaskExecutionHook (
501aea93 970 workerNodeKey: number,
2740a743 971 message: MessageValue<Response>
bf9549ae 972 ): void {
94407def
JB
973 if (this.workerNodes[workerNodeKey]?.usage != null) {
974 const workerUsage = this.workerNodes[workerNodeKey].usage
975 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
976 this.updateRunTimeWorkerUsage(workerUsage, message)
977 this.updateEluWorkerUsage(workerUsage, message)
978 }
979 if (
980 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
981 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 982 message.taskPerformance?.name as string
94407def
JB
983 ) != null
984 ) {
db0e38ee 985 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 986 workerNodeKey
db0e38ee 987 ].getTaskFunctionWorkerUsage(
0628755c 988 message.taskPerformance?.name as string
b558f6b5 989 ) as WorkerUsage
db0e38ee
JB
990 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
991 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
992 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
993 }
994 }
995
db0e38ee
JB
996 /**
997 * Whether the worker node shall update its task function worker usage or not.
998 *
999 * @param workerNodeKey - The worker node key.
1000 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1001 */
1002 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1003 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1004 return (
94407def 1005 workerInfo != null &&
6703b9f4
JB
1006 Array.isArray(workerInfo.taskFunctionNames) &&
1007 workerInfo.taskFunctionNames.length > 2
b558f6b5 1008 )
f1c06930
JB
1009 }
1010
1011 private updateTaskStatisticsWorkerUsage (
1012 workerUsage: WorkerUsage,
1013 message: MessageValue<Response>
1014 ): void {
a4e07f72 1015 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1016 if (
1017 workerTaskStatistics.executing != null &&
1018 workerTaskStatistics.executing > 0
1019 ) {
1020 --workerTaskStatistics.executing
5bb5be17 1021 }
6703b9f4 1022 if (message.workerError == null) {
98e72cda
JB
1023 ++workerTaskStatistics.executed
1024 } else {
a4e07f72 1025 ++workerTaskStatistics.failed
2740a743 1026 }
f8eb0a2a
JB
1027 }
1028
a4e07f72
JB
1029 private updateRunTimeWorkerUsage (
1030 workerUsage: WorkerUsage,
f8eb0a2a
JB
1031 message: MessageValue<Response>
1032 ): void {
6703b9f4 1033 if (message.workerError != null) {
dc021bcc
JB
1034 return
1035 }
e4f20deb
JB
1036 updateMeasurementStatistics(
1037 workerUsage.runTime,
1038 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1039 message.taskPerformance?.runTime ?? 0
e4f20deb 1040 )
f8eb0a2a
JB
1041 }
1042
a4e07f72
JB
1043 private updateWaitTimeWorkerUsage (
1044 workerUsage: WorkerUsage,
1c6fe997 1045 task: Task<Data>
f8eb0a2a 1046 ): void {
1c6fe997
JB
1047 const timestamp = performance.now()
1048 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1049 updateMeasurementStatistics(
1050 workerUsage.waitTime,
1051 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1052 taskWaitTime
e4f20deb 1053 )
c01733f1 1054 }
1055
a4e07f72 1056 private updateEluWorkerUsage (
5df69fab 1057 workerUsage: WorkerUsage,
62c15a68
JB
1058 message: MessageValue<Response>
1059 ): void {
6703b9f4 1060 if (message.workerError != null) {
dc021bcc
JB
1061 return
1062 }
008512c7
JB
1063 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1064 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1065 updateMeasurementStatistics(
1066 workerUsage.elu.active,
008512c7 1067 eluTaskStatisticsRequirements,
dc021bcc 1068 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1069 )
1070 updateMeasurementStatistics(
1071 workerUsage.elu.idle,
008512c7 1072 eluTaskStatisticsRequirements,
dc021bcc 1073 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1074 )
008512c7 1075 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1076 if (message.taskPerformance?.elu != null) {
f7510105
JB
1077 if (workerUsage.elu.utilization != null) {
1078 workerUsage.elu.utilization =
1079 (workerUsage.elu.utilization +
1080 message.taskPerformance.elu.utilization) /
1081 2
1082 } else {
1083 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1084 }
62c15a68
JB
1085 }
1086 }
1087 }
1088
280c2a77 1089 /**
f06e48d8 1090 * Chooses a worker node for the next task.
280c2a77 1091 *
6c6afb84 1092 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1093 *
aa9eede8 1094 * @returns The chosen worker node key
280c2a77 1095 */
6c6afb84 1096 private chooseWorkerNode (): number {
930dcf12 1097 if (this.shallCreateDynamicWorker()) {
aa9eede8 1098 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1099 if (
b1aae695 1100 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1101 ) {
aa9eede8 1102 return workerNodeKey
6c6afb84 1103 }
17393ac8 1104 }
930dcf12
JB
1105 return this.workerChoiceStrategyContext.execute()
1106 }
1107
6c6afb84
JB
1108 /**
1109 * Conditions for dynamic worker creation.
1110 *
1111 * @returns Whether to create a dynamic worker or not.
1112 */
1113 private shallCreateDynamicWorker (): boolean {
930dcf12 1114 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1115 }
1116
280c2a77 1117 /**
aa9eede8 1118 * Sends a message to worker given its worker node key.
280c2a77 1119 *
aa9eede8 1120 * @param workerNodeKey - The worker node key.
38e795c1 1121 * @param message - The message.
7d91a8cd 1122 * @param transferList - The optional array of transferable objects.
280c2a77
S
1123 */
1124 protected abstract sendToWorker (
aa9eede8 1125 workerNodeKey: number,
7d91a8cd
JB
1126 message: MessageValue<Data>,
1127 transferList?: TransferListItem[]
280c2a77
S
1128 ): void
1129
729c563d 1130 /**
41344292 1131 * Creates a new worker.
6c6afb84
JB
1132 *
1133 * @returns Newly created worker.
729c563d 1134 */
280c2a77 1135 protected abstract createWorker (): Worker
c97c7edb 1136
4a6952ff 1137 /**
aa9eede8 1138 * Creates a new, completely set up worker node.
4a6952ff 1139 *
aa9eede8 1140 * @returns New, completely set up worker node key.
4a6952ff 1141 */
aa9eede8 1142 protected createAndSetupWorkerNode (): number {
bdacc2d2 1143 const worker = this.createWorker()
280c2a77 1144
fd04474e 1145 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1146 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1147 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1148 worker.on('error', error => {
aad6fb64 1149 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1150 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1151 workerInfo.ready = false
0dc838e3 1152 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1153 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1154 if (
1155 this.opts.restartWorkerOnError === true &&
b6bfca01
JB
1156 this.started &&
1157 !this.starting
15b176e0 1158 ) {
9b106837 1159 if (workerInfo.dynamic) {
aa9eede8 1160 this.createAndSetupDynamicWorkerNode()
8a1260a3 1161 } else {
aa9eede8 1162 this.createAndSetupWorkerNode()
8a1260a3 1163 }
5baee0d7 1164 }
19dbc45b 1165 if (this.opts.enableTasksQueue === true) {
9b106837 1166 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1167 }
5baee0d7 1168 })
a35560ba 1169 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1170 worker.once('exit', () => {
f06e48d8 1171 this.removeWorkerNode(worker)
a974afa6 1172 })
280c2a77 1173
aa9eede8 1174 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1175
aa9eede8 1176 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1177
aa9eede8 1178 return workerNodeKey
c97c7edb 1179 }
be0676b3 1180
930dcf12 1181 /**
aa9eede8 1182 * Creates a new, completely set up dynamic worker node.
930dcf12 1183 *
aa9eede8 1184 * @returns New, completely set up dynamic worker node key.
930dcf12 1185 */
aa9eede8
JB
1186 protected createAndSetupDynamicWorkerNode (): number {
1187 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1188 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1189 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1190 message.workerId
aad6fb64 1191 )
aa9eede8 1192 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1193 // Kill message received from worker
930dcf12
JB
1194 if (
1195 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1196 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1197 ((this.opts.enableTasksQueue === false &&
aa9eede8 1198 workerUsage.tasks.executing === 0) ||
7b56f532 1199 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1200 workerUsage.tasks.executing === 0 &&
1201 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1202 ) {
041dc05b 1203 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1204 this.emitter?.emit(PoolEvents.error, error)
1205 })
930dcf12
JB
1206 }
1207 })
46b0bb09 1208 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1209 this.sendToWorker(workerNodeKey, {
b0a4db63 1210 checkActive: true,
21f710aa
JB
1211 workerId: workerInfo.id as number
1212 })
b5e113f6 1213 workerInfo.dynamic = true
b1aae695
JB
1214 if (
1215 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1216 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1217 ) {
b5e113f6
JB
1218 workerInfo.ready = true
1219 }
33e6bb4c 1220 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1221 return workerNodeKey
930dcf12
JB
1222 }
1223
a2ed5053 1224 /**
aa9eede8 1225 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1226 *
aa9eede8 1227 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1228 * @param listener - The message listener callback.
1229 */
85aeb3f3
JB
1230 protected abstract registerWorkerMessageListener<
1231 Message extends Data | Response
aa9eede8
JB
1232 >(
1233 workerNodeKey: number,
1234 listener: (message: MessageValue<Message>) => void
1235 ): void
a2ed5053
JB
1236
1237 /**
aa9eede8 1238 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1239 * Can be overridden.
1240 *
aa9eede8 1241 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1242 */
aa9eede8 1243 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1244 // Listen to worker messages.
aa9eede8 1245 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1246 // Send the startup message to worker.
aa9eede8 1247 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1248 // Send the statistics message to worker.
1249 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1250 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1251 this.workerNodes[workerNodeKey].onEmptyQueue =
1252 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1253 this.workerNodes[workerNodeKey].onBackPressure =
1254 this.tasksStealingOnBackPressure.bind(this)
1255 }
d2c73f82
JB
1256 }
1257
85aeb3f3 1258 /**
aa9eede8
JB
1259 * Sends the startup message to worker given its worker node key.
1260 *
1261 * @param workerNodeKey - The worker node key.
1262 */
1263 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1264
1265 /**
9edb9717 1266 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1267 *
aa9eede8 1268 * @param workerNodeKey - The worker node key.
85aeb3f3 1269 */
9edb9717 1270 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1271 this.sendToWorker(workerNodeKey, {
1272 statistics: {
1273 runTime:
1274 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1275 .runTime.aggregate,
1276 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1277 .elu.aggregate
1278 },
46b0bb09 1279 workerId: this.getWorkerInfo(workerNodeKey).id as number
aa9eede8
JB
1280 })
1281 }
a2ed5053
JB
1282
1283 private redistributeQueuedTasks (workerNodeKey: number): void {
1284 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1285 const destinationWorkerNodeKey = this.workerNodes.reduce(
1286 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1287 return workerNode.info.ready &&
1288 workerNode.usage.tasks.queued <
1289 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1290 ? workerNodeKey
1291 : minWorkerNodeKey
1292 },
1293 0
1294 )
3f690f25
JB
1295 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1296 const task = {
1297 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1298 workerId: destinationWorkerNode.info.id as number
1299 }
1300 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1301 this.executeTask(destinationWorkerNodeKey, task)
1302 } else {
1303 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1304 }
1305 }
1306 }
1307
b1838604
JB
1308 private updateTaskStolenStatisticsWorkerUsage (
1309 workerNodeKey: number,
b1838604
JB
1310 taskName: string
1311 ): void {
1a880eca 1312 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1313 if (workerNode?.usage != null) {
1314 ++workerNode.usage.tasks.stolen
1315 }
1316 if (
1317 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1318 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1319 ) {
1320 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1321 taskName
1322 ) as WorkerUsage
1323 ++taskFunctionWorkerUsage.tasks.stolen
1324 }
1325 }
1326
dd951876 1327 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1328 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1329 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1330 const workerNodes = this.workerNodes
a6b3272b 1331 .slice()
dd951876
JB
1332 .sort(
1333 (workerNodeA, workerNodeB) =>
1334 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1335 )
f201a0cd 1336 const sourceWorkerNode = workerNodes.find(
041dc05b 1337 workerNode =>
f201a0cd
JB
1338 workerNode.info.ready &&
1339 workerNode.info.id !== workerId &&
1340 workerNode.usage.tasks.queued > 0
1341 )
1342 if (sourceWorkerNode != null) {
1343 const task = {
1344 ...(sourceWorkerNode.popTask() as Task<Data>),
1345 workerId: destinationWorkerNode.info.id as number
0bc68267 1346 }
f201a0cd
JB
1347 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1348 this.executeTask(destinationWorkerNodeKey, task)
1349 } else {
1350 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1351 }
f201a0cd
JB
1352 this.updateTaskStolenStatisticsWorkerUsage(
1353 destinationWorkerNodeKey,
1354 task.name as string
1355 )
72695f86
JB
1356 }
1357 }
1358
1359 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1360 const sizeOffset = 1
1361 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1362 return
1363 }
72695f86
JB
1364 const sourceWorkerNode =
1365 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1366 const workerNodes = this.workerNodes
a6b3272b 1367 .slice()
72695f86
JB
1368 .sort(
1369 (workerNodeA, workerNodeB) =>
1370 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1371 )
1372 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1373 if (
0bc68267 1374 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1375 workerNode.info.ready &&
1376 workerNode.info.id !== workerId &&
0bc68267 1377 workerNode.usage.tasks.queued <
f778c355 1378 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1379 ) {
dd951876
JB
1380 const task = {
1381 ...(sourceWorkerNode.popTask() as Task<Data>),
1382 workerId: workerNode.info.id as number
1383 }
375f7504 1384 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1385 this.executeTask(workerNodeKey, task)
4de3d785 1386 } else {
dd951876 1387 this.enqueueTask(workerNodeKey, task)
4de3d785 1388 }
b1838604
JB
1389 this.updateTaskStolenStatisticsWorkerUsage(
1390 workerNodeKey,
b1838604
JB
1391 task.name as string
1392 )
10ecf8fd 1393 }
a2ed5053
JB
1394 }
1395 }
1396
be0676b3 1397 /**
aa9eede8 1398 * This method is the listener registered for each worker message.
be0676b3 1399 *
bdacc2d2 1400 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1401 */
1402 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1403 return message => {
21f710aa 1404 this.checkMessageWorkerId(message)
6703b9f4 1405 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1406 // Worker ready response received from worker
10e2aa7e 1407 this.handleWorkerReadyResponse(message)
7629bdf1 1408 } else if (message.taskId != null) {
81c02522 1409 // Task execution response received from worker
6b272951 1410 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1411 } else if (message.taskFunctionNames != null) {
1412 // Task function names message received from worker
46b0bb09
JB
1413 this.getWorkerInfo(
1414 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4
JB
1415 ).taskFunctionNames = message.taskFunctionNames
1416 } else if (message.taskFunctionOperation != null) {
1417 // Task function operation response received from worker
6b272951
JB
1418 }
1419 }
1420 }
1421
10e2aa7e 1422 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1423 if (message.ready === false) {
1424 throw new Error(`Worker ${message.workerId} failed to initialize`)
1425 }
a5d15204 1426 const workerInfo = this.getWorkerInfo(
aad6fb64 1427 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1428 )
a5d15204 1429 workerInfo.ready = message.ready as boolean
6703b9f4 1430 workerInfo.taskFunctionNames = message.taskFunctionNames
2431bdb4
JB
1431 if (this.emitter != null && this.ready) {
1432 this.emitter.emit(PoolEvents.ready, this.info)
1433 }
6b272951
JB
1434 }
1435
1436 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1437 const { taskId, workerError, data } = message
5441aea6 1438 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1439 if (promiseResponse != null) {
6703b9f4
JB
1440 if (workerError != null) {
1441 this.emitter?.emit(PoolEvents.taskError, workerError)
1442 promiseResponse.reject(workerError.message)
6b272951 1443 } else {
5441aea6 1444 promiseResponse.resolve(data as Response)
6b272951 1445 }
501aea93
JB
1446 const workerNodeKey = promiseResponse.workerNodeKey
1447 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1448 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1449 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1450 if (
1451 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1452 this.tasksQueueSize(workerNodeKey) > 0 &&
1453 this.workerNodes[workerNodeKey].usage.tasks.executing <
1454 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1455 ) {
1456 this.executeTask(
1457 workerNodeKey,
1458 this.dequeueTask(workerNodeKey) as Task<Data>
1459 )
be0676b3
APA
1460 }
1461 }
be0676b3 1462 }
7c0ba920 1463
a1763c54 1464 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1465 if (this.busy) {
1466 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1467 }
1468 }
1469
1470 private checkAndEmitTaskQueuingEvents (): void {
1471 if (this.hasBackPressure()) {
1472 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1473 }
1474 }
1475
33e6bb4c
JB
1476 private checkAndEmitDynamicWorkerCreationEvents (): void {
1477 if (this.type === PoolTypes.dynamic) {
1478 if (this.full) {
1479 this.emitter?.emit(PoolEvents.full, this.info)
1480 }
1481 }
1482 }
1483
8a1260a3 1484 /**
aa9eede8 1485 * Gets the worker information given its worker node key.
8a1260a3
JB
1486 *
1487 * @param workerNodeKey - The worker node key.
3f09ed9f 1488 * @returns The worker information.
8a1260a3 1489 */
46b0bb09
JB
1490 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1491 return this.workerNodes[workerNodeKey].info
e221309a
JB
1492 }
1493
a05c10de 1494 /**
b0a4db63 1495 * Adds the given worker in the pool worker nodes.
ea7a90d3 1496 *
38e795c1 1497 * @param worker - The worker.
aa9eede8
JB
1498 * @returns The added worker node key.
1499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1500 */
b0a4db63 1501 private addWorkerNode (worker: Worker): number {
671d5154
JB
1502 const workerNode = new WorkerNode<Worker, Data>(
1503 worker,
ff3f866a 1504 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1505 )
b97d82d8 1506 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1507 if (this.starting) {
1508 workerNode.info.ready = true
1509 }
aa9eede8 1510 this.workerNodes.push(workerNode)
aad6fb64 1511 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1512 if (workerNodeKey === -1) {
86ed0598 1513 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1514 }
1515 return workerNodeKey
ea7a90d3 1516 }
c923ce56 1517
51fe3d3c 1518 /**
f06e48d8 1519 * Removes the given worker from the pool worker nodes.
51fe3d3c 1520 *
f06e48d8 1521 * @param worker - The worker.
51fe3d3c 1522 */
416fd65c 1523 private removeWorkerNode (worker: Worker): void {
aad6fb64 1524 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1525 if (workerNodeKey !== -1) {
1526 this.workerNodes.splice(workerNodeKey, 1)
1527 this.workerChoiceStrategyContext.remove(workerNodeKey)
1528 }
51fe3d3c 1529 }
adc3c320 1530
e2b31e32
JB
1531 /** @inheritDoc */
1532 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1533 return (
e2b31e32
JB
1534 this.opts.enableTasksQueue === true &&
1535 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1536 )
1537 }
1538
1539 private hasBackPressure (): boolean {
1540 return (
1541 this.opts.enableTasksQueue === true &&
1542 this.workerNodes.findIndex(
041dc05b 1543 workerNode => !workerNode.hasBackPressure()
a1763c54 1544 ) === -1
9e844245 1545 )
e2b31e32
JB
1546 }
1547
b0a4db63 1548 /**
aa9eede8 1549 * Executes the given task on the worker given its worker node key.
b0a4db63 1550 *
aa9eede8 1551 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1552 * @param task - The task to execute.
1553 */
2e81254d 1554 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1555 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1556 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1557 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1558 }
1559
f9f00b5f 1560 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1561 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1562 this.checkAndEmitTaskQueuingEvents()
1563 return tasksQueueSize
adc3c320
JB
1564 }
1565
416fd65c 1566 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1567 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1568 }
1569
416fd65c 1570 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1571 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1572 }
1573
81c02522 1574 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1575 while (this.tasksQueueSize(workerNodeKey) > 0) {
1576 this.executeTask(
1577 workerNodeKey,
1578 this.dequeueTask(workerNodeKey) as Task<Data>
1579 )
ff733df7 1580 }
4b628b48 1581 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1582 }
1583
ef41a6e6
JB
1584 private flushTasksQueues (): void {
1585 for (const [workerNodeKey] of this.workerNodes.entries()) {
1586 this.flushTasksQueue(workerNodeKey)
1587 }
1588 }
c97c7edb 1589}